fix statemachine create_root_snapshot and retry timing

This commit is contained in:
Nick Sweeting 2024-11-17 22:57:09 -08:00
parent 67c22b2df0
commit 1ec2753664
No known key found for this signature in database
8 changed files with 48 additions and 40 deletions

View file

@ -21,7 +21,7 @@ class SnapshotActor(ActorType[Snapshot]):
FINAL_STATES: ClassVar[list[State]] = SnapshotMachine.final_states # ['sealed']
STATE_FIELD_NAME: ClassVar[str] = Snapshot.state_field_name # status
MAX_CONCURRENT_ACTORS: ClassVar[int] = 1 # 3
MAX_CONCURRENT_ACTORS: ClassVar[int] = 3
MAX_TICK_TIME: ClassVar[int] = 10
CLAIM_FROM_TOP_N: ClassVar[int] = MAX_CONCURRENT_ACTORS * 10
@ -39,7 +39,7 @@ class ArchiveResultActor(ActorType[ArchiveResult]):
FINAL_STATES: ClassVar[list[State]] = ArchiveResultMachine.final_states # ['succeeded', 'failed', 'skipped']
STATE_FIELD_NAME: ClassVar[str] = ArchiveResult.state_field_name # status
MAX_CONCURRENT_ACTORS: ClassVar[int] = 1 # 6
MAX_CONCURRENT_ACTORS: ClassVar[int] = 6
MAX_TICK_TIME: ClassVar[int] = 60
CLAIM_FROM_TOP_N: ClassVar[int] = MAX_CONCURRENT_ACTORS * 10

View file

@ -448,15 +448,16 @@ class Snapshot(ABIDModel, ModelWithStateMachine):
for extractor in EXTRACTORS:
if not extractor:
continue
archiveresult = ArchiveResult.objects.update_or_create(
archiveresult, created = ArchiveResult.objects.get_or_create(
snapshot=self,
extractor=extractor,
status=ArchiveResult.INITIAL_STATE,
defaults={
'status': ArchiveResult.INITIAL_STATE,
'retry_at': timezone.now(),
},
)
archiveresults.append(archiveresult)
if archiveresult.status == ArchiveResult.INITIAL_STATE:
archiveresults.append(archiveresult)
return archiveresults
@ -625,19 +626,12 @@ class ArchiveResult(ABIDModel, ModelWithStateMachine):
return '/api/v1/docs#/Core%20Models/api_v1_core_get_archiveresult'
def get_absolute_url(self):
return f'/{self.snapshot.archive_path}/{self.output_path()}'
return f'/{self.snapshot.archive_path}/{self.extractor}'
@property
def extractor_module(self) -> Any | None:
return abx.as_dict(abx.pm.hook.get_EXTRACTORS()).get(self.extractor, None)
def output_path(self) -> str | None:
"""return the canonical output filename or directory name within the snapshot dir"""
try:
return self.extractor_module.get_output_path(self.snapshot)
except Exception as e:
print(f'Error getting output path for {self.extractor} extractor: {e}')
return None
def embed_path(self) -> str | None:
"""
@ -656,18 +650,13 @@ class ArchiveResult(ABIDModel, ModelWithStateMachine):
return link.canonical_outputs().get(f'{self.extractor}_path')
def output_exists(self) -> bool:
output_path = self.output_path()
return bool(output_path and os.path.exists(output_path))
output_path = Path(self.snapshot_dir) / self.extractor
return os.path.exists(output_path)
def create_output_dir(self):
snap_dir = Path(self.snapshot_dir)
snap_dir.mkdir(parents=True, exist_ok=True)
output_path = self.output_path()
if output_path:
(snap_dir / output_path).mkdir(parents=True, exist_ok=True)
else:
raise ValueError(f'Not able to calculate output path for {self.extractor} extractor in {snap_dir}')
return snap_dir / output_path
output_dir = Path(self.snapshot_dir) / self.extractor
output_dir.mkdir(parents=True, exist_ok=True)
return output_dir
def as_json(self, *args) -> dict:
args = args or self.keys

View file

@ -65,10 +65,14 @@ else:
# print(f'[!] WARNING: data/logs dir does not exist. Logging to temp file: {ERROR_LOG}')
pass
LOG_LEVEL_DATABASE = 'WARNING' # if DEBUG else 'WARNING'
LOG_LEVEL_DATABASE = 'WARNING'
LOG_LEVEL_REQUEST = 'WARNING' # if DEBUG else 'WARNING'
# UNCOMMENT TO LOG ALL SQL QUERIES:
# LOG_LEVEL_DATABASE = 'DEBUG'
# db_logger = logging.getLogger('django.db.backends')
# db_logger.setLevel(logging.DEBUG)
# db_logger.addHandler(logging.StreamHandler())
SETTINGS_LOGGING = {

View file

@ -1,5 +1,7 @@
__package__ = 'archivebox.snapshots'
import time
from django.utils import timezone
from statemachine import State, StateMachine
@ -67,7 +69,7 @@ class SnapshotMachine(StateMachine, strict_states=True):
def enter_started(self):
print(f'SnapshotMachine[{self.snapshot.ABID}].on_started(): snapshot.create_pending_archiveresults() + snapshot.bump_retry_at(+60s)')
self.snapshot.status = Snapshot.StatusChoices.STARTED
self.snapshot.bump_retry_at(seconds=60)
self.snapshot.bump_retry_at(seconds=2)
self.snapshot.save()
self.snapshot.create_pending_archiveresults()
@ -117,13 +119,19 @@ class ArchiveResultMachine(StateMachine, strict_states=True):
return self.archiveresult.snapshot and (self.archiveresult.retry_at < timezone.now())
def is_succeeded(self) -> bool:
return self.archiveresult.output_exists()
if self.archiveresult.output and 'err' not in self.archiveresult.output.lower():
return True
return False
def is_failed(self) -> bool:
return not self.archiveresult.output_exists()
if self.archiveresult.output and 'err' in self.archiveresult.output.lower():
return True
return False
def is_backoff(self) -> bool:
return self.archiveresult.STATE == ArchiveResult.StatusChoices.BACKOFF
if self.archiveresult.output is None:
return True
return False
def is_finished(self) -> bool:
return self.is_failed() or self.is_succeeded()
@ -141,19 +149,22 @@ class ArchiveResultMachine(StateMachine, strict_states=True):
print(f'ArchiveResultMachine[{self.archiveresult.ABID}].on_started(): archiveresult.start_ts + create_output_dir() + bump_retry_at(+60s)')
self.archiveresult.status = ArchiveResult.StatusChoices.STARTED
self.archiveresult.start_ts = timezone.now()
self.archiveresult.bump_retry_at(seconds=60)
self.archiveresult.bump_retry_at(seconds=2)
self.archiveresult.save()
self.archiveresult.create_output_dir()
time.sleep(2)
self.archiveresult.output = 'completed'
self.archiveresult.save()
@backoff.enter
def enter_backoff(self):
print(f'ArchiveResultMachine[{self.archiveresult.ABID}].on_backoff(): archiveresult.retries += 1, archiveresult.bump_retry_at(+60s), archiveresult.end_ts = None')
self.archiveresult.status = ArchiveResult.StatusChoices.BACKOFF
self.archiveresult.retries = getattr(self.archiveresult, 'retries', 0) + 1
self.archiveresult.bump_retry_at(seconds=60)
self.archiveresult.bump_retry_at(seconds=2)
self.archiveresult.end_ts = None
self.archiveresult.save()
@succeeded.enter
def enter_succeeded(self):
print(f'ArchiveResultMachine[{self.archiveresult.ABID}].on_succeeded(): archiveresult.retry_at = None, archiveresult.end_ts = now()')

View file

@ -18,6 +18,6 @@ class CrawlActor(ActorType[Crawl]):
FINAL_STATES: ClassVar[list[State]] = CrawlMachine.final_states
STATE_FIELD_NAME: ClassVar[str] = Crawl.state_field_name
MAX_CONCURRENT_ACTORS: ClassVar[int] = 1
MAX_CONCURRENT_ACTORS: ClassVar[int] = 3
MAX_TICK_TIME: ClassVar[int] = 10
CLAIM_FROM_TOP_N: ClassVar[int] = MAX_CONCURRENT_ACTORS * 10

View file

@ -190,22 +190,28 @@ class Crawl(ABIDModel, ModelWithHealthStats, ModelWithStateMachine):
from core.models import ArchiveResult
snapshot_ids = self.snapshot_set.values_list('id', flat=True)
pending_archiveresults = ArchiveResult.objects.filter(snapshot_id__in=snapshot_ids, retry_at__isnull=True)
pending_archiveresults = ArchiveResult.objects.filter(snapshot_id__in=snapshot_ids, retry_at__isnull=False)
return pending_archiveresults
def create_root_snapshot(self) -> 'Snapshot':
from core.models import Snapshot
try:
return Snapshot.objects.get(crawl=self, url=self.seed.uri)
except Snapshot.DoesNotExist:
pass
root_snapshot, _ = Snapshot.objects.update_or_create(
crawl=self,
url=self.seed.uri,
defaults={
'crawl': self,
'status': Snapshot.INITIAL_STATE,
'retry_at': timezone.now(),
'timestamp': str(timezone.now().timestamp()),
# 'config': self.seed.config,
},
)
root_snapshot.save()
return root_snapshot

View file

@ -52,7 +52,7 @@ class CrawlMachine(StateMachine, strict_states=True):
def enter_started(self):
print(f'CrawlMachine[{self.crawl.ABID}].on_started(): crawl.create_root_snapshot() + crawl.bump_retry_at(+10s)')
self.crawl.status = Crawl.StatusChoices.STARTED
self.crawl.bump_retry_at(seconds=10)
self.crawl.bump_retry_at(seconds=2)
self.crawl.save()
self.crawl.create_root_snapshot()

View file

@ -1,7 +1,5 @@
__package__ = 'abx_plugin_git'
from pathlib import Path
from abx_pkg import BinName
@ -14,7 +12,7 @@ class GitExtractor(BaseExtractor):
name: ExtractorName = 'git'
binary: BinName = GIT_BINARY.name
def get_output_path(self, snapshot) -> Path | None:
return snapshot.as_link() / 'git'
def get_output_path(self, snapshot) -> str:
return 'git'
GIT_EXTRACTOR = GitExtractor()