mirror of
https://github.com/ArchiveBox/ArchiveBox
synced 2024-11-22 12:13:05 +00:00
hide progress bar on startup
This commit is contained in:
parent
17faa5a507
commit
721427a484
4 changed files with 117 additions and 7 deletions
|
@ -60,7 +60,7 @@ def setup_django(check_db=False, in_memory_db=False) -> None:
|
||||||
return
|
return
|
||||||
|
|
||||||
with Progress(transient=True, expand=True, console=STDERR) as INITIAL_STARTUP_PROGRESS:
|
with Progress(transient=True, expand=True, console=STDERR) as INITIAL_STARTUP_PROGRESS:
|
||||||
INITIAL_STARTUP_PROGRESS_TASK = INITIAL_STARTUP_PROGRESS.add_task("[green]Loading modules...", total=25)
|
INITIAL_STARTUP_PROGRESS_TASK = INITIAL_STARTUP_PROGRESS.add_task("[green]Loading modules...", total=25, visible=False)
|
||||||
|
|
||||||
from archivebox.config.permissions import IS_ROOT, ARCHIVEBOX_USER, ARCHIVEBOX_GROUP, SudoPermission
|
from archivebox.config.permissions import IS_ROOT, ARCHIVEBOX_USER, ARCHIVEBOX_GROUP, SudoPermission
|
||||||
|
|
||||||
|
|
|
@ -191,7 +191,7 @@ SQLITE_CONNECTION_OPTIONS = {
|
||||||
# https://gcollazo.com/optimal-sqlite-settings-for-django/
|
# https://gcollazo.com/optimal-sqlite-settings-for-django/
|
||||||
# https://litestream.io/tips/#busy-timeout
|
# https://litestream.io/tips/#busy-timeout
|
||||||
# https://docs.djangoproject.com/en/5.1/ref/databases/#setting-pragma-options
|
# https://docs.djangoproject.com/en/5.1/ref/databases/#setting-pragma-options
|
||||||
"timeout": 5,
|
"timeout": 10,
|
||||||
"check_same_thread": False,
|
"check_same_thread": False,
|
||||||
"transaction_mode": "IMMEDIATE",
|
"transaction_mode": "IMMEDIATE",
|
||||||
"init_command": (
|
"init_command": (
|
||||||
|
|
|
@ -20,11 +20,119 @@ from django.urls import reverse_lazy
|
||||||
|
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
|
# ORCHESTRATOR:
|
||||||
|
# An orchestrator is a single long-running daemon process that manages spawning and killing actors for different queues of objects.
|
||||||
|
# The orchestrator first starts when the archivebox starts, and it stops when archivebox is killed.
|
||||||
|
# Only one orchestrator process can be running per collection per machine.
|
||||||
|
# An orchestrator is aware of all of the ActorTypes that are defined in the system, and their associated queues.
|
||||||
|
# When started, the orchestrator runs a single runloop that continues until the archivebox process is killed.
|
||||||
|
# On each loop, the orchestrator:
|
||||||
|
# - loops through each ActorType defined in the system:
|
||||||
|
# - fetches the queue of objects pending for that ActorType by calling ActorType.get_queue()
|
||||||
|
# - check how many actors are currently running for that ActorType by calling current_actors = ActorType.get_running_actors()
|
||||||
|
# - determine how many new actors are needed and what their launch kwargs should be to process the objects in each queue
|
||||||
|
# actors_to_spawn = ActorType.get_actors_to_spawn(queue, current_actors)
|
||||||
|
# - e.g. if there is are 4 ArchiveResult objects queued all with the same persona + extractor, it should spawn a single actor to process all of them, if there are 4000 it should spawn ~5 actors
|
||||||
|
# - if there are 4 ArchiveResult objects queued with different personas + extractors, it should spawn a single actor for each persona + extractor
|
||||||
|
# - if there are *many* objects to process, it can spawn more actors of the same type up to ActorType.MAX_ACTORS to speed things up
|
||||||
|
# - spawns the new of actors needed as subprocesses ActorType.spawn_actors(actors_to_spawn, block=False, double_fork=False)
|
||||||
|
# - checks for ANY objects in the DB that have a retry_at time set but where no ActorType has them in their queue, and raises a warning that they are orphaned and will never be processed
|
||||||
|
# - sleeps for 0.1s before repeating the loop, to reduce the CPU load
|
||||||
|
# The orchestrator does not manage killing actors, actors are expected to exit on their own when idle.
|
||||||
|
# ABX defines the following hookspecs for plugins to hook into the orchestrator lifecycle:
|
||||||
|
# - abx.pm.hook.on_orchestrator_startup(all_actor_types)
|
||||||
|
# - abx.pm.hook.on_orchestrator_tick_started(all_actor_types, all_queues, all_running_actors)
|
||||||
|
# - abx.pm.hook.on_orchestrator_idle(all_actor_types) # only run when there are no queues with pending objects to process
|
||||||
|
# - abx.pm.hook.on_orchestrator_shutdown(all_actor_types)
|
||||||
|
|
||||||
|
# OBJECT:
|
||||||
|
# e.g. Snapshot, Crawl, ArchiveResult
|
||||||
|
# An object is a single row in a database table, defined by a django model.
|
||||||
|
# An object has a finite set of states that it can be in.
|
||||||
|
# An object has a status field that holds the object's current state e.g status="queued".
|
||||||
|
# An object has a retry_at field that holds a timestamp for when it should next be checked by a actor eventloop.
|
||||||
|
# Each type of object has a single tick() method defined that handles all of its state transitions.
|
||||||
|
# When an object's retry_at time has passed, the actor managing that type of object will spwan an actor an call tick(object) to move it to its next state.
|
||||||
|
# ABX defines the following hookspecs for plugins to hook into object lifecycle: # use these for in-memory operations, dont use these for db on_create/on_update/on_delete logic, separate hooks are available on write operations below
|
||||||
|
# - abx.pm.hook.on_<objecttype>_init(object) # when object is initialized in-memory, don't put any slow code here as it runs on every object returned from DB queries! only for setting default values, ._cache_attrs, etc.
|
||||||
|
# - abx.pm.hook.on_<objecttype>_clean(object) # when object's form fields are validated but before it is to be saved to the DB, put any checks/validations on field values here
|
||||||
|
# - abx.pm.hook.on_<objecttype>_save(object) # when object is being saved to the DB, put any code here that should run right before super().save()
|
||||||
|
|
||||||
|
# ACTORS:
|
||||||
|
# A actor is a long-running daemon process that runs a loop to process a single object at a time from a queue it defines (e.g. ActorType.queue=Snapshot.objects.filter(status='queued', retry_at__lte=time.now())).
|
||||||
|
# An actor at runtime is an instance of an ActorType class + some launch kwargs that it's passed at startup (e.g. persona, extractor, etc.).
|
||||||
|
# Actors are started lazily by the orchestrator only when their ActorType.queue indicates there are pending objects to process.
|
||||||
|
# ActorTypes should define ActorType.get_queue(), ActorType.get_actors_to_spawn(), ActorType.get_running_actors(), and ActorType.spawn_actors() methods exposed to the orchestrator.
|
||||||
|
# On startup, a actor can initialize shared resources it needs to perform its work, and keep a reference in memory to them. (e.g. launch chrome in the background, setup an API client, etc.)
|
||||||
|
# On each loop, the actor gets a single object to process from the top of the queue, and runs ActorType.tick(object).
|
||||||
|
# The actor should have a hardcoded ActorType.MAX_TICK_TIME, and should enforce it by killing the tick() method if it runs too long.
|
||||||
|
# Before calling tick(), a actor should bump the object.retry_at time by MAX_TICK_TIME to prevent other actors from picking it up while the current actor is still processing it.
|
||||||
|
# The actor blocks waiting for tick(obj) to finish executing, then the loop repeats and it gets the next object to call tick(object) on.
|
||||||
|
# If a tick(obj) method raises an exception, the actor should catch it and log it, then move on to the next object in the queue.
|
||||||
|
# If there are no objects left in the queue, the actor should exit.
|
||||||
|
# On exit, a actor should release any shared resources it initialized on startup and clean up after itself.
|
||||||
|
# On startup an actor should fire abx.pm.hook.on_actor_startup(object) and on exit it should fire abx.pm.hook.on_actor_exit(object) (both syncronous hooks that can be used by plugins to register any startup/cleanup code).
|
||||||
|
# An ActorType defines the following hookspecs for plugins to hook into its behavior:
|
||||||
|
# - abx.pm.hook.on_actor_startup(actor, queue)
|
||||||
|
# - abx.pm.hook.on_actor_tick_started(actor, object)
|
||||||
|
# - abx.pm.hook.on_actor_tick_finished(actor, object)
|
||||||
|
# - abx.pm.hook.on_actor_tick_exception(actor, object, exception)
|
||||||
|
# - abx.pm.hook.on_actor_shutdown(actor)
|
||||||
|
|
||||||
|
# TICK:
|
||||||
|
# A tick() method is a method defined on an ActorType, passed a single object to process and perform a single state transition on.
|
||||||
|
# A tick() method does NOT need to lock the object its operating on, the actor will bump the object's retry_at += MAX_TICK_TIME before handing it off to tick().
|
||||||
|
# A tick() method does NOT open a DB transaction for its entire duration of execution, instead it should do all its writes in one atomic operation using a compare-and-swap .select(status=previous_state).update(status=next_state) (optimistic concurrency control).
|
||||||
|
# A tick() method does NOT return any values, it either succeeds and returns None, or fails and raises an exception to be handled by the actor runloop.
|
||||||
|
# A tick() method does NOT need to enforce its own MAX_TICK_TIME / any timeouts, the actor runloop code should enforce that.
|
||||||
|
# A tick() should NOT call other tick() methods directly, and it should not spawn orchestrator or actor processes.
|
||||||
|
# A tick() should set its object.retry_at time to a value farther in the future and return early if it wants to skip execution due to hitting a ratelimit or transient error.
|
||||||
|
# A tick() can:
|
||||||
|
# - read from any other objects, filesystem, or external APIs (e.g. check if snapshot_dir/screenshot.png exists)
|
||||||
|
# - perform any checks necessary and branch and determine what the transition it should perform to which next state
|
||||||
|
# - execute a single transition_from_abx_to_xyz(object) method to perform the transition to the next state it decided on
|
||||||
|
|
||||||
|
# TRANSITION:
|
||||||
|
# A transition_from_abx_to_xyz(object) method is a function defined on an ActorType, passed a single object by a tick() method to perform a defined transition on.
|
||||||
|
# A transition_from_abx_to_xyz() method does NOT need to lock the object its operating on or open any db transactions.
|
||||||
|
# A transiton should not have any branching logic, it should only execute the given transition that it defines + any side effects.
|
||||||
|
# A transition should be indempotent, if two transitions run at once on the same object it should only perform one transition and the other should fail
|
||||||
|
# A transition should be atomic, if it is interrupted it should leave the object in a consistent state
|
||||||
|
# A transition's main body should:
|
||||||
|
# - perform a SINGLE write() to the underlying object using a compare_and_swap .filter(status=last_state).update(status=next_state) to move it to its next state
|
||||||
|
# - update the object's retry_at time to a new value, or set it to None if it's in a final state & should not be checked again
|
||||||
|
# A transition can also trigger side effects at the end of its execution:
|
||||||
|
# - update the retry_at time on *other* objects (so that they are rechecked by their own actor on the next tick) (ONLY retry_at, do not update any other fields)
|
||||||
|
# - filesystem operations (e.g. moving a directory to a new location)
|
||||||
|
# - external API calls (e.g. uploading to s3, firing a webhook, writing to a logfile, etc.)
|
||||||
|
# - DO NOT use side effects to directly mutate other objects state or trigger other state transitions
|
||||||
|
# ABX defines the following hookspecs for plugins to hook into transition behavior:
|
||||||
|
# - abx.pm.hook.on_transition_<objecttype>_from_abx_to_xyz_started(object)
|
||||||
|
# - abx.pm.hook.on_transition_<objecttype>_from_abx_to_xyz_succeeded(object)
|
||||||
|
|
||||||
|
# READ:
|
||||||
|
# A read() method is a function defined for a given ActorType that performs a single read from the DB and/or other read models like django cache, filesystem, in-memory caches, etc.
|
||||||
|
# A read() method should accept either an instance/pk/uuid/abid or some filter_kwargs, and return a benedict/TypedDict or pydantic model containing bare values as the result.
|
||||||
|
|
||||||
|
# WRITE:
|
||||||
|
# A write() method is a function defined for a given ActorType that performs a single atomic db write to update the DB, django cache, filesystem, in-memory caches, etc. for that object.
|
||||||
|
# A write() method does NOT need to lock the object its operating on or open any db transactions, it should just perform a single compare-and-swap .select(status=last_state).update(status=next_state) operation.
|
||||||
|
# A write() method does NOT need to enforce any timeouts or ratelimits, the tick() method should do that.
|
||||||
|
# A write() method should NOT have any branching logic or side effects like spawning other processes.
|
||||||
|
# ABX defines the following hookspecs for plugins to hook into write behavior:
|
||||||
|
# - abx.pm.hook.on_<objecttype>_created(object)
|
||||||
|
# - abx.pm.hook.on_<objecttype>_updated(object)
|
||||||
|
# - abx.pm.hook.on_<objecttype>_deleted(object)
|
||||||
|
|
||||||
|
# SIDEEFFECT:
|
||||||
|
# A sideeffect is a helper function defined in an app to be used by one or more tick() methods to perform a side effect that isn't a simple DB write or read.
|
||||||
|
# A sideeffect can spawn other processes, make 3rd-party API calls, write to the filesystem, etc. e.g. subprocess.Popen('wget https://example.com')
|
||||||
|
# A sideeffect should execute quickly and return early, it should try not to block for slow RPCs, subprocess jobs, or network operations.
|
||||||
|
# For slow or long-running sideeffects, spawn a separate background process and return immediately. Update the object's retry_at time and state as-needed so that a future tick() will check for any expected output from the background job.
|
||||||
|
# ABX defines the following hookspecs for plugins to hook into sideeffect behavior:
|
||||||
|
# - abx.pm.hook.on_sideeffect_xyz_started(object)
|
||||||
|
# - abx.pm.hook.on_sideeffect_xyz_succeeded(object)
|
||||||
|
# - abx.pm.hook.on_sideeffect_xyz_failed(object)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@ -99,6 +207,7 @@ def transition_snapshot_to_started(snapshot, config, cwd):
|
||||||
fields_to_update = {'status': 'started', 'retry_at': retry_at, 'retries': retries, 'start_ts': time.now(), 'end_ts': None}
|
fields_to_update = {'status': 'started', 'retry_at': retry_at, 'retries': retries, 'start_ts': time.now(), 'end_ts': None}
|
||||||
snapshot = abx.archivebox.writes.update_snapshot(filter_kwargs=snapshot_to_update, update_kwargs=fields_to_update)
|
snapshot = abx.archivebox.writes.update_snapshot(filter_kwargs=snapshot_to_update, update_kwargs=fields_to_update)
|
||||||
|
|
||||||
|
# trigger side effects on state transition (these just emit an event to a separate queue thats then processed by a huey worker)
|
||||||
cleanup_snapshot_dir(snapshot, config, cwd)
|
cleanup_snapshot_dir(snapshot, config, cwd)
|
||||||
create_snapshot_pending_archiveresults(snapshot, config, cwd)
|
create_snapshot_pending_archiveresults(snapshot, config, cwd)
|
||||||
update_snapshot_index_json(archiveresult, config, cwd)
|
update_snapshot_index_json(archiveresult, config, cwd)
|
||||||
|
@ -114,6 +223,7 @@ def transition_snapshot_to_sealed(snapshot, config, cwd):
|
||||||
fields_to_update = {'status': 'sealed', 'retry_at': None, 'end_ts': time.now()}
|
fields_to_update = {'status': 'sealed', 'retry_at': None, 'end_ts': time.now()}
|
||||||
snapshot = abx.archivebox.writes.update_snapshot(filter_kwargs=snapshot_to_update, update_kwargs=fields_to_update)
|
snapshot = abx.archivebox.writes.update_snapshot(filter_kwargs=snapshot_to_update, update_kwargs=fields_to_update)
|
||||||
|
|
||||||
|
# side effects:
|
||||||
cleanup_snapshot_dir(snapshot, config, cwd)
|
cleanup_snapshot_dir(snapshot, config, cwd)
|
||||||
update_snapshot_index_json(snapshot, config, cwd)
|
update_snapshot_index_json(snapshot, config, cwd)
|
||||||
update_snapshot_index_html(snapshot, config, cwd)
|
update_snapshot_index_html(snapshot, config, cwd)
|
||||||
|
@ -225,7 +335,7 @@ def transition_archiveresult_to_started(archiveresult, config, cwd):
|
||||||
fields_to_update = {'status': 'started', 'retry_at': retry_at, 'retries': retries, 'start_ts': time.now(), 'output': None, 'error': None}
|
fields_to_update = {'status': 'started', 'retry_at': retry_at, 'retries': retries, 'start_ts': time.now(), 'output': None, 'error': None}
|
||||||
archiveresult = abx.archivebox.writes.update_archiveresult(filter=archiveresult_to_update, update=fields_to_update)
|
archiveresult = abx.archivebox.writes.update_archiveresult(filter=archiveresult_to_update, update=fields_to_update)
|
||||||
|
|
||||||
|
# side effects:
|
||||||
with TimedProgress():
|
with TimedProgress():
|
||||||
try:
|
try:
|
||||||
from .extractors import WARC_EXTRACTOR
|
from .extractors import WARC_EXTRACTOR
|
||||||
|
@ -334,7 +444,7 @@ def on_crawl_created(crawl):
|
||||||
|
|
||||||
@abx.hookimpl
|
@abx.hookimpl
|
||||||
def on_snapshot_created(snapshot, config):
|
def on_snapshot_created(snapshot, config):
|
||||||
create_archiveresults_pending_from_snapshot(snapshot, config)
|
create_snapshot_pending_archiveresults(snapshot, config)
|
||||||
|
|
||||||
# events
|
# events
|
||||||
@abx.hookimpl
|
@abx.hookimpl
|
||||||
|
@ -361,7 +471,7 @@ def scheduler_runloop():
|
||||||
try:
|
try:
|
||||||
abx.archivebox.events.on_crawl_schedule_tick(scheduled_crawl)
|
abx.archivebox.events.on_crawl_schedule_tick(scheduled_crawl)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
abx.archivebox.events.on_crawl_schedule_failure(timezone.now(), machine=Machine.objects.get_current_machine(), error=e, schedule=scheduled_crawl)
|
abx.archivebox.events.on_crawl_schedule_tick_failure(timezone.now(), machine=Machine.objects.get_current_machine(), error=e, schedule=scheduled_crawl)
|
||||||
|
|
||||||
# abx.archivebox.events.on_scheduler_tick_end(timezone.now(), machine=Machine.objects.get_current_machine(), tasks=scheduled_tasks_due)
|
# abx.archivebox.events.on_scheduler_tick_end(timezone.now(), machine=Machine.objects.get_current_machine(), tasks=scheduled_tasks_due)
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
@ -420,7 +530,7 @@ def create_root_snapshot(crawl):
|
||||||
abx.archivebox.writes.update_crawl_stats(started_at=timezone.now())
|
abx.archivebox.writes.update_crawl_stats(started_at=timezone.now())
|
||||||
|
|
||||||
|
|
||||||
def create_archiveresults_pending_from_snapshot(snapshot, config):
|
def create_snapshot_pending_archiveresults(snapshot, config):
|
||||||
config = get_scope_config(
|
config = get_scope_config(
|
||||||
# defaults=settings.CONFIG_FROM_DEFAULTS,
|
# defaults=settings.CONFIG_FROM_DEFAULTS,
|
||||||
# configfile=settings.CONFIG_FROM_FILE,
|
# configfile=settings.CONFIG_FROM_FILE,
|
||||||
|
|
|
@ -262,7 +262,7 @@ def get_plugin(plugin: PluginId | ModuleType | Type) -> PluginInfo:
|
||||||
# import the plugin module by its name
|
# import the plugin module by its name
|
||||||
if isinstance(plugin, str):
|
if isinstance(plugin, str):
|
||||||
module = importlib.import_module(plugin)
|
module = importlib.import_module(plugin)
|
||||||
print('IMPORTED PLUGIN:', plugin)
|
# print('IMPORTED PLUGIN:', plugin)
|
||||||
plugin = getattr(module, 'PLUGIN_SPEC', getattr(module, 'PLUGIN', module))
|
plugin = getattr(module, 'PLUGIN_SPEC', getattr(module, 'PLUGIN', module))
|
||||||
elif inspect.ismodule(plugin):
|
elif inspect.ismodule(plugin):
|
||||||
module = plugin
|
module = plugin
|
||||||
|
|
Loading…
Reference in a new issue