add SQLite semaphore mockup

This commit is contained in:
Nick Sweeting 2024-09-22 16:28:30 -07:00
parent 28aea6465e
commit d89b6ce419
No known key found for this signature in database
4 changed files with 120 additions and 17 deletions

View file

@ -86,23 +86,23 @@ class ChromeDependencyConfigs(BaseConfigSet):
CHROME_EXTRA_ARGS: List[str] = [] CHROME_EXTRA_ARGS: List[str] = []
CHROME_DEFAULT_ARGS: List[str] = ['--timeout={TIMEOUT-10}'] CHROME_DEFAULT_ARGS: List[str] = ['--timeout={TIMEOUT-10}']
def load(self) -> Self: # def load(self) -> Self:
# for each field in the model, load its value # # for each field in the model, load its value
# load from each source in order of precedence (lowest to highest): # # load from each source in order of precedence (lowest to highest):
# - schema default # # - schema default
# - ArchiveBox.conf INI file # # - ArchiveBox.conf INI file
# - environment variables # # - environment variables
# - command-line arguments # # - command-line arguments
LOADED_VALUES: Dict[str, Any] = {} # LOADED_VALUES: Dict[str, Any] = {}
for field_name, field in self.__fields__.items(): # for field_name, field in self.__fields__.items():
def_value = field.default_factory() if field.default_factory else field.default # def_value = field.default_factory() if field.default_factory else field.default
ini_value = settings.INI_CONFIG.get_value(field_name) # ini_value = settings.INI_CONFIG.get_value(field_name)
env_value = settings.ENV_CONFIG.get_value(field_name) # env_value = settings.ENV_CONFIG.get_value(field_name)
cli_value = settings.CLI_CONFIG.get_value(field_name) # cli_value = settings.CLI_CONFIG.get_value(field_name)
run_value = settings.RUN_CONFIG.get_value(field_name) # run_value = settings.RUN_CONFIG.get_value(field_name)
value = run_value or cli_value or env_value or ini_value or def_value # value = run_value or cli_value or env_value or ini_value or def_value
class ChromeConfigs(ChromeDependencyConfigs): class ChromeConfigs(ChromeDependencyConfigs):
# section: ConfigSectionName = 'ALL_CONFIGS' # section: ConfigSectionName = 'ALL_CONFIGS'

View file

@ -36,6 +36,7 @@ class BaseQueue(BaseHook):
return AttrDict(all_tasks) return AttrDict(all_tasks)
def get_huey_config(self, settings) -> dict: def get_huey_config(self, settings) -> dict:
"""Get the config dict to insert into django.conf.settings.DJANGO_HUEY['queues']."""
return { return {
"huey_class": "huey.SqliteHuey", "huey_class": "huey.SqliteHuey",
"filename": settings.QUEUE_DATABASE_NAME, "filename": settings.QUEUE_DATABASE_NAME,
@ -58,6 +59,7 @@ class BaseQueue(BaseHook):
} }
def get_supervisor_config(self, settings) -> dict: def get_supervisor_config(self, settings) -> dict:
"""Get the config dict used to tell supervisord to start a huey consumer for this queue."""
return { return {
"name": f"worker_{self.name}", "name": f"worker_{self.name}",
"command": f"archivebox manage djangohuey --queue {self.name}", "command": f"archivebox manage djangohuey --queue {self.name}",

View file

@ -63,7 +63,7 @@ def convert(ini_str: str) -> str:
### Basic Assertions ### Basic Assertions
test_input = """ test_input = r"""
[SERVER_CONFIG] [SERVER_CONFIG]
IS_TTY=False IS_TTY=False
USE_COLOR=False USE_COLOR=False
@ -225,7 +225,7 @@ NODE_VERSION=v21.7.3
""" """
expected_output = '''[SERVER_CONFIG] expected_output = r'''[SERVER_CONFIG]
IS_TTY = false IS_TTY = false
USE_COLOR = false USE_COLOR = false
SHOW_PROGRESS = false SHOW_PROGRESS = false

View file

@ -0,0 +1,101 @@
import time
import uuid
from datetime import timedelta
from functools import wraps

from django.conf import settings
from django.db import connection, transaction
from django.utils import timezone
from huey.exceptions import TaskLockedException
class SqliteSemaphore:
    """Counting semaphore whose holders are rows in a SQL table.

    Every successful ``acquire()`` inserts one row (holder id, semaphore name,
    timestamp) and ``release()`` deletes it again. At most ``value`` rows may
    exist for one semaphore ``name`` at a time. Rows older than ``timeout``
    seconds are treated as abandoned and purged by the next ``acquire()``.

    NOTE: all queries go through ``django.db.connection``; ``db_path`` is only
    stored on the instance and is not used by any method of this class.
    NOTE: ``table_name`` is interpolated straight into the SQL text (identifiers
    cannot be bound as query parameters), so it must be a trusted, hard-coded
    identifier and never user-supplied input.
    """

    def __init__(self, db_path, table_name, name, value=1, timeout=None):
        """Store the settings and create the lock table if it is missing.

        Args:
            db_path: Path of the lock database (stored, currently unused).
            table_name: Name of the table that holds one row per lock holder.
            name: Name of this semaphore; rows are grouped by it.
            value: Maximum number of simultaneous holders.
            timeout: Maximum age of a holder row in seconds. ``None`` (or any
                other falsy value, including ``0``) falls back to 86400.
        """
        self.db_path = db_path
        self.table_name = table_name
        self.name = name
        self.value = value
        self.timeout = timeout or 86400  # Set a max age for lock holders

        # Ensure the table exists
        with connection.cursor() as cursor:
            cursor.execute(f"""
                CREATE TABLE IF NOT EXISTS {self.table_name} (
                    id TEXT PRIMARY KEY,
                    name TEXT,
                    timestamp DATETIME
                )
            """)

    def acquire(self, name=None):
        """Try to take one slot of the semaphore without blocking.

        Args:
            name: Holder id to record. A random UUID4 string is generated when
                it is omitted or falsy.

        Returns:
            The holder id (to be passed to ``release()``) when a slot was
            taken, or ``None`` when ``value`` holders already exist.

        NOTE(review): ``id`` is the table's PRIMARY KEY, so passing a holder id
        that already exists while a slot is free presumably surfaces as a
        database integrity error from the INSERT - confirm against callers.
        """
        name = name or str(uuid.uuid4())
        now = timezone.now()
        expiration = now - timedelta(seconds=self.timeout)

        # Purge and insert inside one transaction so the count check and the
        # insert are applied together.
        with transaction.atomic():
            with connection.cursor() as cursor:
                # Remove expired locks
                cursor.execute(f"""
                    DELETE FROM {self.table_name}
                    WHERE name = %s AND timestamp < %s
                """, [self.name, expiration])

                # Try to acquire the lock: the row is only inserted while the
                # number of current holders is below the limit.
                cursor.execute(f"""
                    INSERT INTO {self.table_name} (id, name, timestamp)
                    SELECT %s, %s, %s
                    WHERE (
                        SELECT COUNT(*) FROM {self.table_name}
                        WHERE name = %s
                    ) < %s
                """, [name, self.name, now, self.name, self.value])
                acquired = cursor.rowcount > 0

        # A failed attempt inserted nothing, so there is nothing to clean up.
        # Deliberately no DELETE by id here: if the caller passed the id of a
        # current holder, that would drop the holder's live row and silently
        # release its slot.
        return name if acquired else None

    def release(self, name):
        """Give back the slot held under holder id ``name``.

        Returns:
            ``True`` when a matching row was deleted, ``False`` when no row
            with that holder id existed for this semaphore.
        """
        with connection.cursor() as cursor:
            cursor.execute(f"""
                DELETE FROM {self.table_name}
                WHERE id = %s AND name = %s
            """, [name, self.name])
            return cursor.rowcount > 0
# Location of the SQLite file meant to hold semaphore lock rows.
# `settings` must be imported at the top of this file (django.conf.settings is
# assumed to be the object exposing CONFIG - TODO confirm); without that import
# this line raises NameError as soon as the module is loaded.
# NOTE(review): nothing in this file reads this constant yet.
LOCKS_DB_PATH = settings.CONFIG.OUTPUT_DIR / 'locks.sqlite3'
def lock_task_semaphore(db_path, table_name, lock_name, value=1, timeout=None):
    """
    Build a decorator that limits how many calls of a function run at once.

    The limit is enforced by a single SqliteSemaphore created when this
    function is called (i.e. at decoration time) and shared by every call of
    the decorated function. Up to ``value`` calls may hold the lock at the
    same time (default = 1).

    NOTE: there is no blocking, waiting or notifying. A call that cannot get a
    slot fails immediately with TaskLockedException; retrying is left to the
    caller (e.g. huey's task retries).

    Example:

        # Allow up to 3 workers to run this task concurrently. If the task is
        # locked, retry up to 2 times with a delay of 60s.
        @huey.task(retries=2, retry_delay=60)
        @lock_task_semaphore('path/to/db.sqlite3', 'semaphore_locks', 'my-lock', 3)
        def my_task():
            ...
    """
    semaphore = SqliteSemaphore(db_path, table_name, lock_name, value, timeout)

    def decorator(fn):
        @wraps(fn)
        def guarded(*args, **kwargs):
            holder_id = semaphore.acquire()
            if holder_id is not None:
                # Slot obtained: run the task and always hand the slot back,
                # even when the task raises.
                try:
                    return fn(*args, **kwargs)
                finally:
                    semaphore.release(holder_id)
            raise TaskLockedException(f'unable to acquire lock {lock_name}')

        return guarded

    return decorator