ArchiveBox/archivebox/crawls/statemachines.py

112 lines
3.8 KiB
Python

__package__ = 'archivebox.crawls'
import os
from typing import ClassVar
from datetime import timedelta
from django.utils import timezone
from rich import print
from statemachine import State, StateMachine
from workers.actor import ActorType
from crawls.models import Crawl
class CrawlMachine(StateMachine, strict_states=True):
"""State machine for managing Crawl lifecycle."""
model: Crawl
# States
queued = State(value=Crawl.StatusChoices.QUEUED, initial=True)
started = State(value=Crawl.StatusChoices.STARTED)
sealed = State(value=Crawl.StatusChoices.SEALED, final=True)
# Tick Event
tick = (
queued.to.itself(unless='can_start') |
queued.to(started, cond='can_start') |
started.to.itself(unless='is_finished') |
started.to(sealed, cond='is_finished')
)
def __init__(self, crawl, *args, **kwargs):
self.crawl = crawl
super().__init__(crawl, *args, **kwargs)
def __repr__(self) -> str:
return f'[grey53]Crawl\\[{self.crawl.ABID}] 🏃‍♂️ Worker\\[pid={os.getpid()}].tick()[/grey53] [blue]{self.crawl.status.upper()}[/blue] ⚙️ [grey37]Machine[/grey37]'
def __str__(self) -> str:
return self.__repr__()
def can_start(self) -> bool:
return bool(self.crawl.seed and self.crawl.seed.uri)
def is_finished(self) -> bool:
from core.models import Snapshot, ArchiveResult
# check that at least one snapshot exists for this crawl
snapshots = Snapshot.objects.filter(crawl=self.crawl)
if not snapshots.exists():
return False
# check to make sure no snapshots are in non-final states
if snapshots.filter(status__in=[Snapshot.StatusChoices.QUEUED, Snapshot.StatusChoices.STARTED]).exists():
return False
# check that some archiveresults exist for this crawl
results = ArchiveResult.objects.filter(snapshot__crawl=self.crawl)
if not results.exists():
return False
# check if all archiveresults are finished
if results.filter(status__in=[Crawl.StatusChoices.QUEUED, Crawl.StatusChoices.STARTED]).exists():
return False
return True
# def before_transition(self, event, state):
# print(f"Before '{event}', on the '{state.id}' state.")
# return "before_transition_return"
@started.enter
def enter_started(self):
print(f'{self}.on_started(): [blue]↳ STARTED[/blue] crawl.create_root_snapshot() + crawl.bump_retry_at(+10s)')
# lock the crawl object for 2s while we create the root snapshot
self.crawl.update_for_workers(
retry_at=timezone.now() + timedelta(seconds=5),
status=Crawl.StatusChoices.QUEUED,
)
assert self.crawl.create_root_snapshot()
# only update status to STARTED once root snapshot is created
self.crawl.update_for_workers(
retry_at=timezone.now() + timedelta(seconds=5),
status=Crawl.StatusChoices.STARTED,
)
@sealed.enter
def enter_sealed(self):
print(f'{self}.on_sealed(): [blue]↳ SEALED[/blue] crawl.retry_at=None')
self.crawl.update_for_workers(
retry_at=None,
status=Crawl.StatusChoices.SEALED,
)
class CrawlWorker(ActorType[Crawl]):
"""The Actor that manages the lifecycle of all Crawl objects"""
Model = Crawl
StateMachineClass = CrawlMachine
ACTIVE_STATE: ClassVar[State] = CrawlMachine.started
FINAL_STATES: ClassVar[list[State]] = CrawlMachine.final_states
STATE_FIELD_NAME: ClassVar[str] = Crawl.state_field_name
MAX_CONCURRENT_ACTORS: ClassVar[int] = 3
MAX_TICK_TIME: ClassVar[int] = 10
CLAIM_FROM_TOP_N: ClassVar[int] = MAX_CONCURRENT_ACTORS * 10