diff --git a/archivebox/core/settings.py b/archivebox/core/settings.py index 3810954e..2b9e7edb 100644 --- a/archivebox/core/settings.py +++ b/archivebox/core/settings.py @@ -267,7 +267,7 @@ if not IS_GETTING_VERSION_OR_HELP: # dont create queue.sqlite3 file HUEY = { "huey_class": "huey.SqliteHuey", "filename": CONSTANTS.QUEUE_DATABASE_FILENAME, - "name": "system_tasks", + "name": "commands", "results": True, "store_none": True, "immediate": False, @@ -288,7 +288,7 @@ if not IS_GETTING_VERSION_OR_HELP: # dont create queue.sqlite3 file # https://huey.readthedocs.io/en/latest/contrib.html#setting-things-up # https://github.com/gaiacoop/django-huey DJANGO_HUEY = { - "default": "system_tasks", + "default": "commands", "queues": { HUEY["name"]: HUEY.copy(), # more registered here at plugin import-time by BaseQueue.register() diff --git a/archivebox/queues/supervisor_util.py b/archivebox/queues/supervisor_util.py index f181da08..0a4285f8 100644 --- a/archivebox/queues/supervisor_util.py +++ b/archivebox/queues/supervisor_util.py @@ -26,6 +26,23 @@ CONFIG_FILE_NAME = "supervisord.conf" PID_FILE_NAME = "supervisord.pid" WORKERS_DIR_NAME = "workers" +SCHEDULER_WORKER = { + "name": "worker_scheduler", + "command": "archivebox manage djangohuey --queue system_tasks -w 4 -k thread --disable-health-check --flush-locks", + "autostart": "true", + "autorestart": "true", + "stdout_logfile": "logs/worker_scheduler.log", + "redirect_stderr": "true", +} +COMMAND_WORKER = { + "name": "worker_commands", + "command": "archivebox manage djangohuey --queue commands -w 4 -k thread --no-periodic --disable-health-check", + "autostart": "true", + "autorestart": "true", + "stdout_logfile": "logs/worker_commands.log", + "redirect_stderr": "true", +} + @cache def get_sock_file(): """Get the path to the supervisord socket file, symlinking to a shorter path if needed due to unix path length limits""" @@ -84,33 +101,35 @@ files = {WORKERS_DIR}/*.conf """ CONFIG_FILE.write_text(config_content) - 
Path.mkdir(WORKERS_DIR, exist_ok=True) + Path.mkdir(WORKERS_DIR, exist_ok=True, parents=True) + (WORKERS_DIR / 'initial_startup.conf').write_text('') # hides error about "no files found to include" when supervisord starts def create_worker_config(daemon): + """Create a supervisord worker config file for a given daemon""" SOCK_FILE = get_sock_file() WORKERS_DIR = SOCK_FILE.parent / WORKERS_DIR_NAME - Path.mkdir(WORKERS_DIR, exist_ok=True) + Path.mkdir(WORKERS_DIR, exist_ok=True, parents=True) name = daemon['name'] - configfile = WORKERS_DIR / f"{name}.conf" + worker_conf = WORKERS_DIR / f"{name}.conf" - config_content = f"[program:{name}]\n" + worker_str = f"[program:{name}]\n" for key, value in daemon.items(): if key == 'name': continue - config_content += f"{key}={value}\n" - config_content += "\n" + worker_str += f"{key}={value}\n" + worker_str += "\n" - configfile.write_text(config_content) + worker_conf.write_text(worker_str) def get_existing_supervisord_process(): SOCK_FILE = get_sock_file() try: transport = SupervisorTransport(None, None, f"unix://{SOCK_FILE}") - server = ServerProxy("http://localhost", transport=transport) + server = ServerProxy("http://localhost", transport=transport) # user:pass@localhost doesn't work for some reason with unix://.sock, cant seem to silence CRIT no-auth warning current_state = cast(Dict[str, int | str], server.supervisor.getState()) if current_state["statename"] == "RUNNING": pid = server.supervisor.getPID() @@ -127,6 +146,7 @@ def stop_existing_supervisord_process(): PID_FILE = SOCK_FILE.parent / PID_FILE_NAME try: + # if pid file exists, load PID int try: pid = int(PID_FILE.read_text()) except (FileNotFoundError, ValueError): @@ -136,15 +156,15 @@ def stop_existing_supervisord_process(): print(f"[πŸ¦Έβ€β™‚οΈ] Stopping supervisord process (pid={pid})...") proc = psutil.Process(pid) proc.terminate() - proc.wait() - except (Exception, BrokenPipeError, IOError): + proc.wait(timeout=5) + except (BaseException, 
BrokenPipeError, IOError, KeyboardInterrupt): pass finally: try: # clear PID file and socket file PID_FILE.unlink(missing_ok=True) get_sock_file().unlink(missing_ok=True) - except Exception: + except BaseException: pass def start_new_supervisord_process(daemonize=False): @@ -278,47 +298,6 @@ def start_worker(supervisor, daemon, lazy=False): raise Exception(f"Failed to start worker {daemon['name']}! Only found: {procs}") -def watch_worker(supervisor, daemon_name, interval=5): - """loop continuously and monitor worker's health""" - while True: - proc = get_worker(supervisor, daemon_name) - if not proc: - raise Exception("Worker dissapeared while running! " + daemon_name) - - if proc['statename'] == 'STOPPED': - return proc - - if proc['statename'] == 'RUNNING': - time.sleep(1) - continue - - if proc['statename'] in ('STARTING', 'BACKOFF', 'FATAL', 'EXITED', 'STOPPING'): - print(f'[πŸ¦Έβ€β™‚οΈ] WARNING: Worker {daemon_name} {proc["statename"]} {proc["description"]}') - time.sleep(interval) - continue - -def tail_worker_logs(log_path: str): - get_or_create_supervisord_process(daemonize=False) - - from rich.live import Live - from rich.table import Table - - table = Table() - table.add_column("TS") - table.add_column("URL") - - try: - with Live(table, refresh_per_second=1) as live: # update 4 times a second to feel fluid - with open(log_path, 'r') as f: - for line in follow(f): - if '://' in line: - live.console.print(f"Working on: {line.strip()}") - # table.add_row("123124234", line.strip()) - except (KeyboardInterrupt, BrokenPipeError, IOError): - STDERR.print("\n[πŸ›‘] Got Ctrl+C, stopping gracefully...") - except SystemExit: - pass - def get_worker(supervisor, daemon_name): try: return supervisor.getProcessInfo(daemon_name) @@ -350,28 +329,55 @@ def stop_worker(supervisor, daemon_name): raise Exception(f"Failed to stop worker {daemon_name}!") +def tail_worker_logs(log_path: str): + get_or_create_supervisord_process(daemonize=False) + + from rich.live import Live + 
from rich.table import Table + + table = Table() + table.add_column("TS") + table.add_column("URL") + + try: + with Live(table, refresh_per_second=1) as live: # update 4 times a second to feel fluid + with open(log_path, 'r') as f: + for line in follow(f): + if '://' in line: + live.console.print(f"Working on: {line.strip()}") + # table.add_row("123124234", line.strip()) + except (KeyboardInterrupt, BrokenPipeError, IOError): + STDERR.print("\n[πŸ›‘] Got Ctrl+C, stopping gracefully...") + except SystemExit: + pass + +def watch_worker(supervisor, daemon_name, interval=5): + """loop continuously and monitor worker's health""" + while True: + proc = get_worker(supervisor, daemon_name) + if not proc: + raise Exception("Worker disappeared while running! " + daemon_name) + + if proc['statename'] == 'STOPPED': + return proc + + if proc['statename'] == 'RUNNING': + time.sleep(1) + continue + + if proc['statename'] in ('STARTING', 'BACKOFF', 'FATAL', 'EXITED', 'STOPPING'): + print(f'[πŸ¦Έβ€β™‚οΈ] WARNING: Worker {daemon_name} {proc["statename"]} {proc["description"]}') + time.sleep(interval) + continue + + def start_server_workers(host='0.0.0.0', port='8000', daemonize=False): supervisor = get_or_create_supervisord_process(daemonize=daemonize) bg_workers = [ - { - "name": "worker_scheduler", - "command": "archivebox manage djangohuey --queue system_tasks -w 4 -k thread --disable-health-check --flush-locks", - "autostart": "true", - "autorestart": "true", - "stdout_logfile": "logs/worker_scheduler.log", - "redirect_stderr": "true", - }, - { - "name": "worker_system_tasks", - "command": "archivebox manage djangohuey --queue system_tasks -w 4 -k thread --no-periodic --disable-health-check", - "autostart": "true", - "autorestart": "true", - "stdout_logfile": "logs/worker_system_tasks.log", - "redirect_stderr": "true", - }, + SCHEDULER_WORKER, + COMMAND_WORKER, ] fg_worker = { "name": "worker_daphne", diff --git a/archivebox/queues/tasks.py b/archivebox/queues/tasks.py index 
acfeab0b..6f62a8c1 100644 --- a/archivebox/queues/tasks.py +++ b/archivebox/queues/tasks.py @@ -1,7 +1,7 @@ __package__ = 'archivebox.queues' from functools import wraps -from django.utils import timezone +# from django.utils import timezone from django_huey import db_task, task @@ -10,7 +10,7 @@ from huey_monitor.tqdm import ProcessInfo from .supervisor_util import get_or_create_supervisord_process -# @db_task(queue="system_tasks", context=True, schedule=1) +# @db_task(queue="commands", context=True, schedule=1) # def scheduler_tick(): # print('SCHEDULER TICK', timezone.now().isoformat()) # # abx.archivebox.events.on_scheduler_runloop_start(timezone.now(), machine=Machine.objects.get_current_machine()) @@ -43,7 +43,7 @@ def db_task_with_parent(func): return wrapper -@db_task(queue="system_tasks", context=True) +@db_task(queue="commands", context=True) def bg_add(add_kwargs, task=None, parent_task_id=None): get_or_create_supervisord_process(daemonize=False) @@ -62,7 +62,7 @@ def bg_add(add_kwargs, task=None, parent_task_id=None): return result -@task(queue="system_tasks", context=True) +@task(queue="commands", context=True) def bg_archive_links(args, kwargs=None, task=None, parent_task_id=None): get_or_create_supervisord_process(daemonize=False) @@ -83,7 +83,7 @@ def bg_archive_links(args, kwargs=None, task=None, parent_task_id=None): return result -@task(queue="system_tasks", context=True) +@task(queue="commands", context=True) def bg_archive_link(args, kwargs=None,task=None, parent_task_id=None): get_or_create_supervisord_process(daemonize=False) @@ -104,7 +104,7 @@ def bg_archive_link(args, kwargs=None,task=None, parent_task_id=None): return result -@task(queue="system_tasks", context=True) +@task(queue="commands", context=True) def bg_archive_snapshot(snapshot, overwrite=False, methods=None, task=None, parent_task_id=None): # get_or_create_supervisord_process(daemonize=False)