add new Snapshot.archive method powered by huey task

Nick Sweeting 2024-09-24 21:17:51 -07:00
parent e99260feb2
commit b117484de7
3 changed files with 172 additions and 2 deletions

archivebox/main.py

@@ -598,6 +598,9 @@ def add(urls: Union[str, List[str]],
"""Add a new URL or list of URLs to your archive"""
from core.models import Snapshot, Tag
from queues.supervisor_util import start_cli_workers, tail_worker_logs
from queues.tasks import bg_archive_link
assert depth in (0, 1), 'Depth must be 0 or 1 (depth >1 is not supported yet)'
@@ -609,6 +612,8 @@ def add(urls: Union[str, List[str]],
# Load list of links from the existing index
check_data_folder(CONFIG)
check_dependencies(CONFIG)
worker = start_cli_workers()
new_links: List[Link] = []
all_links = load_main_index(out_dir=out_dir)
@@ -686,6 +691,8 @@ def add(urls: Union[str, List[str]],
stderr(f'[*] [{ts}] Archiving {len(new_links)}/{len(all_links)} URLs from added set...', color='green')
archive_links(new_links, overwrite=False, **archive_kwargs)
tail_worker_logs(worker['stdout_logfile'])
if CAN_UPGRADE:
hint(f"There's a new version of ArchiveBox available! Your current version is {VERSION}. You can upgrade to {VERSIONS_AVAILABLE['recommended_version']['tag_name']} ({VERSIONS_AVAILABLE['recommended_version']['html_url']}). For more on how to upgrade: https://github.com/ArchiveBox/ArchiveBox/wiki/Upgrading-or-Merging-Archives\n")
@@ -789,9 +796,12 @@ def update(resume: Optional[float]=None,
from core.models import ArchiveResult
from .search import index_links
from .queues.supervisor_util import start_cli_workers
check_data_folder(CONFIG)
check_dependencies(CONFIG)
start_cli_workers()
new_links: List[Link] = [] # TODO: Remove input argument: only_new
extractors = extractors.split(",") if extractors else []

archivebox/queues/supervisor_util.py

@@ -14,6 +14,22 @@ from xmlrpc.client import ServerProxy
from .settings import CONFIG_FILE, PID_FILE, SOCK_FILE, LOG_FILE, WORKER_DIR, TMP_DIR, LOGS_DIR
from typing import Iterator
def follow(file, sleep_sec=0.1) -> Iterator[str]:
""" Yield each line from a file as they are written.
`sleep_sec` is the time to sleep after empty reads. """
line = ''
while True:
tmp = file.readline()
if tmp is not None and tmp != "":
line += tmp
if line.endswith("\n"):
yield line
line = ''
elif sleep_sec:
time.sleep(sleep_sec)
def create_supervisord_config():
config_content = f"""
@@ -56,8 +72,7 @@ def create_worker_config(daemon):
config_content += f"{key}={value}\n"
config_content += "\n"
-    with open(configfile, "w") as f:
-        f.write(config_content)
+    configfile.write_text(config_content)
def get_existing_supervisord_process():
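Note that replacing the open()/write() pair with Path.write_text() implies configfile is a pathlib.Path rather than a plain string; a minimal equivalence sketch (the filename is illustrative only):

from pathlib import Path

configfile = Path(WORKER_DIR) / 'worker.conf'  # hypothetical name; WORKER_DIR is imported from .settings above
configfile.write_text(config_content)          # one call, same effect as open(configfile, 'w').write(config_content)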
@@ -197,6 +212,27 @@ def watch_worker(supervisor, daemon_name, interval=5):
time.sleep(interval)
continue
def tail_worker_logs(log_path: str):
get_or_create_supervisord_process(daemonize=True)
from rich.live import Live
from rich.table import Table
table = Table()
table.add_column("TS")
table.add_column("URL")
try:
with Live(table, refresh_per_second=1) as live:  # refresh the display once per second
with open(log_path, 'r') as f:
for line in follow(f):
if '://' in line:
live.console.print(f"Working on: {line.strip()}")
table.add_row("123124234", line.strip())  # TODO: hardcoded placeholder timestamp
except KeyboardInterrupt:
print("\n[🛑] Got Ctrl+C, stopping gracefully...")
except SystemExit:
pass
def get_worker(supervisor, daemon_name):
try:
@@ -228,6 +264,83 @@ def stop_worker(supervisor, daemon_name):
raise Exception(f"Failed to stop worker {daemon_name}!")
def start_server_workers(host='0.0.0.0', port='8000'):
supervisor = get_or_create_supervisord_process(daemonize=False)
bg_workers = [
{
"name": "worker_system_tasks",
"command": "archivebox manage djangohuey --queue system_tasks",
"autostart": "true",
"autorestart": "true",
"stdout_logfile": "logs/worker_system_tasks.log",
"redirect_stderr": "true",
},
]
fg_worker = {
"name": "worker_daphne",
"command": f"daphne --bind={host} --port={port} --application-close-timeout=600 archivebox.core.asgi:application",
"autostart": "false",
"autorestart": "true",
"stdout_logfile": "logs/worker_daphne.log",
"redirect_stderr": "true",
}
print()
for worker in bg_workers:
start_worker(supervisor, worker)
print()
start_worker(supervisor, fg_worker)
print()
try:
watch_worker(supervisor, "worker_daphne")
except KeyboardInterrupt:
print("\n[🛑] Got Ctrl+C, stopping gracefully...")
except SystemExit:
pass
except BaseException as e:
print(f"\n[🛑] Got {e.__class__.__name__} exception, stopping web server gracefully...")
raise
finally:
stop_worker(supervisor, "worker_daphne")
time.sleep(0.5)
def start_cli_workers(watch=False):
supervisor = get_or_create_supervisord_process(daemonize=False)
fg_worker = {
"name": "worker_system_tasks",
"command": "archivebox manage djangohuey --queue system_tasks",
"autostart": "true",
"autorestart": "true",
"stdout_logfile": "logs/worker_system_tasks.log",
"redirect_stderr": "true",
}
start_worker(supervisor, fg_worker)
if watch:
try:
watch_worker(supervisor, "worker_system_tasks")
except KeyboardInterrupt:
print("\n[🛑] Got Ctrl+C, stopping gracefully...")
except SystemExit:
pass
except BaseException as e:
print(f"\n[🛑] Got {e.__class__.__name__} exception, stopping web server gracefully...")
raise
finally:
stop_worker(supervisor, "worker_system_tasks")
time.sleep(0.5)
return fg_worker
def main(daemons):
supervisor = get_or_create_supervisord_process(daemonize=True)
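Note that start_cli_workers() returns the fg_worker dict, which is what lets add() above call tail_worker_logs(worker['stdout_logfile']); the caller's side looks like:

worker = start_cli_workers()     # returns the fg_worker dict defined above
print(worker['stdout_logfile'])  # 'logs/worker_system_tasks.log', the path fed to tail_worker_logs()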

archivebox/queues/tasks.py

@@ -5,8 +5,13 @@ from django_huey import db_task, task
from huey_monitor.models import TaskModel
from huey_monitor.tqdm import ProcessInfo
from .supervisor_util import get_or_create_supervisord_process
@db_task(queue="system_tasks", context=True)
def bg_add(add_kwargs, task=None, parent_task_id=None):
get_or_create_supervisord_process(daemonize=True)
from ..main import add
if task and parent_task_id:
@@ -24,6 +29,8 @@ def bg_add(add_kwargs, task=None, parent_task_id=None):
@task(queue="system_tasks", context=True)
def bg_archive_links(args, kwargs=None, task=None, parent_task_id=None):
get_or_create_supervisord_process(daemonize=True)
from ..extractors import archive_links
if task and parent_task_id:
@@ -39,3 +46,43 @@ def bg_archive_links(args, kwargs=None, task=None, parent_task_id=None):
result = archive_links(*args, **kwargs)
process_info.update(n=rough_count)
return result
@task(queue="system_tasks", context=True)
def bg_archive_link(args, kwargs=None, task=None, parent_task_id=None):
get_or_create_supervisord_process(daemonize=True)
from ..extractors import archive_link
if task and parent_task_id:
TaskModel.objects.set_parent_task(main_task_id=parent_task_id, sub_task_id=task.id)
assert args and args[0]
kwargs = kwargs or {}
rough_count = len(args[0])
process_info = ProcessInfo(task, desc="archive_link", parent_task_id=parent_task_id, total=rough_count)
result = archive_link(*args, **kwargs)
process_info.update(n=rough_count)
return result
@task(queue="system_tasks", context=True)
def bg_archive_snapshot(snapshot, overwrite=False, methods=None, task=None, parent_task_id=None):
# get_or_create_supervisord_process(daemonize=True)
from ..extractors import archive_link
if task and parent_task_id:
TaskModel.objects.set_parent_task(main_task_id=parent_task_id, sub_task_id=task.id)
process_info = ProcessInfo(task, desc="archive_link", parent_task_id=parent_task_id, total=1)
link = snapshot.as_link_with_details()
result = archive_link(link, overwrite=overwrite, methods=methods)
process_info.update(n=1)
return result
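
The Snapshot.archive method named in the commit title does not appear in the hunks shown here; presumably it lives in core/models.py and wraps bg_archive_snapshot, along these lines (a hypothetical sketch, not part of the diff):

from django.db import models  # sketch context only; the real model has many more fields

class Snapshot(models.Model):
    ...
    def archive(self, overwrite=False, methods=None):
        # hypothetical: enqueue this snapshot for (re)archiving on the system_tasks huey queue
        from queues.tasks import bg_archive_snapshot
        return bg_archive_snapshot(self, overwrite=overwrite, methods=methods)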