From eb53145e4e38e50b85b083dbc27f9f91b60e597d Mon Sep 17 00:00:00 2001 From: Nick Sweeting Date: Mon, 18 Nov 2024 04:25:45 -0800 Subject: [PATCH] working state machine flow yay --- archivebox/abid_utils/models.py | 23 +++++++++++--- archivebox/actors/actor.py | 32 ++++++++++--------- archivebox/actors/orchestrator.py | 12 +++---- archivebox/api/v1_cli.py | 2 +- archivebox/core/statemachines.py | 39 +++++++++++++++-------- archivebox/crawls/models.py | 1 + archivebox/crawls/statemachines.py | 50 ++++++++++++++++++++++-------- 7 files changed, 107 insertions(+), 52 deletions(-) diff --git a/archivebox/abid_utils/models.py b/archivebox/abid_utils/models.py index 4e60807f..3ae2a8dc 100644 --- a/archivebox/abid_utils/models.py +++ b/archivebox/abid_utils/models.py @@ -351,7 +351,7 @@ class ABIDModel(models.Model): def update_for_workers(self, **update_kwargs) -> bool: """Immediately update the **kwargs on the object in DB, and reset the retry_at to now()""" - updated = bool(self._meta.model.objects.filter(pk=self.pk).update(retry_at=timezone.now(), **update_kwargs)) + updated = bool(self._meta.model.objects.filter(pk=self.pk).update(**{'retry_at': timezone.now(), **update_kwargs})) self.refresh_from_db() return updated @@ -387,16 +387,30 @@ class ModelWithHealthStats(models.Model): class ModelWithOutputDir(ABIDModel): + """ + Base Model that adds an output_dir property to any ABIDModel. + + It creates the directory on .save(with_indexes=True), automatically migrating any old data if needed. + It then writes the indexes to the output_dir on .save(write_indexes=True). + It also makes sure the output_dir is in sync with the model. + """ class Meta: abstract = True # output_dir = models.FilePathField(path=CONSTANTS.DATA_DIR, max_length=200, blank=True, null=True) - # output_files = models.JSONField(default=dict) + # output_files = models.TextField(default='') + # format: ,,,, + # ...,...,123456,text/plain,index.merkle + # ...,...,123456,text/html,index.html + # ...,...,123456,application/json,index.json + # ...,...,123456,text/html,singlefile/index.html def save(self, *args, write_indexes=False, **kwargs) -> None: super().save(*args, **kwargs) + self.OUTPUT_DIR.mkdir(parents=True, exist_ok=True) + self.save_json_index() # always write index.json to data/snapshots/snp_2342353k2jn3j32l4324/index.json if write_indexes: - self.write_indexes() + self.write_indexes() # write the index.html, merkle hashes, symlinks, send indexable texts to search backend, etc. @property def output_dir_type(self) -> str: @@ -429,7 +443,6 @@ class ModelWithOutputDir(ABIDModel): self.migrate_output_dir() self.save_merkle_index() self.save_html_index() - self.save_json_index() self.save_symlinks_index() def migrate_output_dir(self): @@ -533,7 +546,7 @@ def find_model_from_abid_prefix(prefix: str) -> type[ABIDModel] | None: Return the Django Model that corresponds to a given ABID prefix. e.g. 'tag_' -> core.models.Tag """ - prefix = abid_part_from_prefix(prefix) + prefix = abid_part_from_prefix(prefix) # snp_... -> snp_ import django.apps diff --git a/archivebox/actors/actor.py b/archivebox/actors/actor.py index a8aa8be1..ec0b2984 100644 --- a/archivebox/actors/actor.py +++ b/archivebox/actors/actor.py @@ -270,27 +270,31 @@ class ActorType(Generic[ModelType]): @classproperty def final_q(cls) -> Q: """Get the filter for objects that are already completed / in a final state""" - return Q(**{f'{cls.Model.state_field_name}__in': [cls._state_to_str(s) for s in cls.StateMachineClass.final_states]}) + return Q(**{ + f'{cls.Model.state_field_name}__in': [cls._state_to_str(s) for s in cls.StateMachineClass.final_states], + }) # status__in=('sealed', 'failed', 'succeeded') @classproperty def active_q(cls) -> Q: - """Get the filter for objects that are actively processing right now""" - return Q(**{cls.Model.state_field_name: cls._state_to_str(cls.ACTIVE_STATE)}) # e.g. Q(status='started') + """Get the filter for objects that are marked active (and are still running / not timed out)""" + return Q(retry_at__gte=timezone.now(), **{cls.Model.state_field_name: cls._state_to_str(cls.ACTIVE_STATE)}) # e.g. Q(status='started') @classproperty def stalled_q(cls) -> Q: - """Get the filter for objects that are marked active but have timed out""" - return cls.active_q & Q(retry_at__lte=timezone.now()) # e.g. Q(status='started') AND Q() + """Get the filter for objects that are marked active but are timed out""" + return Q(retry_at__lte=timezone.now(), **{cls.Model.state_field_name: cls._state_to_str(cls.ACTIVE_STATE)}) # e.g. Q(status='started') AND Q() @classproperty def future_q(cls) -> Q: """Get the filter for objects that have a retry_at in the future""" - return Q(retry_at__gt=timezone.now()) + return Q(retry_at__gt=timezone.now(), **{cls.Model.state_field_name: 'QUEUED'}) @classproperty def pending_q(cls) -> Q: """Get the filter for objects that are ready for processing.""" - return (~(cls.active_q) & ~(cls.final_q)) | Q(retry_at__lte=timezone.now()) + return ~Q(**{ + f'{cls.Model.state_field_name}__in': (*[cls._state_to_str(s) for s in cls.StateMachineClass.final_states], cls._state_to_str(cls.ACTIVE_STATE)) + }) # status__not_in=('sealed', 'failed', 'succeeded', 'started') @classmethod def get_queue(cls, sort: bool=True) -> QuerySet[ModelType]: @@ -298,7 +302,7 @@ class ActorType(Generic[ModelType]): Get the sorted and filtered QuerySet of objects that are ready for processing. e.g. qs.exclude(status__in=('sealed', 'started'), retry_at__gt=timezone.now()).order_by('retry_at') """ - unsorted_qs = cls.qs.filter(cls.pending_q) + unsorted_qs = cls.qs.filter(cls.pending_q) | cls.qs.filter(cls.stalled_q) return unsorted_qs.order_by(*cls.CLAIM_ORDER) if sort else unsorted_qs ### Instance Methods: Only called from within Actor instance after it has been spawned (i.e. forked as a thread or process) @@ -324,7 +328,7 @@ class ActorType(Generic[ModelType]): if self.idle_count >= 3: break # stop looping and exit if queue is empty and we have idled for 30sec else: - # print('Actor runloop()', f'pid={self.pid}', 'queue empty, rechecking...') + print('Actor runloop()', f'pid={self.pid}', 'queue empty, rechecking...') self.idle_count += 1 time.sleep(1) continue @@ -335,7 +339,7 @@ class ActorType(Generic[ModelType]): self.tick(obj_to_process) except Exception as err: last_error = err - # print(f'[red]🏃‍♂️ {self}.tick()[/red] {obj_to_process} ERROR: [red]{type(err).__name__}: {err}[/red]') + print(f'[red]🏃‍♂️ {self}.tick()[/red] {obj_to_process} ERROR: [red]{type(err).__name__}: {err}[/red]') db.connections.close_all() # always reset the db connection after an exception to clear any pending transactions self.on_tick_exception(obj_to_process, err) traceback.print_exc() @@ -362,7 +366,7 @@ class ActorType(Generic[ModelType]): Can be a defined as a normal method (instead of classmethod) on subclasses if it needs to access instance vars. """ return { - cls.Model.state_field_name: cls._state_to_str(cls.ACTIVE_STATE), + # cls.Model.state_field_name: cls._state_to_str(cls.ACTIVE_STATE), # do this manually in the state machine enter hooks 'retry_at': timezone.now() + timedelta(seconds=cls.MAX_TICK_TIME), } @@ -465,7 +469,7 @@ class ActorType(Generic[ModelType]): def on_startup(self) -> None: if self.mode == 'thread': # self.pid = get_native_id() # thread id - # print(f'[green]🏃‍♂️ {self}.on_startup() STARTUP (THREAD)[/green]') + print(f'[green]🏃‍♂️ {self}.on_startup() STARTUP (THREAD)[/green]') raise NotImplementedError('Thread-based actors are disabled to reduce codebase complexity. Please use processes for everything') else: self.pid = os.getpid() # process id @@ -486,13 +490,13 @@ class ActorType(Generic[ModelType]): # abx.pm.hook.on_actor_shutdown(actor=self, last_obj=last_obj, last_error=last_error) def on_tick_start(self, obj_to_process: ModelType) -> None: - # print(f'🏃‍♂️ {self}.on_tick_start() {obj_to_process.ABID} {obj_to_process.status} {obj_to_process.retry_at}') + print(f'🏃‍♂️ {self}.on_tick_start() {obj_to_process.ABID} {obj_to_process.status} {obj_to_process.retry_at}') # abx.pm.hook.on_actor_tick_start(actor=self, obj_to_process=obj) # self.timer = TimedProgress(self.MAX_TICK_TIME, prefix=' ') pass def on_tick_end(self, obj_to_process: ModelType) -> None: - # print(f'🏃‍♂️ {self}.on_tick_end() {obj_to_process.ABID} {obj_to_process.status} {obj_to_process.retry_at}') + print(f'🏃‍♂️ {self}.on_tick_end() {obj_to_process.ABID} {obj_to_process.status} {obj_to_process.retry_at}') # abx.pm.hook.on_actor_tick_end(actor=self, obj_to_process=obj_to_process) # self.timer.end() pass diff --git a/archivebox/actors/orchestrator.py b/archivebox/actors/orchestrator.py index 9ba84b63..01e3475f 100644 --- a/archivebox/actors/orchestrator.py +++ b/archivebox/actors/orchestrator.py @@ -110,7 +110,7 @@ class Orchestrator: def on_startup(self): if self.mode == 'thread': # self.pid = get_native_id() - # print(f'[green]👨‍✈️ {self}.on_startup() STARTUP (THREAD)[/green]') + print(f'[green]👨‍✈️ {self}.on_startup() STARTUP (THREAD)[/green]') raise NotImplementedError('Thread-based orchestrators are disabled for now to reduce codebase complexity') elif self.mode == 'process': self.pid = os.getpid() @@ -122,9 +122,9 @@ class Orchestrator: # abx.pm.hook.on_orchestrator_shutdown(self) def on_tick_started(self, all_queues): - # total_pending = sum(queue.count() for queue in all_queues.values()) - # if total_pending: - # print(f'👨‍✈️ {self}.on_tick_started()', f'total_pending={total_pending}') + total_pending = sum(queue.count() for queue in all_queues.values()) + if total_pending: + print(f'👨‍✈️ {self}.on_tick_started()', f'total_pending={total_pending}') # abx.pm.hook.on_orchestrator_tick_started(self, actor_types, all_queues) pass @@ -169,8 +169,8 @@ class Orchestrator: if not queue.exists(): continue - # next_obj = queue.first() - # print(f'🏃‍♂️ {self}.runloop() {actor_type.__name__.ljust(20)} queue={str(queue.count()).ljust(3)} next={next_obj.abid if next_obj else "None"} {next_obj.status if next_obj else "None"} {(timezone.now() - next_obj.retry_at).total_seconds() if next_obj and next_obj.retry_at else "None"}') + next_obj = queue.first() + print(f'🏃‍♂️ {self}.runloop() {actor_type.__name__.ljust(20)} queue={str(queue.count()).ljust(3)} next={next_obj.abid if next_obj else "None"} {next_obj.status if next_obj else "None"} {(timezone.now() - next_obj.retry_at).total_seconds() if next_obj and next_obj.retry_at else "None"}') self.idle_count = 0 try: existing_actors = actor_type.get_running_actors() diff --git a/archivebox/api/v1_cli.py b/archivebox/api/v1_cli.py index a5380512..6ec8f3df 100644 --- a/archivebox/api/v1_cli.py +++ b/archivebox/api/v1_cli.py @@ -6,7 +6,7 @@ from enum import Enum from ninja import Router, Schema -from ..main import ( +from archivebox.main import ( add, remove, update, diff --git a/archivebox/core/statemachines.py b/archivebox/core/statemachines.py index 6a6a1c4e..3e72265f 100644 --- a/archivebox/core/statemachines.py +++ b/archivebox/core/statemachines.py @@ -29,9 +29,9 @@ class SnapshotMachine(StateMachine, strict_states=True): # Tick Event tick = ( - queued.to.itself(unless='can_start', internal=True) | + queued.to.itself(unless='can_start') | queued.to(started, cond='can_start') | - started.to.itself(unless='is_finished', internal=True) | + started.to.itself(unless='is_finished') | started.to(sealed, cond='is_finished') ) @@ -40,7 +40,7 @@ class SnapshotMachine(StateMachine, strict_states=True): super().__init__(snapshot, *args, **kwargs) def can_start(self) -> bool: - can_start = bool(self.snapshot.url and (self.snapshot.retry_at < timezone.now())) + can_start = bool(self.snapshot.url) if not can_start: print(f'SnapshotMachine[{self.snapshot.ABID}].can_start() False: {self.snapshot.url} {self.snapshot.retry_at} {timezone.now()}') return can_start @@ -63,24 +63,34 @@ class SnapshotMachine(StateMachine, strict_states=True): @queued.enter def enter_queued(self): print(f'SnapshotMachine[{self.snapshot.ABID}].on_queued(): snapshot.retry_at = now()') - self.snapshot.status = Snapshot.StatusChoices.QUEUED - self.snapshot.retry_at = timezone.now() - self.snapshot.save() + self.snapshot.update_for_workers( + retry_at=timezone.now(), + status=Snapshot.StatusChoices.QUEUED, + ) @started.enter def enter_started(self): print(f'SnapshotMachine[{self.snapshot.ABID}].on_started(): snapshot.create_pending_archiveresults() + snapshot.bump_retry_at(+60s)') - self.snapshot.bump_retry_at(seconds=30) # if failed, wait 10s before retrying - self.snapshot.save() + # lock the snapshot while we create the pending archiveresults + self.snapshot.update_for_workers( + retry_at=timezone.now() + timedelta(seconds=30), # if failed, wait 30s before retrying + ) + # create the pending archiveresults self.snapshot.create_pending_archiveresults() - self.snapshot.status = Snapshot.StatusChoices.STARTED + + # unlock the snapshot after we're done creating the pending archiveresults + set status = started + self.snapshot.update_for_workers( + retry_at=timezone.now() + timedelta(seconds=5), # wait 5s before checking it again + status=Snapshot.StatusChoices.STARTED, + ) @sealed.enter def enter_sealed(self): print(f'SnapshotMachine[{self.snapshot.ABID}].on_sealed(): snapshot.retry_at=None') - self.snapshot.status = Snapshot.StatusChoices.SEALED - self.snapshot.retry_at = None - self.snapshot.save() + self.snapshot.update_for_workers( + retry_at=None, + status=Snapshot.StatusChoices.SEALED, + ) class SnapshotWorker(ActorType[Snapshot]): @@ -136,7 +146,10 @@ class ArchiveResultMachine(StateMachine, strict_states=True): super().__init__(archiveresult, *args, **kwargs) def can_start(self) -> bool: - return self.archiveresult.snapshot and (self.archiveresult.retry_at < timezone.now()) + can_start = bool(self.archiveresult.snapshot.url) + if not can_start: + print(f'ArchiveResultMachine[{self.archiveresult.ABID}].can_start() False: {self.archiveresult.snapshot.url} {self.archiveresult.retry_at} {timezone.now()}') + return can_start def is_succeeded(self) -> bool: if self.archiveresult.output and 'err' not in self.archiveresult.output.lower(): diff --git a/archivebox/crawls/models.py b/archivebox/crawls/models.py index 890e02a5..ebda8e0d 100644 --- a/archivebox/crawls/models.py +++ b/archivebox/crawls/models.py @@ -194,6 +194,7 @@ class Crawl(ABIDModel, ModelWithHealthStats, ModelWithStateMachine): return pending_archiveresults def create_root_snapshot(self) -> 'Snapshot': + print(f'Crawl[{self.ABID}].create_root_snapshot()') from core.models import Snapshot try: diff --git a/archivebox/crawls/statemachines.py b/archivebox/crawls/statemachines.py index 822b3111..8f76c98b 100644 --- a/archivebox/crawls/statemachines.py +++ b/archivebox/crawls/statemachines.py @@ -1,6 +1,7 @@ __package__ = 'archivebox.crawls' from typing import ClassVar +from datetime import timedelta from django.utils import timezone from statemachine import State, StateMachine @@ -21,9 +22,9 @@ class CrawlMachine(StateMachine, strict_states=True): # Tick Event tick = ( - queued.to.itself(unless='can_start', internal=True) | + queued.to.itself(unless='can_start') | queued.to(started, cond='can_start') | - started.to.itself(unless='is_finished', internal=True) | + started.to.itself(unless='is_finished') | started.to(sealed, cond='is_finished') ) @@ -32,15 +33,29 @@ class CrawlMachine(StateMachine, strict_states=True): super().__init__(crawl, *args, **kwargs) def can_start(self) -> bool: - return bool(self.crawl.seed and self.crawl.seed.uri and (self.retry_at < timezone.now())) + return bool(self.crawl.seed and self.crawl.seed.uri) def is_finished(self) -> bool: - if not self.crawl.snapshot_set.exists(): + from core.models import Snapshot, ArchiveResult + + # check that at least one snapshot exists for this crawl + snapshots = Snapshot.objects.filter(crawl=self.crawl) + if not snapshots.exists(): return False - if self.crawl.pending_snapshots().exists(): + + # check to make sure no snapshots are in non-final states + if snapshots.filter(status__in=[Snapshot.StatusChoices.QUEUED, Snapshot.StatusChoices.STARTED]).exists(): return False - if self.crawl.pending_archiveresults().exists(): + + # check that some archiveresults exist for this crawl + results = ArchiveResult.objects.filter(snapshot__crawl=self.crawl) + if not results.exists(): return False + + # check if all archiveresults are finished + if results.filter(status__in=[Crawl.StatusChoices.QUEUED, Crawl.StatusChoices.STARTED]).exists(): + return False + return True # def before_transition(self, event, state): @@ -50,17 +65,26 @@ class CrawlMachine(StateMachine, strict_states=True): @started.enter def enter_started(self): print(f'CrawlMachine[{self.crawl.ABID}].on_started(): crawl.create_root_snapshot() + crawl.bump_retry_at(+10s)') - self.crawl.status = Crawl.StatusChoices.STARTED - self.crawl.bump_retry_at(seconds=2) - self.crawl.save() - self.crawl.create_root_snapshot() + # lock the crawl object for 2s while we create the root snapshot + self.crawl.update_for_workers( + retry_at=timezone.now() + timedelta(seconds=5), + status=Crawl.StatusChoices.QUEUED, + ) + assert self.crawl.create_root_snapshot() + + # only update status to STARTED once root snapshot is created + self.crawl.update_for_workers( + retry_at=timezone.now() + timedelta(seconds=5), + status=Crawl.StatusChoices.STARTED, + ) @sealed.enter def enter_sealed(self): print(f'CrawlMachine[{self.crawl.ABID}].on_sealed(): crawl.retry_at=None') - self.crawl.status = Crawl.StatusChoices.SEALED - self.crawl.retry_at = None - self.crawl.save() + self.crawl.update_for_workers( + retry_at=None, + status=Crawl.StatusChoices.SEALED, + ) class CrawlWorker(ActorType[Crawl]):