diff --git a/archivebox/actors/actor.py b/archivebox/actors/actor.py index 1d59bb8f..62369793 100644 --- a/archivebox/actors/actor.py +++ b/archivebox/actors/actor.py @@ -44,16 +44,17 @@ class ActorType(ABC, Generic[ModelType]): launch_kwargs: LaunchKwargs = {} mode: Literal['thread', 'process'] = 'process' + MAX_CONCURRENT_ACTORS: ClassVar[int] = min(max(2, int(cpu_count() * 0.6)), 8) # min 2, max 8, up to 60% of available cpu cores + MAX_TICK_TIME: ClassVar[int] = 60 # maximum duration in seconds to process a single object + QUERYSET: ClassVar[QuerySet] # the QuerySet to claim objects from CLAIM_WHERE: ClassVar[str] = 'status = "queued"' # the WHERE clause to filter the objects when atomically getting the next object from the queue CLAIM_SET: ClassVar[str] = 'status = "started"' # the SET clause to claim the object when atomically getting the next object from the queue CLAIM_ORDER: ClassVar[str] = 'created_at DESC' # the ORDER BY clause to sort the objects with when atomically getting the next object from the queue - CLAIM_FROM_TOP: ClassVar[int] = 50 # the number of objects to consider when atomically getting the next object from the queue + CLAIM_FROM_TOP: ClassVar[int] = MAX_CONCURRENT_ACTORS * 10 # the number of objects to consider when atomically getting the next object from the queue ATOMIC: ClassVar[bool] = True # whether to atomically fetch+claim the nextobject in one step, or fetch and lock it in two steps # model_type: Type[ModelType] - MAX_CONCURRENT_ACTORS: ClassVar[int] = min(max(2, int(cpu_count() * 0.6)), 8) # min 2, max 8, up to 60% of available cpu cores - MAX_TICK_TIME: ClassVar[int] = 60 # maximum duration in seconds to process a single object _SPAWNED_ACTOR_PIDS: ClassVar[list[psutil.Process]] = [] # record all the pids of Actors spawned by this class @@ -89,18 +90,19 @@ class ActorType(ABC, Generic[ModelType]): @classmethod def get_actors_to_spawn(cls, queue: QuerySet, running_actors: list[int]) -> list[LaunchKwargs]: """Get a list of launch kwargs for the number of actors to spawn based on the queue and currently running actors""" + queue_length = queue.count() + if not queue_length: # queue is empty, spawn 0 actors + return [] + actors_to_spawn: list[LaunchKwargs] = [] max_spawnable = cls.MAX_CONCURRENT_ACTORS - len(running_actors) - queue_length = queue.count() # spawning new actors is expensive, avoid spawning all the actors at once. To stagger them, # let the next orchestrator tick handle starting another 2 on the next tick() # if queue_length > 10: # queue is long, spawn as many as possible # actors_to_spawn += max_spawnable * [{}] - if not queue_length: # queue is empty, spawn 0 actors - return actors_to_spawn - elif queue_length > 4: # queue is medium, spawn 1 or 2 actors + if queue_length > 4: # queue is medium, spawn 1 or 2 actors actors_to_spawn += min(2, max_spawnable) * [{**cls.launch_kwargs}] else: # queue is short, spawn 1 actor actors_to_spawn += min(1, max_spawnable) * [{**cls.launch_kwargs}] @@ -144,7 +146,6 @@ class ActorType(ABC, Generic[ModelType]): # return ArchiveResult.objects.filter(status='queued', extractor__in=('pdf', 'dom', 'screenshot')) return cls.QUERYSET - ### Instance Methods: Called by Actor after it has been spawned (i.e. 
forked as a thread or process) def runloop(self): diff --git a/archivebox/actors/actor_crawl.py b/archivebox/actors/actor_crawl.py deleted file mode 100644 index 11d85042..00000000 --- a/archivebox/actors/actor_crawl.py +++ /dev/null @@ -1,286 +0,0 @@ -__package__ = 'archivebox.actors' - -import os -import time -from typing import ClassVar, Generic, cast, Literal, Type -from django.utils.functional import classproperty - -from rich import print -import psutil - -from django import db -from django.db.models import QuerySet -from multiprocessing import Process, cpu_count -from threading import Thread, get_native_id - -from crawls.models import Crawl - -from .actor import ActorType, LaunchKwargs - -class CrawlActor(ActorType[Crawl]): - - QUERYSET: ClassVar[QuerySet] = Crawl.objects.filter(status='queued') - CLAIM_WHERE: ClassVar[str] = 'status = "queued"' # the WHERE clause to filter the objects when atomically getting the next object from the queue - CLAIM_SET: ClassVar[str] = 'status = "started"' # the SET clause to claim the object when atomically getting the next object from the queue - CLAIM_ORDER: ClassVar[str] = 'created_at DESC' # the ORDER BY clause to sort the objects with when atomically getting the next object from the queue - CLAIM_FROM_TOP: ClassVar[int] = 50 # the number of objects to consider when atomically getting the next object from the queue - - # model_type: Type[ModelType] - MAX_CONCURRENT_ACTORS: ClassVar[int] = min(max(2, int(cpu_count() * 0.6)), 8) # min 2, max 8, up to 60% of available cpu cores - MAX_TICK_TIME: ClassVar[int] = 60 # maximum duration in seconds to process a single object - - _SPAWNED_ACTOR_PIDS: ClassVar[list[psutil.Process]] = [] # record all the pids of Actors spawned by this class - - def __init__(self, mode: Literal['thread', 'process']|None=None, **launch_kwargs: LaunchKwargs): - self.mode = mode or self.mode - self.launch_kwargs = launch_kwargs or dict(self.launch_kwargs) - - @classproperty - def name(cls) -> str: - return cls.__name__ # type: ignore - - def __str__(self) -> str: - return self.__repr__() - - def __repr__(self) -> str: - """FaviconActor[pid=1234]""" - label = 'pid' if self.mode == 'process' else 'tid' - return f'[underline]{self.name}[/underline]\\[{label}={self.pid}]' - - ### Class Methods: Called by Orchestrator on ActorType class before it has been spawned - - @classmethod - def get_running_actors(cls) -> list[int]: - """returns a list of pids of all running actors of this type""" - # WARNING: only works for process actors, not thread actors - if cls.mode == 'thread': - raise NotImplementedError('get_running_actors() is not implemented for thread actors') - return [ - proc.pid for proc in cls._SPAWNED_ACTOR_PIDS - if proc.is_running() and proc.status() != 'zombie' - ] - - @classmethod - def get_actors_to_spawn(cls, queue: QuerySet, running_actors: list[int]) -> list[LaunchKwargs]: - """Get a list of launch kwargs for the number of actors to spawn based on the queue and currently running actors""" - actors_to_spawn: list[LaunchKwargs] = [] - max_spawnable = cls.MAX_CONCURRENT_ACTORS - len(running_actors) - queue_length = queue.count() - - # spawning new actors is expensive, avoid spawning all the actors at once. 
To stagger them, - # let the next orchestrator tick handle starting another 2 on the next tick() - # if queue_length > 10: # queue is long, spawn as many as possible - # actors_to_spawn += max_spawnable * [{}] - - if not queue_length: # queue is empty, spawn 0 actors - return actors_to_spawn - elif queue_length > 4: # queue is medium, spawn 1 or 2 actors - actors_to_spawn += min(2, max_spawnable) * [{**cls.launch_kwargs}] - else: # queue is short, spawn 1 actor - actors_to_spawn += min(1, max_spawnable) * [{**cls.launch_kwargs}] - return actors_to_spawn - - @classmethod - def start(cls, mode: Literal['thread', 'process']='process', **launch_kwargs: LaunchKwargs) -> int: - if mode == 'thread': - return cls.fork_actor_as_thread(**launch_kwargs) - elif mode == 'process': - return cls.fork_actor_as_process(**launch_kwargs) - raise ValueError(f'Invalid actor mode: {mode} must be "thread" or "process"') - - @classmethod - def fork_actor_as_thread(cls, **launch_kwargs: LaunchKwargs) -> int: - """Spawn a new background thread running the actor's runloop""" - actor = cls(mode='thread', **launch_kwargs) - bg_actor_thread = Thread(target=actor.runloop) - bg_actor_thread.start() - assert bg_actor_thread.native_id is not None - return bg_actor_thread.native_id - - @classmethod - def fork_actor_as_process(cls, **launch_kwargs: LaunchKwargs) -> int: - """Spawn a new background process running the actor's runloop""" - actor = cls(mode='process', **launch_kwargs) - bg_actor_process = Process(target=actor.runloop) - bg_actor_process.start() - assert bg_actor_process.pid is not None - cls._SPAWNED_ACTOR_PIDS.append(psutil.Process(pid=bg_actor_process.pid)) - return bg_actor_process.pid - - @classmethod - def get_model(cls) -> Type[ModelType]: - # wish this was a @classproperty but Generic[ModelType] return type cant be statically inferred for @classproperty - return cls.QUERYSET.model - - @classmethod - def get_queue(cls) -> QuerySet: - """override this to provide your queryset as the queue""" - # return ArchiveResult.objects.filter(status='queued', extractor__in=('pdf', 'dom', 'screenshot')) - return cls.QUERYSET - - - ### Instance Methods: Called by Actor after it has been spawned (i.e. 
forked as a thread or process) - - def runloop(self): - """The main runloop that starts running when the actor is spawned (as subprocess or thread) and exits when the queue is empty""" - self.on_startup() - try: - while True: - obj_to_process: ModelType | None = None - try: - obj_to_process = cast(ModelType, self.get_next(atomic=self.atomic)) - except Exception: - pass - - if obj_to_process: - self.idle_count = 0 # reset idle count if we got an object - else: - if self.idle_count >= 30: - break # stop looping and exit if queue is empty and we have idled for 30sec - else: - # print('Actor runloop()', f'pid={self.pid}', 'queue empty, rechecking...') - self.idle_count += 1 - time.sleep(1) - continue - - self.on_tick_start(obj_to_process) - - # Process the object - try: - self.tick(obj_to_process) - except Exception as err: - print(f'[red]🏃‍♂️ ERROR: {self}.tick()[/red]', err) - db.connections.close_all() # always reset the db connection after an exception to clear any pending transactions - self.on_tick_exception(obj_to_process, err) - finally: - self.on_tick_end(obj_to_process) - - self.on_shutdown(err=None) - except BaseException as err: - if isinstance(err, KeyboardInterrupt): - print() - else: - print(f'\n[red]🏃‍♂️ {self}.runloop() FATAL:[/red]', err.__class__.__name__, err) - self.on_shutdown(err=err) - - def get_next(self, atomic: bool | None=None) -> ModelType | None: - """get the next object from the queue, atomically locking it if self.atomic=True""" - if atomic is None: - atomic = self.ATOMIC - - if atomic: - # fetch and claim the next object from in the queue in one go atomically - obj = self.get_next_atomic() - else: - # two-step claim: fetch the next object and lock it in a separate query - obj = self.get_queue().last() - assert obj and self.lock_next(obj), f'Unable to fetch+lock the next {self.get_model().__name__} ojbect from {self}.QUEUE' - return obj - - def lock_next(self, obj: ModelType) -> bool: - """override this to implement a custom two-step (non-atomic)lock mechanism""" - # For example: - # assert obj._model.objects.filter(pk=obj.pk, status='queued').update(status='started', locked_by=self.pid) - # Not needed if using get_next_and_lock() to claim the object atomically - # print(f'[blue]🏃‍♂️ {self}.lock()[/blue]', obj.abid or obj.id) - return True - - def claim_sql_where(self) -> str: - """override this to implement a custom WHERE clause for the atomic claim step e.g. "status = 'queued' AND locked_by = NULL" """ - return self.CLAIM_WHERE - - def claim_sql_set(self) -> str: - """override this to implement a custom SET clause for the atomic claim step e.g. "status = 'started' AND locked_by = {self.pid}" """ - return self.CLAIM_SET - - def claim_sql_order(self) -> str: - """override this to implement a custom ORDER BY clause for the atomic claim step e.g. "created_at DESC" """ - return self.CLAIM_ORDER - - def claim_from_top(self) -> int: - """override this to implement a custom number of objects to consider when atomically claiming the next object from the top of the queue""" - return self.CLAIM_FROM_TOP - - def get_next_atomic(self, shallow: bool=True) -> ModelType | None: - """ - claim a random object from the top n=50 objects in the queue (atomically updates status=queued->started for claimed object) - optimized for minimizing contention on the queue with other actors selecting from the same list - slightly faster than claim_any_obj() which selects randomly from the entire queue but needs to know the total count - """ - Model = self.get_model() # e.g. 
ArchiveResult - table = f'{Model._meta.app_label}_{Model._meta.model_name}' # e.g. core_archiveresult - - where_sql = self.claim_sql_where() - set_sql = self.claim_sql_set() - order_by_sql = self.claim_sql_order() - choose_from_top = self.claim_from_top() - - with db.connection.cursor() as cursor: - # subquery gets the pool of the top 50 candidates sorted by sort and order - # main query selects a random one from that pool - cursor.execute(f""" - UPDATE {table} - SET {set_sql} - WHERE {where_sql} and id = ( - SELECT id FROM ( - SELECT id FROM {table} - WHERE {where_sql} - ORDER BY {order_by_sql} - LIMIT {choose_from_top} - ) candidates - ORDER BY RANDOM() - LIMIT 1 - ) - RETURNING id; - """) - result = cursor.fetchone() - - if result is None: - return None # If no rows were claimed, return None - - if shallow: - # shallow: faster, returns potentially incomplete object instance missing some django auto-populated fields: - columns = [col[0] for col in cursor.description or ['id']] - return Model(**dict(zip(columns, result))) - - # if not shallow do one extra query to get a more complete object instance (load it fully from scratch) - return Model.objects.get(id=result[0]) - - @abstractmethod - def tick(self, obj: ModelType) -> None: - """override this to process the object""" - print(f'[blue]🏃‍♂️ {self}.tick()[/blue]', obj.abid or obj.id) - # For example: - # do_some_task(obj) - # do_something_else(obj) - # obj._model.objects.filter(pk=obj.pk, status='started').update(status='success') - raise NotImplementedError('tick() must be implemented by the Actor subclass') - - def on_startup(self) -> None: - if self.mode == 'thread': - self.pid = get_native_id() # thread id - print(f'[green]🏃‍♂️ {self}.on_startup() STARTUP (THREAD)[/green]') - else: - self.pid = os.getpid() # process id - print(f'[green]🏃‍♂️ {self}.on_startup() STARTUP (PROCESS)[/green]') - # abx.pm.hook.on_actor_startup(self) - - def on_shutdown(self, err: BaseException | None=None) -> None: - print(f'[grey53]🏃‍♂️ {self}.on_shutdown() SHUTTING DOWN[/grey53]', err or '[green](gracefully)[/green]') - # abx.pm.hook.on_actor_shutdown(self) - - def on_tick_start(self, obj: ModelType) -> None: - # print(f'🏃‍♂️ {self}.on_tick_start()', obj.abid or obj.id) - # abx.pm.hook.on_actor_tick_start(self, obj_to_process) - # self.timer = TimedProgress(self.MAX_TICK_TIME, prefix=' ') - pass - - def on_tick_end(self, obj: ModelType) -> None: - # print(f'🏃‍♂️ {self}.on_tick_end()', obj.abid or obj.id) - # abx.pm.hook.on_actor_tick_end(self, obj_to_process) - # self.timer.end() - pass - - def on_tick_exception(self, obj: ModelType, err: BaseException) -> None: - print(f'[red]🏃‍♂️ {self}.on_tick_exception()[/red]', obj.abid or obj.id, err) - # abx.pm.hook.on_actor_tick_exception(self, obj_to_process, err) diff --git a/archivebox/actors/orchestrator.py b/archivebox/actors/orchestrator.py index c7fed888..df4c860b 100644 --- a/archivebox/actors/orchestrator.py +++ b/archivebox/actors/orchestrator.py @@ -3,8 +3,7 @@ __package__ = 'archivebox.actors' import os import time import itertools -import uuid -from typing import Dict, Type, Literal +from typing import Dict, Type, Literal, ClassVar from django.utils.functional import classproperty from multiprocessing import Process, cpu_count @@ -173,54 +172,36 @@ from django import db from django.db import connection +from crawls.actors import CrawlActor +from .actor_snapshot import SnapshotActor + +from abx_plugin_singlefile.actors import SinglefileActor class FaviconActor(ActorType[ArchiveResult]): - @classmethod 
- def get_queue(cls) -> QuerySet[ArchiveResult]: + CLAIM_ORDER: ClassVar[str] = 'created_at DESC' + CLAIM_WHERE: ClassVar[str] = 'status = "queued" AND extractor = "favicon"' + CLAIM_SET: ClassVar[str] = 'status = "started"' + + @classproperty + def QUERYSET(cls) -> QuerySet: return ArchiveResult.objects.filter(status='failed', extractor='favicon') - - @classmethod - def get_next(cls) -> ArchiveResult | None: - # return cls.get_next_atomic( - # model=ArchiveResult, - # where='status = "failed"', - # set='status = "started"', - # order_by='created_at DESC', - # choose_from_top=cpu_count() * 10, - # ) - return cls.get_random( - model=ArchiveResult, - where='status = "failed" AND extractor = "favicon"', - set='status = "queued"', - choose_from_top=50, - ) - + def tick(self, obj: ArchiveResult): print(f'[grey53]{self}.tick({obj.abid or obj.id}, status={obj.status}) remaining:[/grey53]', self.get_queue().count()) updated = ArchiveResult.objects.filter(id=obj.id, status='started').update(status='success') == 1 if not updated: raise Exception(f'Failed to update {obj.abid or obj.id}, interrupted by another actor writing to the same object') - # obj.refresh_from_db() - obj.status = 'success' - - def lock(self, obj: ArchiveResult) -> bool: - """As an alternative to self.get_next_atomic(), we can use select_for_update() or manually update a semaphore field here""" - - locked = ArchiveResult.objects.filter(id=obj.id, status='queued').update(status='started') == 1 - if locked: - # obj.refresh_from_db() - obj.status = 'started' - # print(f'FaviconActor[{self.pid}] lock({obj.id}) 🔒') - pass - else: - print(f'FaviconActor[{self.pid}] lock({obj.id}) X') - return locked + obj.refresh_from_db() + obj.save() class ExtractorsOrchestrator(Orchestrator): actor_types = { + 'CrawlActor': CrawlActor, + 'SnapshotActor': SnapshotActor, 'FaviconActor': FaviconActor, + 'SinglefileActor': SinglefileActor, } diff --git a/archivebox/actors/statemachine.py b/archivebox/actors/statemachine.py new file mode 100644 index 00000000..53883120 --- /dev/null +++ b/archivebox/actors/statemachine.py @@ -0,0 +1,286 @@ +from statemachine import State, StateMachine +from django.db import models +from multiprocessing import Process +import psutil +import time + +# State Machine Definitions +################################################# + +class SnapshotMachine(StateMachine): + """State machine for managing Snapshot lifecycle.""" + + # States + queued = State(initial=True) + started = State() + sealed = State(final=True) + + # Transitions + start = queued.to(started, cond='can_start') + seal = started.to(sealed, cond='is_finished') + + # Events + tick = ( + queued.to.itself(unless='can_start') | + queued.to(started, cond='can_start') | + started.to.itself(unless='is_finished') | + started.to(sealed, cond='is_finished') + ) + + def __init__(self, snapshot): + self.snapshot = snapshot + super().__init__() + + def can_start(self): + return True + + def is_finished(self): + return not self.snapshot.has_pending_archiveresults() + + def before_start(self): + """Pre-start validation and setup.""" + self.snapshot.cleanup_dir() + + def after_start(self): + """Post-start side effects.""" + self.snapshot.create_pending_archiveresults() + self.snapshot.update_indices() + self.snapshot.bump_retry_at(seconds=10) + + def before_seal(self): + """Pre-seal validation and cleanup.""" + self.snapshot.cleanup_dir() + + def after_seal(self): + """Post-seal actions.""" + self.snapshot.update_indices() + self.snapshot.seal_dir() + self.snapshot.upload_dir() 
+ self.snapshot.retry_at = None + self.snapshot.save() + + +class ArchiveResultMachine(StateMachine): + """State machine for managing ArchiveResult lifecycle.""" + + # States + queued = State(initial=True) + started = State() + succeeded = State(final=True) + backoff = State() + failed = State(final=True) + + # Transitions + start = queued.to(started, cond='can_start') + succeed = started.to(succeeded, cond='extractor_succeeded') + backoff = started.to(backoff, unless='extractor_succeeded') + retry = backoff.to(queued, cond='can_retry') + fail = backoff.to(failed, unless='can_retry') + + # Events + tick = ( + queued.to.itself(unless='can_start') | + queued.to(started, cond='can_start') | + started.to.itself(cond='extractor_still_running') | + started.to(succeeded, cond='extractor_succeeded') | + started.to(backoff, unless='extractor_succeeded') | + backoff.to.itself(cond='still_waiting_to_retry') | + backoff.to(queued, cond='can_retry') | + backoff.to(failed, unless='can_retry') + ) + + def __init__(self, archiveresult): + self.archiveresult = archiveresult + super().__init__() + + def can_start(self): + return True + + def extractor_still_running(self): + return self.archiveresult.start_ts > time.now() - timedelta(seconds=5) + + def extractor_succeeded(self): + # return check_if_extractor_succeeded(self.archiveresult) + return self.archiveresult.start_ts < time.now() - timedelta(seconds=5) + + def can_retry(self): + return self.archiveresult.retries < self.archiveresult.max_retries + + def before_start(self): + """Pre-start initialization.""" + self.archiveresult.retries += 1 + self.archiveresult.start_ts = time.now() + self.archiveresult.output = None + self.archiveresult.error = None + + def after_start(self): + """Post-start execution.""" + self.archiveresult.bump_retry_at(seconds=self.archiveresult.timeout + 5) + execute_extractor(self.archiveresult) + self.archiveresult.snapshot.bump_retry_at(seconds=5) + + def before_succeed(self): + """Pre-success validation.""" + self.archiveresult.output = get_archiveresult_output(self.archiveresult) + + def after_succeed(self): + """Post-success cleanup.""" + self.archiveresult.end_ts = time.now() + self.archiveresult.retry_at = None + self.archiveresult.update_indices() + + def before_backoff(self): + """Pre-backoff error capture.""" + self.archiveresult.error = get_archiveresult_error(self.archiveresult) + + def after_backoff(self): + """Post-backoff retry scheduling.""" + self.archiveresult.end_ts = time.now() + self.archiveresult.bump_retry_at( + seconds=self.archiveresult.timeout * self.archiveresult.retries + ) + self.archiveresult.update_indices() + + def before_fail(self): + """Pre-failure finalization.""" + self.archiveresult.retry_at = None + + def after_fail(self): + """Post-failure cleanup.""" + self.archiveresult.update_indices() + +# Models +################################################# + +class Snapshot(models.Model): + status = models.CharField(max_length=32, default='queued') + retry_at = models.DateTimeField(null=True) + + @property + def sm(self): + """Get the state machine for this snapshot.""" + return SnapshotMachine(self) + + def has_pending_archiveresults(self): + return self.archiveresult_set.exclude( + status__in=['succeeded', 'failed'] + ).exists() + + def bump_retry_at(self, seconds): + self.retry_at = time.now() + timedelta(seconds=seconds) + self.save() + + def cleanup_dir(self): + cleanup_snapshot_dir(self) + + def create_pending_archiveresults(self): + create_snapshot_pending_archiveresults(self) + + def 
update_indices(self): + update_snapshot_index_json(self) + update_snapshot_index_html(self) + + def seal_dir(self): + seal_snapshot_dir(self) + + def upload_dir(self): + upload_snapshot_dir(self) + + +class ArchiveResult(models.Model): + snapshot = models.ForeignKey(Snapshot, on_delete=models.CASCADE) + status = models.CharField(max_length=32, default='queued') + retry_at = models.DateTimeField(null=True) + retries = models.IntegerField(default=0) + max_retries = models.IntegerField(default=3) + timeout = models.IntegerField(default=60) + start_ts = models.DateTimeField(null=True) + end_ts = models.DateTimeField(null=True) + output = models.TextField(null=True) + error = models.TextField(null=True) + + def get_machine(self): + return ArchiveResultMachine(self) + + def bump_retry_at(self, seconds): + self.retry_at = time.now() + timedelta(seconds=seconds) + self.save() + + def update_indices(self): + update_archiveresult_index_json(self) + update_archiveresult_index_html(self) + + +# Actor System +################################################# + +class BaseActor: + MAX_TICK_TIME = 60 + + def tick(self, obj): + """Process a single object through its state machine.""" + machine = obj.get_machine() + + if machine.is_queued: + if machine.can_start(): + machine.start() + + elif machine.is_started: + if machine.can_seal(): + machine.seal() + + elif machine.is_backoff: + if machine.can_retry(): + machine.retry() + else: + machine.fail() + + +class Orchestrator: + """Main orchestrator that manages all actors.""" + + def __init__(self): + self.pid = None + + @classmethod + def spawn(cls): + orchestrator = cls() + proc = Process(target=orchestrator.runloop) + proc.start() + return proc.pid + + def runloop(self): + self.pid = os.getpid() + abx.pm.hook.on_orchestrator_startup(self) + + try: + while True: + self.process_queue(Snapshot) + self.process_queue(ArchiveResult) + time.sleep(0.1) + + except (KeyboardInterrupt, SystemExit): + abx.pm.hook.on_orchestrator_shutdown(self) + + def process_queue(self, model): + retry_at_reached = Q(retry_at__isnull=True) | Q(retry_at__lte=time.now()) + queue = model.objects.filter(retry_at_reached) + + if queue.exists(): + actor = BaseActor() + for obj in queue: + try: + with transaction.atomic(): + actor.tick(obj) + except Exception as e: + abx.pm.hook.on_actor_tick_exception(actor, obj, e) + + +# Periodic Tasks +################################################# + +@djhuey.periodic_task(schedule=djhuey.crontab(minute='*')) +def ensure_orchestrator_running(): + """Ensure orchestrator is running, start if not.""" + if not any(p.name().startswith('Orchestrator') for p in psutil.process_iter()): + Orchestrator.spawn() diff --git a/archivebox/core/actors.py b/archivebox/core/actors.py new file mode 100644 index 00000000..30b8245f --- /dev/null +++ b/archivebox/core/actors.py @@ -0,0 +1,73 @@ +__package__ = 'archivebox.core' + +from typing import ClassVar + +from rich import print + +from django.db.models import QuerySet +from django.utils import timezone +from datetime import timedelta +from core.models import Snapshot + +from actors.actor import ActorType + + +class SnapshotActor(ActorType[Snapshot]): + + QUERYSET: ClassVar[QuerySet] = Snapshot.objects.filter(status='queued') + CLAIM_WHERE: ClassVar[str] = 'status = "queued"' # the WHERE clause to filter the objects when atomically getting the next object from the queue + CLAIM_SET: ClassVar[str] = 'status = "started"' # the SET clause to claim the object when atomically getting the next object from the queue + 
CLAIM_ORDER: ClassVar[str] = 'created_at DESC' # the ORDER BY clause to sort the objects with when atomically getting the next object from the queue + CLAIM_FROM_TOP: ClassVar[int] = 50 # the number of objects to consider when atomically getting the next object from the queue + + # model_type: Type[ModelType] + MAX_CONCURRENT_ACTORS: ClassVar[int] = 4 # min 2, max 8, up to 60% of available cpu cores + MAX_TICK_TIME: ClassVar[int] = 60 # maximum duration in seconds to process a single object + + def claim_sql_where(self) -> str: + """override this to implement a custom WHERE clause for the atomic claim step e.g. "status = 'queued' AND locked_by = NULL" """ + return self.CLAIM_WHERE + + def claim_sql_set(self) -> str: + """override this to implement a custom SET clause for the atomic claim step e.g. "status = 'started' AND locked_by = {self.pid}" """ + retry_at = timezone.now() + timedelta(seconds=self.MAX_TICK_TIME) + # format as 2024-10-31 10:14:33.240903 + retry_at_str = retry_at.strftime('%Y-%m-%d %H:%M:%S.%f') + return f'{self.CLAIM_SET}, retry_at = {retry_at_str}' + + def claim_sql_order(self) -> str: + """override this to implement a custom ORDER BY clause for the atomic claim step e.g. "created_at DESC" """ + return self.CLAIM_ORDER + + def claim_from_top(self) -> int: + """override this to implement a custom number of objects to consider when atomically claiming the next object from the top of the queue""" + return self.CLAIM_FROM_TOP + + def tick(self, obj: Snapshot) -> None: + """override this to process the object""" + print(f'[blue]🏃‍♂️ {self}.tick()[/blue]', obj.abid or obj.id) + # For example: + # do_some_task(obj) + # do_something_else(obj) + # obj._model.objects.filter(pk=obj.pk, status='started').update(status='success') + # raise NotImplementedError('tick() must be implemented by the Actor subclass') + + def on_shutdown(self, err: BaseException | None=None) -> None: + print(f'[grey53]🏃‍♂️ {self}.on_shutdown() SHUTTING DOWN[/grey53]', err or '[green](gracefully)[/green]') + # abx.pm.hook.on_actor_shutdown(self) + + def on_tick_start(self, obj: Snapshot) -> None: + # print(f'🏃‍♂️ {self}.on_tick_start()', obj.abid or obj.id) + # abx.pm.hook.on_actor_tick_start(self, obj_to_process) + # self.timer = TimedProgress(self.MAX_TICK_TIME, prefix=' ') + pass + + def on_tick_end(self, obj: Snapshot) -> None: + # print(f'🏃‍♂️ {self}.on_tick_end()', obj.abid or obj.id) + # abx.pm.hook.on_actor_tick_end(self, obj_to_process) + # self.timer.end() + pass + + def on_tick_exception(self, obj: Snapshot, err: BaseException) -> None: + print(f'[red]🏃‍♂️ {self}.on_tick_exception()[/red]', obj.abid or obj.id, err) + # abx.pm.hook.on_actor_tick_exception(self, obj_to_process, err) diff --git a/archivebox/core/models.py b/archivebox/core/models.py index 79776b7f..a3962a6a 100644 --- a/archivebox/core/models.py +++ b/archivebox/core/models.py @@ -8,21 +8,25 @@ import os import json from pathlib import Path +from datetime import timedelta from django.db import models from django.utils.functional import cached_property from django.utils.text import slugify +from django.utils import timezone from django.core.cache import cache from django.urls import reverse, reverse_lazy from django.db.models import Case, When, Value, IntegerField from django.contrib import admin from django.conf import settings +from statemachine.mixins import MachineMixin + from archivebox.config import CONSTANTS from abid_utils.models import ABIDModel, ABIDField, AutoDateTimeField from queues.tasks import bg_archive_snapshot -# 
from crawls.models import Crawl +from crawls.models import Crawl # from machine.models import Machine, NetworkInterface from archivebox.misc.system import get_dir_size @@ -152,7 +156,7 @@ class SnapshotManager(models.Manager): return super().get_queryset().prefetch_related('tags', 'archiveresult_set') # .annotate(archiveresult_count=models.Count('archiveresult')).distinct() -class Snapshot(ABIDModel): +class Snapshot(ABIDModel, MachineMixin): abid_prefix = 'snp_' abid_ts_src = 'self.created_at' abid_uri_src = 'self.url' @@ -160,6 +164,17 @@ class Snapshot(ABIDModel): abid_rand_src = 'self.id' abid_drift_allowed = True + state_field_name = 'status' + state_machine_name = 'core.statemachines.SnapshotMachine' + state_machine_attr = 'sm' + + class SnapshotStatus(models.TextChoices): + QUEUED = 'queued', 'Queued' + STARTED = 'started', 'Started' + SEALED = 'sealed', 'Sealed' + + status = models.CharField(max_length=15, default=SnapshotStatus.QUEUED, null=False, blank=False) + id = models.UUIDField(primary_key=True, default=None, null=False, editable=False, unique=True, verbose_name='ID') abid = ABIDField(prefix=abid_prefix) @@ -171,7 +186,7 @@ class Snapshot(ABIDModel): bookmarked_at = AutoDateTimeField(default=None, null=False, editable=True, db_index=True) downloaded_at = models.DateTimeField(default=None, null=True, editable=False, db_index=True, blank=True) - # crawl = models.ForeignKey(Crawl, on_delete=models.CASCADE, default=None, null=True, blank=True, related_name='snapshot_set') + crawl = models.ForeignKey(Crawl, on_delete=models.CASCADE, default=None, null=True, blank=True, related_name='snapshot_set') url = models.URLField(unique=True, db_index=True) timestamp = models.CharField(max_length=32, unique=True, db_index=True, editable=False) @@ -396,6 +411,25 @@ class Snapshot(ABIDModel): tags_id.append(Tag.objects.get_or_create(name=tag)[0].pk) self.tags.clear() self.tags.add(*tags_id) + + def has_pending_archiveresults(self) -> bool: + pending_statuses = [ArchiveResult.ArchiveResultStatus.QUEUED, ArchiveResult.ArchiveResultStatus.STARTED] + pending_archiveresults = self.archiveresult_set.filter(status__in=pending_statuses) + return pending_archiveresults.exists() + + def create_pending_archiveresults(self) -> list['ArchiveResult']: + archiveresults = [] + for extractor in EXTRACTORS: + archiveresult, _created = ArchiveResult.objects.get_or_create( + snapshot=self, + extractor=extractor, + status=ArchiveResult.ArchiveResultStatus.QUEUED, + ) + archiveresults.append(archiveresult) + return archiveresults + + def bump_retry_at(self, seconds: int = 10): + self.retry_at = timezone.now() + timedelta(seconds=seconds) # def get_storage_dir(self, create=True, symlink=True) -> Path: @@ -452,6 +486,20 @@ class ArchiveResult(ABIDModel): abid_subtype_src = 'self.extractor' abid_rand_src = 'self.id' abid_drift_allowed = True + + state_field_name = 'status' + state_machine_name = 'core.statemachines.ArchiveResultMachine' + state_machine_attr = 'sm' + + class ArchiveResultStatus(models.TextChoices): + QUEUED = 'queued', 'Queued' + STARTED = 'started', 'Started' + SUCCEEDED = 'succeeded', 'Succeeded' + FAILED = 'failed', 'Failed' + SKIPPED = 'skipped', 'Skipped' + BACKOFF = 'backoff', 'Waiting to retry' + + status = models.CharField(max_length=15, choices=ArchiveResultStatus.choices, default=ArchiveResultStatus.QUEUED, null=False, blank=False) EXTRACTOR_CHOICES = ( ('htmltotext', 'htmltotext'), @@ -469,11 +517,7 @@ class ArchiveResult(ABIDModel): ('title', 'title'), ('wget', 'wget'), ) - STATUS_CHOICES 
= [ - ("succeeded", "succeeded"), - ("failed", "failed"), - ("skipped", "skipped") - ] + id = models.UUIDField(primary_key=True, default=None, null=False, editable=False, unique=True, verbose_name='ID') abid = ABIDField(prefix=abid_prefix) @@ -491,7 +535,6 @@ class ArchiveResult(ABIDModel): output = models.CharField(max_length=1024) start_ts = models.DateTimeField(db_index=True) end_ts = models.DateTimeField() - status = models.CharField(max_length=16, choices=STATUS_CHOICES) # the network interface that was used to download this result # uplink = models.ForeignKey(NetworkInterface, on_delete=models.SET_NULL, null=True, blank=True, verbose_name='Network Interface Used') @@ -552,7 +595,15 @@ class ArchiveResult(ABIDModel): return link.canonical_outputs().get(f'{self.extractor}_path') def output_exists(self) -> bool: - return os.access(self.output_path(), os.R_OK) + return os.path.exists(self.output_path()) + + def bump_retry_at(self, seconds: int = 10): + self.retry_at = timezone.now() + timedelta(seconds=seconds) + + def create_output_dir(self): + snap_dir = self.snapshot_dir + snap_dir.mkdir(parents=True, exist_ok=True) + return snap_dir / self.output_path() # def get_storage_dir(self, create=True, symlink=True): diff --git a/archivebox/core/settings.py b/archivebox/core/settings.py index cdcf867f..e7d673ac 100644 --- a/archivebox/core/settings.py +++ b/archivebox/core/settings.py @@ -64,7 +64,8 @@ INSTALLED_APPS = [ # 'abid_utils', # handles ABID ID creation, handling, and models 'config', # ArchiveBox config settings (loaded as a plugin, don't need to add it here) 'machine', # handles collecting and storing information about the host machine, network interfaces, installed binaries, etc. - 'queues', # handles starting and managing background workers and processes + 'actors', # handles starting and managing background workers and processes (orchestrators and actors) + 'queues', # handles starting and managing background workers and processes (supervisord) 'seeds', # handles Seed model and URL source management 'crawls', # handles Crawl and CrawlSchedule models and management 'personas', # handles Persona and session management diff --git a/archivebox/core/statemachines.py b/archivebox/core/statemachines.py new file mode 100644 index 00000000..a2425d43 --- /dev/null +++ b/archivebox/core/statemachines.py @@ -0,0 +1,115 @@ +__package__ = 'archivebox.snapshots' + +from django.utils import timezone + +from statemachine import State, StateMachine + +from core.models import Snapshot, ArchiveResult + +# State Machine Definitions +################################################# + + +class SnapshotMachine(StateMachine, strict_states=True): + """State machine for managing Snapshot lifecycle.""" + + model: Snapshot + + # States + queued = State(value=Snapshot.SnapshotStatus.QUEUED, initial=True) + started = State(value=Snapshot.SnapshotStatus.STARTED) + sealed = State(value=Snapshot.SnapshotStatus.SEALED, final=True) + + # Tick Event + tick = ( + queued.to.itself(unless='can_start', internal=True) | + queued.to(started, cond='can_start') | + started.to.itself(unless='is_finished', internal=True) | + started.to(sealed, cond='is_finished') + ) + + def __init__(self, snapshot, *args, **kwargs): + self.snapshot = snapshot + super().__init__(snapshot, *args, **kwargs) + + def can_start(self) -> bool: + return self.snapshot.seed and self.snapshot.seed.uri + + def is_finished(self) -> bool: + return not self.snapshot.has_pending_archiveresults() + + def on_started(self): + 
self.snapshot.create_pending_archiveresults() + self.snapshot.bump_retry_at(seconds=60) + self.snapshot.save() + + def on_sealed(self): + self.snapshot.retry_at = None + self.snapshot.save() + +class ArchiveResultMachine(StateMachine, strict_states=True): + """State machine for managing ArchiveResult lifecycle.""" + + model: ArchiveResult + + # States + queued = State(value=ArchiveResult.ArchiveResultStatus.QUEUED, initial=True) + started = State(value=ArchiveResult.ArchiveResultStatus.STARTED) + backoff = State(value=ArchiveResult.ArchiveResultStatus.BACKOFF) + succeeded = State(value=ArchiveResult.ArchiveResultStatus.SUCCEEDED, final=True) + failed = State(value=ArchiveResult.ArchiveResultStatus.FAILED, final=True) + + # Tick Event + tick = ( + queued.to.itself(unless='can_start', internal=True) | + queued.to(started, cond='can_start') | + started.to.itself(unless='is_finished', internal=True) | + started.to(succeeded, cond='is_succeeded') | + started.to(failed, cond='is_failed') | + started.to(backoff, cond='is_backoff') | + backoff.to.itself(unless='can_start', internal=True) | + backoff.to(started, cond='can_start') | + backoff.to(succeeded, cond='is_succeeded') | + backoff.to(failed, cond='is_failed') + ) + + def __init__(self, archiveresult, *args, **kwargs): + self.archiveresult = archiveresult + super().__init__(archiveresult, *args, **kwargs) + + def can_start(self) -> bool: + return self.archiveresult.snapshot and self.archiveresult.snapshot.is_started() + + def is_succeeded(self) -> bool: + return self.archiveresult.output_exists() + + def is_failed(self) -> bool: + return not self.archiveresult.output_exists() + + def is_backoff(self) -> bool: + return self.archiveresult.status == ArchiveResult.ArchiveResultStatus.BACKOFF + + def on_started(self): + self.archiveresult.start_ts = timezone.now() + self.archiveresult.create_output_dir() + self.archiveresult.bump_retry_at(seconds=60) + self.archiveresult.save() + + def on_backoff(self): + self.archiveresult.bump_retry_at(seconds=60) + self.archiveresult.save() + + def on_succeeded(self): + self.archiveresult.end_ts = timezone.now() + self.archiveresult.save() + + def on_failed(self): + self.archiveresult.end_ts = timezone.now() + self.archiveresult.save() + + def after_transition(self, event: str, source: State, target: State): + print(f"after '{event}' from '{source.id}' to '{target.id}'") + # self.archiveresult.save_merkle_index() + # self.archiveresult.save_html_index() + # self.archiveresult.save_json_index() + return "after_transition" diff --git a/archivebox/crawls/actors.py b/archivebox/crawls/actors.py new file mode 100644 index 00000000..f159956e --- /dev/null +++ b/archivebox/crawls/actors.py @@ -0,0 +1,69 @@ +__package__ = 'archivebox.crawls' + +from typing import ClassVar + +from rich import print + +from django.db.models import QuerySet + +from crawls.models import Crawl + +from actors.actor import ActorType + + +class CrawlActor(ActorType[Crawl]): + + QUERYSET: ClassVar[QuerySet] = Crawl.objects.filter(status='queued') + CLAIM_WHERE: ClassVar[str] = 'status = "queued"' # the WHERE clause to filter the objects when atomically getting the next object from the queue + CLAIM_SET: ClassVar[str] = 'status = "started"' # the SET clause to claim the object when atomically getting the next object from the queue + CLAIM_ORDER: ClassVar[str] = 'created_at DESC' # the ORDER BY clause to sort the objects with when atomically getting the next object from the queue + CLAIM_FROM_TOP: ClassVar[int] = 50 # the number of objects to 
consider when atomically getting the next object from the queue + + # model_type: Type[ModelType] + MAX_CONCURRENT_ACTORS: ClassVar[int] = 4 # min 2, max 8, up to 60% of available cpu cores + MAX_TICK_TIME: ClassVar[int] = 60 # maximum duration in seconds to process a single object + + def claim_sql_where(self) -> str: + """override this to implement a custom WHERE clause for the atomic claim step e.g. "status = 'queued' AND locked_by = NULL" """ + return self.CLAIM_WHERE + + def claim_sql_set(self) -> str: + """override this to implement a custom SET clause for the atomic claim step e.g. "status = 'started' AND locked_by = {self.pid}" """ + return self.CLAIM_SET + + def claim_sql_order(self) -> str: + """override this to implement a custom ORDER BY clause for the atomic claim step e.g. "created_at DESC" """ + return self.CLAIM_ORDER + + def claim_from_top(self) -> int: + """override this to implement a custom number of objects to consider when atomically claiming the next object from the top of the queue""" + return self.CLAIM_FROM_TOP + + def tick(self, obj: Crawl) -> None: + """override this to process the object""" + print(f'[blue]🏃‍♂️ {self}.tick()[/blue]', obj.abid or obj.id) + # For example: + # do_some_task(obj) + # do_something_else(obj) + # obj._model.objects.filter(pk=obj.pk, status='started').update(status='success') + # raise NotImplementedError('tick() must be implemented by the Actor subclass') + + def on_shutdown(self, err: BaseException | None=None) -> None: + print(f'[grey53]🏃‍♂️ {self}.on_shutdown() SHUTTING DOWN[/grey53]', err or '[green](gracefully)[/green]') + # abx.pm.hook.on_actor_shutdown(self) + + def on_tick_start(self, obj: Crawl) -> None: + # print(f'🏃‍♂️ {self}.on_tick_start()', obj.abid or obj.id) + # abx.pm.hook.on_actor_tick_start(self, obj_to_process) + # self.timer = TimedProgress(self.MAX_TICK_TIME, prefix=' ') + pass + + def on_tick_end(self, obj: Crawl) -> None: + # print(f'🏃‍♂️ {self}.on_tick_end()', obj.abid or obj.id) + # abx.pm.hook.on_actor_tick_end(self, obj_to_process) + # self.timer.end() + pass + + def on_tick_exception(self, obj: Crawl, err: BaseException) -> None: + print(f'[red]🏃‍♂️ {self}.on_tick_exception()[/red]', obj.abid or obj.id, err) + # abx.pm.hook.on_actor_tick_exception(self, obj_to_process, err) diff --git a/archivebox/crawls/models.py b/archivebox/crawls/models.py index a806d889..ff9e0d0a 100644 --- a/archivebox/crawls/models.py +++ b/archivebox/crawls/models.py @@ -1,13 +1,20 @@ __package__ = 'archivebox.crawls' +from typing import TYPE_CHECKING from django_stubs_ext.db.models import TypedModelMeta +from datetime import timedelta + from django.db import models -from django.db.models import Q from django.core.validators import MaxValueValidator, MinValueValidator from django.conf import settings -from django.utils import timezone from django.urls import reverse_lazy +from django.utils import timezone + +from statemachine.mixins import MachineMixin + +if TYPE_CHECKING: + from core.models import Snapshot from seeds.models import Seed @@ -41,8 +48,9 @@ class CrawlSchedule(ABIDModel, ModelWithHealthStats): """The base crawl that each new scheduled job should copy as a template""" return self.crawl_set.first() + -class Crawl(ABIDModel, ModelWithHealthStats): +class Crawl(ABIDModel, ModelWithHealthStats, MachineMixin): """ A single session of URLs to archive starting from a given Seed and expanding outwards. An "archiving session" so to speak. 
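
Note (editor's sketch, not part of the diff): the `tick` events defined in core/statemachines.py above are meant to drive every object through its lifecycle with one repeated call, so an actor never has to inspect the current state itself. The minimal python-statemachine example below mirrors that pattern using a plain `FakeSnapshot` stand-in object (a hypothetical name) instead of a Django model; the real machines additionally bind status back to the model via MachineMixin, which is omitted here.

```python
# Minimal sketch of the tick-driven lifecycle used by SnapshotMachine/CrawlMachine above,
# with a plain object standing in for the Django model. Assumes python-statemachine >= 2.x.
from statemachine import State, StateMachine


class FakeSnapshot:
    """Stand-in for core.models.Snapshot: only tracks how many results are still pending."""
    def __init__(self, pending_results: int):
        self.pending_results = pending_results


class DemoSnapshotMachine(StateMachine):
    queued = State(initial=True)
    started = State()
    sealed = State(final=True)

    # A single 'tick' event: which transition fires depends on the cond/unless guards,
    # so a caller can simply fire tick() on every pass without knowing the current state.
    tick = (
        queued.to.itself(unless='can_start', internal=True) |
        queued.to(started, cond='can_start') |
        started.to.itself(unless='is_finished', internal=True) |
        started.to(sealed, cond='is_finished')
    )

    def __init__(self, snapshot: FakeSnapshot):
        self.snapshot = snapshot
        super().__init__()

    def can_start(self) -> bool:
        return True

    def is_finished(self) -> bool:
        return self.snapshot.pending_results == 0


snapshot = FakeSnapshot(pending_results=2)
machine = DemoSnapshotMachine(snapshot)

machine.tick()                     # queued -> started
snapshot.pending_results = 0       # pretend every ArchiveResult finished
machine.tick()                     # started -> sealed
print(machine.current_state.id)    # -> 'sealed'
```

Because unmet guards only trigger the internal self-transitions, repeated `tick()` calls are harmless, which is what lets an orchestrator keep re-ticking queued objects on every pass until their conditions (or `retry_at`) are satisfied.
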
@@ -55,16 +63,29 @@ class Crawl(ABIDModel, ModelWithHealthStats): abid_prefix = 'crl_' abid_ts_src = 'self.created_at' abid_uri_src = 'self.seed.uri' - abid_subtype_src = 'self.persona_id' + abid_subtype_src = 'self.persona' abid_rand_src = 'self.id' abid_drift_allowed = True + + state_field_name = 'status' + state_machine_name = 'crawls.statemachines.CrawlMachine' + state_machine_attr = 'sm' + bind_events_as_methods = True + class CrawlStatus(models.TextChoices): + QUEUED = 'queued', 'Queued' + STARTED = 'started', 'Started' + SEALED = 'sealed', 'Sealed' + + status = models.CharField(choices=CrawlStatus.choices, max_length=15, default=CrawlStatus.QUEUED, null=False, blank=False) + id = models.UUIDField(primary_key=True, default=None, null=False, editable=False, unique=True, verbose_name='ID') abid = ABIDField(prefix=abid_prefix) created_by = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE, default=None, null=False, related_name='crawl_set') created_at = AutoDateTimeField(default=None, null=False, db_index=True) modified_at = models.DateTimeField(auto_now=True) + seed = models.ForeignKey(Seed, on_delete=models.PROTECT, related_name='crawl_set', null=False, blank=False) max_depth = models.PositiveSmallIntegerField(default=0, validators=[MinValueValidator(0), MaxValueValidator(4)]) @@ -79,7 +100,7 @@ class Crawl(ABIDModel, ModelWithHealthStats): # schedule = models.JSONField() # config = models.JSONField() - # snapshot_set: models.Manager['Snapshot'] + snapshot_set: models.Manager['Snapshot'] class Meta(TypedModelMeta): @@ -102,6 +123,28 @@ class Crawl(ABIDModel, ModelWithHealthStats): @property def api_docs_url(self) -> str: return '/api/v1/docs#/Core%20Models/api_v1_core_get_crawl' + + def has_pending_archiveresults(self) -> bool: + from core.models import ArchiveResult + + pending_statuses = [ArchiveResult.ArchiveResultStatus.QUEUED, ArchiveResult.ArchiveResultStatus.STARTED] + + snapshot_ids = self.snapshot_set.values_list('id', flat=True) + pending_archiveresults = ArchiveResult.objects.filter(snapshot_id__in=snapshot_ids, status__in=pending_statuses) + return pending_archiveresults.exists() + + def create_root_snapshot(self) -> 'Snapshot': + from core.models import Snapshot + + root_snapshot, _ = Snapshot.objects.get_or_create( + crawl=self, + url=self.seed.uri, + ) + return root_snapshot + + def bump_retry_at(self, seconds: int = 10): + self.retry_at = timezone.now() + timedelta(seconds=seconds) + self.save() class Outlink(models.Model): diff --git a/archivebox/crawls/statemachines.py b/archivebox/crawls/statemachines.py new file mode 100644 index 00000000..b7e43daf --- /dev/null +++ b/archivebox/crawls/statemachines.py @@ -0,0 +1,48 @@ +__package__ = 'archivebox.crawls' + +from statemachine import State, StateMachine + +from crawls.models import Crawl + +# State Machine Definitions +################################################# + + +class CrawlMachine(StateMachine, strict_states=True): + """State machine for managing Crawl lifecycle.""" + + model: Crawl + + # States + queued = State(value=Crawl.CrawlStatus.QUEUED, initial=True) + started = State(value=Crawl.CrawlStatus.STARTED) + sealed = State(value=Crawl.CrawlStatus.SEALED, final=True) + + # Tick Event + tick = ( + queued.to.itself(unless='can_start', internal=True) | + queued.to(started, cond='can_start') | + started.to.itself(unless='is_finished', internal=True) | + started.to(sealed, cond='is_finished') + ) + + def __init__(self, crawl, *args, **kwargs): + self.crawl = crawl + super().__init__(crawl, 
*args, **kwargs) + + def can_start(self) -> bool: + return self.crawl.seed and self.crawl.seed.uri + + def is_finished(self) -> bool: + return not self.crawl.has_pending_archiveresults() + + + + def on_started(self): + self.crawl.create_root_snapshot() + self.crawl.bump_retry_at(seconds=10) + self.crawl.save() + + def on_sealed(self): + self.crawl.retry_at = None + self.crawl.save() diff --git a/archivebox/pkgs/abx-plugin-singlefile/abx_plugin_singlefile/__init__.py b/archivebox/pkgs/abx-plugin-singlefile/abx_plugin_singlefile/__init__.py index ddfb4236..be6dcd02 100644 --- a/archivebox/pkgs/abx-plugin-singlefile/abx_plugin_singlefile/__init__.py +++ b/archivebox/pkgs/abx-plugin-singlefile/abx_plugin_singlefile/__init__.py @@ -29,7 +29,7 @@ def get_EXTRACTORS(): 'singlefile': SINGLEFILE_EXTRACTOR, } -# @abx.hookimpl -# def get_INSTALLED_APPS(): -# # needed to load ./models.py -# return [__package__] +@abx.hookimpl +def get_INSTALLED_APPS(): + # needed to load ./models.py + return [__package__] diff --git a/archivebox/pkgs/abx-plugin-singlefile/abx_plugin_singlefile/actors.py b/archivebox/pkgs/abx-plugin-singlefile/abx_plugin_singlefile/actors.py new file mode 100644 index 00000000..d928d0fd --- /dev/null +++ b/archivebox/pkgs/abx-plugin-singlefile/abx_plugin_singlefile/actors.py @@ -0,0 +1,27 @@ +__package__ = 'abx_plugin_singlefile' + +from typing import ClassVar +from django.db.models import QuerySet +from django.utils.functional import classproperty + +from actors.actor import ActorType + +from .models import SinglefileResult + + +class SinglefileActor(ActorType[SinglefileResult]): + CLAIM_ORDER: ClassVar[str] = 'created_at DESC' + CLAIM_WHERE: ClassVar[str] = 'status = "queued" AND extractor = "favicon"' + CLAIM_SET: ClassVar[str] = 'status = "started"' + + @classproperty + def QUERYSET(cls) -> QuerySet: + return SinglefileResult.objects.filter(status='queued') + + def tick(self, obj: SinglefileResult): + print(f'[grey53]{self}.tick({obj.abid or obj.id}, status={obj.status}) remaining:[/grey53]', self.get_queue().count()) + updated = SinglefileResult.objects.filter(id=obj.id, status='started').update(status='success') == 1 + if not updated: + raise Exception(f'Failed to update {obj.abid or obj.id}, interrupted by another actor writing to the same object') + obj.refresh_from_db() + obj.save() diff --git a/archivebox/pkgs/abx-plugin-singlefile/abx_plugin_singlefile/migrations/__init__.py b/archivebox/pkgs/abx-plugin-singlefile/abx_plugin_singlefile/migrations/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/archivebox/pkgs/abx-spec-archivebox/abx_spec_archivebox/states.py b/archivebox/pkgs/abx-spec-archivebox/abx_spec_archivebox/states.py index a56649da..05284f37 100644 --- a/archivebox/pkgs/abx-spec-archivebox/abx_spec_archivebox/states.py +++ b/archivebox/pkgs/abx-spec-archivebox/abx_spec_archivebox/states.py @@ -20,6 +20,17 @@ from django.urls import reverse_lazy from pathlib import Path +# Glossary: +# - startup: when a new process is spawned +# - shutdown: when a process is exiting +# - start: at the beginning of some python code block +# - end: at the end of some python code block +# - queue: a django queryset of objects of a single type that are waiting to be processed +# - actor: a long-running daemon process that wakes up and processes a single object from a queue at a time +# - plugin: a python package that defines some hookimpls based on hookspecs exposed by ABX +# - object: an instance of a django model that represents a single row in the database + + # 
ORCHESTRATOR: # An orchestrator is a single long-running daemon process that manages spawning and killing actors for different queues of objects. # The orchestrator first starts when the archivebox starts, and it stops when archivebox is killed. @@ -74,8 +85,8 @@ from pathlib import Path # On startup an actor should fire abx.pm.hook.on_actor_startup(object) and on exit it should fire abx.pm.hook.on_actor_exit(object) (both syncronous hooks that can be used by plugins to register any startup/cleanup code). # An ActorType defines the following hookspecs for plugins to hook into its behavior: # - abx.pm.hook.on_actor_startup(actor, queue) -# - abx.pm.hook.on_actor_tick_started(actor, object) -# - abx.pm.hook.on_actor_tick_finished(actor, object) +# - abx.pm.hook.on_actor_tick_start(actor, object) +# - abx.pm.hook.on_actor_tick_end(actor, object) # - abx.pm.hook.on_actor_tick_exception(actor, object, exception) # - abx.pm.hook.on_actor_shutdown(actor) @@ -107,8 +118,8 @@ from pathlib import Path # - external API calls (e.g. uploading to s3, firing a webhook, writing to a logfile, etc.) # - DO NOT use side effects to directly mutate other objects state or trigger other state transitions # ABX defines the following hookspecs for plugins to hook into transition behavior: -# - abx.pm.hook.on_transition__from_abx_to_xyz_started(object) -# - abx.pm.hook.on_transition__from_abx_to_xyz_succeeded(object) +# - abx.pm.hook.on_transition__from_abx_to_xyz_start(object) +# - abx.pm.hook.on_transition__from_abx_to_xyz_end(object) # READ: # A read() method is a function defined for a given ActorType that performs a single read from the DB and/or other read models like django cache, filesystem, in-memory caches, etc. diff --git a/archivebox/seeds/models.py b/archivebox/seeds/models.py index b0d83b2e..7fe49c83 100644 --- a/archivebox/seeds/models.py +++ b/archivebox/seeds/models.py @@ -1,19 +1,8 @@ __package__ = 'archivebox.seeds' -from datetime import datetime - -from django_stubs_ext.db.models import TypedModelMeta - from django.db import models -from django.db.models import Q -from django.core.validators import MaxValueValidator, MinValueValidator from django.conf import settings -from django.utils import timezone -from django.utils.functional import cached_property -from django.urls import reverse_lazy - -from pathlib import Path from abid_utils.models import ABIDModel, ABIDField, AutoDateTimeField, ModelWithHealthStats @@ -47,7 +36,10 @@ class Seed(ABIDModel, ModelWithHealthStats): abid_rand_src = 'self.id' abid_drift_allowed = True - uri = models.URLField(max_length=255, blank=False, null=False, unique=True) # unique source location where URLs will be loaded from + id = models.UUIDField(primary_key=True, default=None, null=False, editable=False, unique=True, verbose_name='ID') + abid = ABIDField(prefix=abid_prefix) + + uri = models.URLField(max_length=2000, blank=False, null=False) # unique source location where URLs will be loaded from extractor = models.CharField(default='auto', max_length=32) # suggested extractor to use to load this URL source tags_str = models.CharField(max_length=255, null=False, blank=True, default='') # tags to attach to any URLs that come from this source @@ -64,4 +56,10 @@ class Seed(ABIDModel, ModelWithHealthStats): # pocketapi:// # s3:// # etc.. 
- return self.uri.split('://')[0].lower() + return self.uri.split('://', 1)[0].lower() + + class Meta: + verbose_name = 'Seed' + verbose_name_plural = 'Seeds' + + unique_together = (('created_by', 'uri', 'extractor'),) diff --git a/pyproject.toml b/pyproject.toml index e8cec024..aceae950 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -61,7 +61,7 @@ dependencies = [ "pluggy>=1.5.0", "requests>=2.32.3", "dateparser>=1.2.0", - "tzdata>=2024.2", # needed for dateparser {TZ: UTC} on some systems: https://github.com/ArchiveBox/ArchiveBox/issues/1553 + "tzdata>=2024.2", # needed for dateparser {TZ: UTC} on some systems: https://github.com/ArchiveBox/ArchiveBox/issues/1553 "feedparser>=6.0.11", "w3lib>=2.2.1", "rich>=13.8.0", @@ -86,40 +86,35 @@ dependencies = [ "yt-dlp>=2024.8.6", # for: media" ############# Plugin Dependencies ################ "abx>=0.1.0", - "abx-spec-pydantic-pkgr>=0.1.0", "abx-spec-config>=0.1.0", "abx-spec-archivebox>=0.1.0", "abx-spec-django>=0.1.0", "abx-spec-extractor>=0.1.0", "abx-spec-searchbackend>=0.1.0", - "abx-plugin-default-binproviders>=2024.10.24", "abx-plugin-pip>=2024.10.24", "abx-plugin-npm>=2024.10.24", "abx-plugin-playwright>=2024.10.24", "abx-plugin-puppeteer>=2024.10.28", - "abx-plugin-ripgrep-search>=2024.10.28", "abx-plugin-sqlitefts-search>=2024.10.28", "abx-plugin-sonic-search>=2024.10.28", "abx-plugin-ldap-auth>=2024.10.28", - "abx-plugin-curl>=2024.10.27", "abx-plugin-wget>=2024.10.28", "abx-plugin-git>=2024.10.28", "abx-plugin-chrome>=2024.10.28", "abx-plugin-ytdlp>=2024.10.28", - "abx-plugin-title>=2024.10.27", "abx-plugin-favicon>=2024.10.27", # "abx-plugin-headers>=2024.10.27", "abx-plugin-archivedotorg>=2024.10.28", - "abx-plugin-singlefile>=2024.10.28", "abx-plugin-readability>=2024.10.28", "abx-plugin-mercury>=2024.10.28", "abx-plugin-htmltotext>=2024.10.28", + "python-statemachine>=2.3.6", ] [project.optional-dependencies] diff --git a/uv.lock b/uv.lock index b29258ab..761668b7 100644 --- a/uv.lock +++ b/uv.lock @@ -661,6 +661,7 @@ dependencies = [ { name = "pydantic-settings" }, { name = "python-benedict", extra = ["io", "parse"] }, { name = "python-crontab" }, + { name = "python-statemachine" }, { name = "requests" }, { name = "rich" }, { name = "rich-argparse" }, @@ -789,6 +790,7 @@ requires-dist = [ { name = "python-benedict", extras = ["io", "parse"], specifier = ">=0.33.2" }, { name = "python-crontab", specifier = ">=3.2.0" }, { name = "python-ldap", marker = "extra == 'ldap'", specifier = ">=3.4.3" }, + { name = "python-statemachine", specifier = ">=2.3.6" }, { name = "requests", specifier = ">=2.32.3" }, { name = "requests-tracker", marker = "extra == 'debug'", specifier = ">=0.3.3" }, { name = "rich", specifier = ">=13.8.0" }, @@ -2729,6 +2731,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/a4/62/02da182e544a51a5c3ccf4b03ab79df279f9c60c5e82d5e8bec7ca26ac11/python_slugify-8.0.4-py2.py3-none-any.whl", hash = "sha256:276540b79961052b66b7d116620b36518847f52d5fd9e3a70164fc8c50faa6b8", size = 10051 }, ] +[[package]] +name = "python-statemachine" +version = "2.3.6" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/20/c9/7034a362ce151f9fa0ead5630727a16122f7a5ed235d42447910dff95b6a/python_statemachine-2.3.6.tar.gz", hash = "sha256:9cb4040ca7f2158d3cd46f36a77b420b6ef95a90223928a7f3cab232a70bd560", size = 36735 } +wheels = [ + { url = 
"https://files.pythonhosted.org/packages/39/04/19a44b549cbaae1ac6c2acc58afb96b71209da866713877f40aab2f45de6/python_statemachine-2.3.6-py3-none-any.whl", hash = "sha256:0001b02cbe2f5b2420c423b5b3e3a33915447ac6d9735219c929e2378d454f5f", size = 41529 }, +] + [[package]] name = "python-stdnum" version = "1.20"