import time import uuid from functools import wraps from django.db import connection, transaction from django.utils import timezone from huey.exceptions import TaskLockedException class SqliteSemaphore: def __init__(self, db_path, table_name, name, value=1, timeout=None): self.db_path = db_path self.table_name = table_name self.name = name self.value = value self.timeout = timeout or 86400 # Set a max age for lock holders # Ensure the table exists with connection.cursor() as cursor: cursor.execute(f""" CREATE TABLE IF NOT EXISTS {self.table_name} ( id TEXT PRIMARY KEY, name TEXT, timestamp DATETIME ) """) def acquire(self, name=None): name = name or str(uuid.uuid4()) now = timezone.now() expiration = now - timezone.timedelta(seconds=self.timeout) with transaction.atomic(): # Remove expired locks with connection.cursor() as cursor: cursor.execute(f""" DELETE FROM {self.table_name} WHERE name = %s AND timestamp < %s """, [self.name, expiration]) # Try to acquire the lock with connection.cursor() as cursor: cursor.execute(f""" INSERT INTO {self.table_name} (id, name, timestamp) SELECT %s, %s, %s WHERE ( SELECT COUNT(*) FROM {self.table_name} WHERE name = %s ) < %s """, [name, self.name, now, self.name, self.value]) if cursor.rowcount > 0: return name # If we couldn't acquire the lock, remove our attempted entry with connection.cursor() as cursor: cursor.execute(f""" DELETE FROM {self.table_name} WHERE id = %s AND name = %s """, [name, self.name]) return None def release(self, name): with connection.cursor() as cursor: cursor.execute(f""" DELETE FROM {self.table_name} WHERE id = %s AND name = %s """, [name, self.name]) return cursor.rowcount > 0 LOCKS_DB_PATH = settings.CONFIG.OUTPUT_DIR / 'locks.sqlite3' def lock_task_semaphore(db_path, table_name, lock_name, value=1, timeout=None): """ Lock which can be acquired multiple times (default = 1). NOTE: no provisions are made for blocking, waiting, or notifying. This is just a lock which can be acquired a configurable number of times. Example: # Allow up to 3 workers to run this task concurrently. If the task is # locked, retry up to 2 times with a delay of 60s. @huey.task(retries=2, retry_delay=60) @lock_task_semaphore('path/to/db.sqlite3', 'semaphore_locks', 'my-lock', 3) def my_task(): ... """ sem = SqliteSemaphore(db_path, table_name, lock_name, value, timeout) def decorator(fn): @wraps(fn) def inner(*args, **kwargs): tid = sem.acquire() if tid is None: raise TaskLockedException(f'unable to acquire lock {lock_name}') try: return fn(*args, **kwargs) finally: sem.release(tid) return inner return decorator