diff --git a/archivebox/actors/orchestrator.py b/archivebox/actors/orchestrator.py index 9585ad2b..9ba84b63 100644 --- a/archivebox/actors/orchestrator.py +++ b/archivebox/actors/orchestrator.py @@ -76,13 +76,13 @@ class Orchestrator: # returns a Dict of all discovered {actor_type_id: ActorType} across the codebase # override this method in a subclass to customize the actor types that are used - # return {'Snapshot': SnapshotActorType, 'ArchiveResult_chrome': ChromeActorType, ...} - from crawls.actors import CrawlActor - from core.actors import SnapshotActor, ArchiveResultActor + # return {'Snapshot': SnapshotWorker, 'ArchiveResult_chrome': ChromeActorType, ...} + from crawls.statemachines import CrawlWorker + from core.statemachines import SnapshotWorker, ArchiveResultWorker return { - 'CrawlActor': CrawlActor, - 'SnapshotActor': SnapshotActor, - 'ArchiveResultActor': ArchiveResultActor, + 'CrawlWorker': CrawlWorker, + 'SnapshotWorker': SnapshotWorker, + 'ArchiveResultWorker': ArchiveResultWorker, # look through all models and find all classes that inherit from ActorType # actor_type.__name__: actor_type # for actor_type in abx.pm.hook.get_all_ACTORS_TYPES().values() diff --git a/archivebox/core/actors.py b/archivebox/core/actors.py index 1e9db058..e69de29b 100644 --- a/archivebox/core/actors.py +++ b/archivebox/core/actors.py @@ -1,49 +0,0 @@ -__package__ = 'archivebox.core' - -from typing import ClassVar - -from statemachine import State - -from core.models import Snapshot, ArchiveResult -from core.statemachines import SnapshotMachine, ArchiveResultMachine -from actors.actor import ActorType - - -class SnapshotActor(ActorType[Snapshot]): - """ - The primary actor for progressing Snapshot objects - through their lifecycle using the SnapshotMachine. 
- """ - Model = Snapshot - StateMachineClass = SnapshotMachine - - ACTIVE_STATE: ClassVar[State] = SnapshotMachine.started # 'started' - FINAL_STATES: ClassVar[list[State]] = SnapshotMachine.final_states # ['sealed'] - STATE_FIELD_NAME: ClassVar[str] = Snapshot.state_field_name # status - - MAX_CONCURRENT_ACTORS: ClassVar[int] = 3 - MAX_TICK_TIME: ClassVar[int] = 10 - CLAIM_FROM_TOP_N: ClassVar[int] = MAX_CONCURRENT_ACTORS * 10 - - - -class ArchiveResultActor(ActorType[ArchiveResult]): - """ - The primary actor for progressing ArchiveResult objects - through their lifecycle using the ArchiveResultMachine. - """ - Model = ArchiveResult - StateMachineClass = ArchiveResultMachine - - ACTIVE_STATE: ClassVar[State] = ArchiveResultMachine.started # 'started' - FINAL_STATES: ClassVar[list[State]] = ArchiveResultMachine.final_states # ['succeeded', 'failed', 'skipped'] - STATE_FIELD_NAME: ClassVar[str] = ArchiveResult.state_field_name # status - - MAX_CONCURRENT_ACTORS: ClassVar[int] = 6 - MAX_TICK_TIME: ClassVar[int] = 60 - CLAIM_FROM_TOP_N: ClassVar[int] = MAX_CONCURRENT_ACTORS * 10 - - # @classproperty - # def qs(cls) -> QuerySet[ModelType]: - # """Get the unfiltered and unsorted QuerySet of all objects that this Actor might care about.""" - # return cls.Model.objects.filter(extractor='favicon') diff --git a/archivebox/core/statemachines.py b/archivebox/core/statemachines.py index 0f1fb31c..6a6a1c4e 100644 --- a/archivebox/core/statemachines.py +++ b/archivebox/core/statemachines.py @@ -1,15 +1,16 @@ -__package__ = 'archivebox.snapshots' +__package__ = 'archivebox.core' import time +from datetime import timedelta +from typing import ClassVar from django.utils import timezone from statemachine import State, StateMachine -from core.models import Snapshot, ArchiveResult +from actors.actor import ActorType -# State Machine Definitions -################################################# +from core.models import Snapshot, ArchiveResult class SnapshotMachine(StateMachine, 
strict_states=True): @@ -28,9 +29,9 @@ class SnapshotMachine(StateMachine, strict_states=True): # Tick Event tick = ( - queued.to.itself(unless='can_start') | + queued.to.itself(unless='can_start', internal=True) | queued.to(started, cond='can_start') | - started.to.itself(unless='is_finished') | + started.to.itself(unless='is_finished', internal=True) | started.to(sealed, cond='is_finished') ) @@ -48,6 +49,7 @@ class SnapshotMachine(StateMachine, strict_states=True): # if no archiveresults exist yet, it's not finished if not self.snapshot.archiveresult_set.exists(): return False + # if archiveresults exist but are still pending, it's not finished if self.snapshot.pending_archiveresults().exists(): return False @@ -68,10 +70,10 @@ class SnapshotMachine(StateMachine, strict_states=True): @started.enter def enter_started(self): print(f'SnapshotMachine[{self.snapshot.ABID}].on_started(): snapshot.create_pending_archiveresults() + snapshot.bump_retry_at(+60s)') - self.snapshot.status = Snapshot.StatusChoices.STARTED - self.snapshot.bump_retry_at(seconds=2) + self.snapshot.bump_retry_at(seconds=30) # if failed, wait 30s before retrying self.snapshot.save() self.snapshot.create_pending_archiveresults() + self.snapshot.status = Snapshot.StatusChoices.STARTED @sealed.enter def enter_sealed(self): @@ -81,6 +83,24 @@ class SnapshotMachine(StateMachine, strict_states=True): self.snapshot.save() +class SnapshotWorker(ActorType[Snapshot]): + """ + The primary actor for progressing Snapshot objects + through their lifecycle using the SnapshotMachine. + """ + Model = Snapshot + StateMachineClass = SnapshotMachine + + ACTIVE_STATE: ClassVar[State] = SnapshotMachine.started # 'started' + + MAX_CONCURRENT_ACTORS: ClassVar[int] = 3 + MAX_TICK_TIME: ClassVar[int] = 10 + CLAIM_FROM_TOP_N: ClassVar[int] = MAX_CONCURRENT_ACTORS * 10 + + + + + class ArchiveResultMachine(StateMachine, strict_states=True): """ State machine for managing ArchiveResult lifecycle. 
@@ -135,55 +155,88 @@ class ArchiveResultMachine(StateMachine, strict_states=True): def is_finished(self) -> bool: return self.is_failed() or self.is_succeeded() - + @queued.enter def enter_queued(self): print(f'ArchiveResultMachine[{self.archiveresult.ABID}].on_queued(): archiveresult.retry_at = now()') - self.archiveresult.status = ArchiveResult.StatusChoices.QUEUED - self.archiveresult.retry_at = timezone.now() - self.archiveresult.save() + self.archiveresult.update_for_workers( + retry_at=timezone.now(), + status=ArchiveResult.StatusChoices.QUEUED, + start_ts=None, + ) # bump the snapshot's retry_at so they pickup any new changes @started.enter def enter_started(self): print(f'ArchiveResultMachine[{self.archiveresult.ABID}].on_started(): archiveresult.start_ts + create_output_dir() + bump_retry_at(+60s)') - self.archiveresult.status = ArchiveResult.StatusChoices.STARTED - self.archiveresult.start_ts = timezone.now() - self.archiveresult.bump_retry_at(seconds=2) - self.archiveresult.save() + # lock the object for the next 30sec + self.archiveresult.update_for_workers( + retry_at=timezone.now() + timedelta(seconds=30), + status=ArchiveResult.StatusChoices.QUEUED, + start_ts=timezone.now(), + ) # lock the obj for the next ~30s to limit racing with other workers + + # create the output directory and fork the new extractor job subprocess self.archiveresult.create_output_dir() + # self.archiveresult.extract(background=True) + + # mark the object as started + self.archiveresult.update_for_workers( + retry_at=timezone.now() + timedelta(seconds=30), # retry it again in 30s if it fails + status=ArchiveResult.StatusChoices.STARTED, + ) + + # simulate slow running extractor that completes after 2 seconds time.sleep(2) - self.archiveresult.output = 'completed' - self.archiveresult.save() + self.archiveresult.update_for_workers(output='completed') @backoff.enter def enter_backoff(self): print(f'ArchiveResultMachine[{self.archiveresult.ABID}].on_backoff(): 
archiveresult.retries += 1, archiveresult.bump_retry_at(+60s), archiveresult.end_ts = None') - self.archiveresult.status = ArchiveResult.StatusChoices.BACKOFF - self.archiveresult.retries = getattr(self.archiveresult, 'retries', 0) + 1 - self.archiveresult.bump_retry_at(seconds=2) - self.archiveresult.end_ts = None - self.archiveresult.save() + self.archiveresult.update_for_workers( + retry_at=timezone.now() + timedelta(seconds=60), + status=ArchiveResult.StatusChoices.BACKOFF, + end_ts=None, + # retries=F('retries') + 1, # F() equivalent to getattr(self.archiveresult, 'retries', 0) + 1, + ) + self.archiveresult.save(write_indexes=True) @succeeded.enter def enter_succeeded(self): print(f'ArchiveResultMachine[{self.archiveresult.ABID}].on_succeeded(): archiveresult.retry_at = None, archiveresult.end_ts = now()') - self.archiveresult.status = ArchiveResult.StatusChoices.SUCCEEDED - self.archiveresult.retry_at = None - self.archiveresult.end_ts = timezone.now() - self.archiveresult.save() + self.archiveresult.update_for_workers( + retry_at=None, + status=ArchiveResult.StatusChoices.SUCCEEDED, + end_ts=timezone.now(), + # **self.archiveresult.get_output_dict(), # {output, output_json, stderr, stdout, returncode, errors, cmd_version, pwd, cmd, machine} + ) + self.archiveresult.save(write_indexes=True) @failed.enter def enter_failed(self): print(f'ArchiveResultMachine[{self.archiveresult.ABID}].on_failed(): archivebox.retry_at = None, archiveresult.end_ts = now()') - self.archiveresult.status = ArchiveResult.StatusChoices.FAILED - self.archiveresult.retry_at = None - self.archiveresult.end_ts = timezone.now() - self.archiveresult.save() + self.archiveresult.update_for_workers( + retry_at=None, + status=ArchiveResult.StatusChoices.FAILED, + end_ts=timezone.now(), + # **self.archiveresult.get_output_dict(), # {output, output_json, stderr, stdout, returncode, errors, cmd_version, pwd, cmd, machine} + ) - # def after_transition(self, event: str, source: State, target: 
State): - # print(f"after '{event}' from '{source.id}' to '{target.id}'") - # # self.archiveresult.save_merkle_index() - # # self.archiveresult.save_html_index() - # # self.archiveresult.save_json_index() - # return "after_transition" + def after_transition(self, event: str, source: State, target: State): + # print(f"after '{event}' from '{source.id}' to '{target.id}'") + self.archiveresult.snapshot.update_for_workers() # bump snapshot retry time so it picks up all the new changes + + +class ArchiveResultWorker(ActorType[ArchiveResult]): + """ + The primary actor for progressing ArchiveResult objects + through their lifecycle using the ArchiveResultMachine. + """ + Model = ArchiveResult + StateMachineClass = ArchiveResultMachine + + ACTIVE_STATE: ClassVar[State] = ArchiveResultMachine.started # 'started' + + MAX_CONCURRENT_ACTORS: ClassVar[int] = 6 + MAX_TICK_TIME: ClassVar[int] = 60 + CLAIM_FROM_TOP_N: ClassVar[int] = MAX_CONCURRENT_ACTORS * 10 diff --git a/archivebox/crawls/actors.py b/archivebox/crawls/actors.py deleted file mode 100644 index 55c9f92c..00000000 --- a/archivebox/crawls/actors.py +++ /dev/null @@ -1,23 +0,0 @@ -__package__ = 'archivebox.crawls' - -from typing import ClassVar - -from crawls.models import Crawl -from crawls.statemachines import CrawlMachine - -from actors.actor import ActorType, State - - -class CrawlActor(ActorType[Crawl]): - """The Actor that manages the lifecycle of all Crawl objects""" - - Model = Crawl - StateMachineClass = CrawlMachine - - ACTIVE_STATE: ClassVar[State] = CrawlMachine.started - FINAL_STATES: ClassVar[list[State]] = CrawlMachine.final_states - STATE_FIELD_NAME: ClassVar[str] = Crawl.state_field_name - - MAX_CONCURRENT_ACTORS: ClassVar[int] = 3 - MAX_TICK_TIME: ClassVar[int] = 10 - CLAIM_FROM_TOP_N: ClassVar[int] = MAX_CONCURRENT_ACTORS * 10 diff --git a/archivebox/crawls/statemachines.py b/archivebox/crawls/statemachines.py index 9fe009fd..822b3111 100644 --- a/archivebox/crawls/statemachines.py +++ 
b/archivebox/crawls/statemachines.py @@ -1,14 +1,13 @@ __package__ = 'archivebox.crawls' +from typing import ClassVar from django.utils import timezone from statemachine import State, StateMachine +from actors.actor import ActorType from crawls.models import Crawl -# State Machine Definitions -################################################# - class CrawlMachine(StateMachine, strict_states=True): """State machine for managing Crawl lifecycle.""" @@ -22,9 +21,9 @@ class CrawlMachine(StateMachine, strict_states=True): # Tick Event tick = ( - queued.to.itself(unless='can_start') | + queued.to.itself(unless='can_start', internal=True) | queued.to(started, cond='can_start') | - started.to.itself(unless='is_finished') | + started.to.itself(unless='is_finished', internal=True) | started.to(sealed, cond='is_finished') ) @@ -63,3 +62,18 @@ class CrawlMachine(StateMachine, strict_states=True): self.crawl.retry_at = None self.crawl.save() + +class CrawlWorker(ActorType[Crawl]): + """The Actor that manages the lifecycle of all Crawl objects""" + + Model = Crawl + StateMachineClass = CrawlMachine + + ACTIVE_STATE: ClassVar[State] = CrawlMachine.started + FINAL_STATES: ClassVar[list[State]] = CrawlMachine.final_states + STATE_FIELD_NAME: ClassVar[str] = Crawl.state_field_name + + MAX_CONCURRENT_ACTORS: ClassVar[int] = 3 + MAX_TICK_TIME: ClassVar[int] = 10 + CLAIM_FROM_TOP_N: ClassVar[int] = MAX_CONCURRENT_ACTORS * 10 +