diff --git a/archivebox/actors/models.py b/archivebox/actors/models.py index 31777c1c..36aa191a 100644 --- a/archivebox/actors/models.py +++ b/archivebox/actors/models.py @@ -18,7 +18,7 @@ class DefaultStatusChoices(models.TextChoices): default_status_field: models.CharField = models.CharField(choices=DefaultStatusChoices.choices, max_length=15, default=DefaultStatusChoices.QUEUED, null=False, blank=False, db_index=True) -default_retry_at_field: models.DateTimeField = models.DateTimeField(default=timezone.now, null=False, db_index=True) +default_retry_at_field: models.DateTimeField = models.DateTimeField(default=timezone.now, null=True, blank=True, db_index=True) ObjectState = State | str ObjectStateList = Iterable[ObjectState] @@ -164,7 +164,7 @@ class BaseModelWithStateMachine(models.Model, MachineMixin): @classproperty def ACTIVE_STATE(cls) -> str: - return cls._state_to_str(cls.StateMachineClass.active_state) + return cls._state_to_str(cls.active_state) @classproperty def INITIAL_STATE(cls) -> str: diff --git a/archivebox/actors/orchestrator.py b/archivebox/actors/orchestrator.py index b83a8c65..0e39c947 100644 --- a/archivebox/actors/orchestrator.py +++ b/archivebox/actors/orchestrator.py @@ -3,29 +3,40 @@ __package__ = 'archivebox.actors' import os import time import itertools -from typing import Dict, Type, Literal, ClassVar +from typing import Dict, Type, Literal, TYPE_CHECKING from django.utils.functional import classproperty +from django.utils import timezone + +import multiprocessing + -from multiprocessing import Process from threading import Thread, get_native_id from rich import print -from django.db.models import QuerySet +# from django.db.models import QuerySet from django.apps import apps -from .actor import ActorType + +if TYPE_CHECKING: + from .actor import ActorType + + +multiprocessing.set_start_method('fork', force=True) + class Orchestrator: pid: int idle_count: int = 0 - actor_types: Dict[str, Type[ActorType]] + actor_types: Dict[str, Type['ActorType']] = {} mode: Literal['thread', 'process'] = 'process' - - def __init__(self, actor_types: Dict[str, Type[ActorType]] | None = None, mode: Literal['thread', 'process'] | None=None): + exit_on_idle: bool = True + + def __init__(self, actor_types: Dict[str, Type['ActorType']] | None = None, mode: Literal['thread', 'process'] | None=None, exit_on_idle: bool=True): self.actor_types = actor_types or self.actor_types or self.autodiscover_actor_types() self.mode = mode or self.mode + self.exit_on_idle = exit_on_idle def __repr__(self) -> str: label = 'tid' if self.mode == 'thread' else 'pid' @@ -45,7 +56,7 @@ class Orchestrator: return self.thread.native_id def fork_as_process(self): - self.process = Process(target=self.runloop) + self.process = multiprocessing.Process(target=self.runloop) self.process.start() assert self.process.pid is not None return self.process.pid @@ -58,11 +69,19 @@ class Orchestrator: raise ValueError(f'Invalid orchestrator mode: {self.mode}') @classmethod - def autodiscover_actor_types(cls) -> Dict[str, Type[ActorType]]: + def autodiscover_actor_types(cls) -> Dict[str, Type['ActorType']]: + from archivebox.config.django import setup_django + setup_django() + # returns a Dict of all discovered {actor_type_id: ActorType} across the codebase # override this method in a subclass to customize the actor types that are used # return {'Snapshot': SnapshotActorType, 'ArchiveResult_chrome': ChromeActorType, ...} + from crawls.actors import CrawlActor + from core.actors import SnapshotActor, ArchiveResultActor return { + 'CrawlActor': CrawlActor, + 'SnapshotActor': SnapshotActor, + 'ArchiveResultActor': ArchiveResultActor, # look through all models and find all classes that inherit from ActorType # actor_type.__name__: actor_type # for actor_type in abx.pm.hook.get_all_ACTORS_TYPES().values() @@ -111,8 +130,13 @@ class Orchestrator: orphaned_objects = self.get_orphaned_objects(all_queues) if orphaned_objects: print('[red]👨‍✈️ WARNING: some objects may not be processed, no actor has claimed them after 60s:[/red]', orphaned_objects) + if self.idle_count > 5 and self.exit_on_idle: + raise KeyboardInterrupt('Orchestrator has no more tasks to process, exiting') def runloop(self): + from archivebox.config.django import setup_django + setup_django() + self.on_startup() try: while True: @@ -160,85 +184,95 @@ class Orchestrator: -from archivebox.config.django import setup_django +# from archivebox.config.django import setup_django -setup_django() +# setup_django() -from core.models import ArchiveResult, Snapshot +# from core.models import ArchiveResult, Snapshot -from django.utils import timezone +# from django.utils import timezone -from django import db -from django.db import connection +# from django import db +# from django.db import connection -from crawls.actors import CrawlActor -from .actor_snapshot import SnapshotActor +# from crawls.actors import CrawlActor +# from core.actors import SnapshotActor, ArchiveResultActor -from abx_plugin_singlefile.actors import SinglefileActor +# class ArchivingOrchestrator(Orchestrator): +# actor_types = { +# 'CrawlActor': CrawlActor, +# 'SnapshotActor': SnapshotActor, +# 'ArchiveResultActor': ArchiveResultActor, +# # 'FaviconActor': FaviconActor, +# # 'SinglefileActor': SinglefileActor, +# } + +# from abx_plugin_singlefile.actors import SinglefileActor -class FaviconActor(ActorType[ArchiveResult]): - CLAIM_ORDER: ClassVar[str] = 'created_at DESC' - CLAIM_WHERE: ClassVar[str] = 'status = "queued" AND extractor = "favicon"' - CLAIM_SET: ClassVar[str] = 'status = "started"' +# class FaviconActor(ActorType[ArchiveResult]): +# CLAIM_ORDER: ClassVar[str] = 'created_at DESC' +# CLAIM_WHERE: ClassVar[str] = 'status = "queued" AND extractor = "favicon"' +# CLAIM_SET: ClassVar[str] = 'status = "started"' - @classproperty - def QUERYSET(cls) -> QuerySet: - return ArchiveResult.objects.filter(status='failed', extractor='favicon') +# @classproperty +# def QUERYSET(cls) -> QuerySet: +# return ArchiveResult.objects.filter(status='failed', extractor='favicon') - def tick(self, obj: ArchiveResult): - print(f'[grey53]{self}.tick({obj.abid or obj.id}, status={obj.status}) remaining:[/grey53]', self.get_queue().count()) - updated = ArchiveResult.objects.filter(id=obj.id, status='started').update(status='success') == 1 - if not updated: - raise Exception(f'Failed to update {obj.abid or obj.id}, interrupted by another actor writing to the same object') - obj.refresh_from_db() - obj.save() +# def tick(self, obj: ArchiveResult): +# print(f'[grey53]{self}.tick({obj.abid or obj.id}, status={obj.status}) remaining:[/grey53]', self.get_queue().count()) +# updated = ArchiveResult.objects.filter(id=obj.id, status='started').update(status='success') == 1 +# if not updated: +# raise Exception(f'Failed to update {obj.abid or obj.id}, interrupted by another actor writing to the same object') +# obj.refresh_from_db() +# obj.save() -class ExtractorsOrchestrator(Orchestrator): - actor_types = { - 'CrawlActor': CrawlActor, - 'SnapshotActor': SnapshotActor, - 'FaviconActor': FaviconActor, - 'SinglefileActor': SinglefileActor, - } +# class ArchivingOrchestrator(Orchestrator): +# actor_types = { +# 'CrawlActor': CrawlActor, +# 'SnapshotActor': SnapshotActor, +# 'ArchiveResultActor': ArchiveResultActor, +# # 'FaviconActor': FaviconActor, +# # 'SinglefileActor': SinglefileActor, +# } -if __name__ == '__main__': - orchestrator = ExtractorsOrchestrator() - orchestrator.start() +# if __name__ == '__main__': +# orchestrator = ExtractorsOrchestrator() +# orchestrator.start() - snap = Snapshot.objects.last() - assert snap is not None - created = 0 - while True: - time.sleep(0.05) - # try: - # ArchiveResult.objects.bulk_create([ - # ArchiveResult( - # id=uuid.uuid4(), - # snapshot=snap, - # status='failed', - # extractor='favicon', - # cmd=['echo', '"hello"'], - # cmd_version='1.0', - # pwd='.', - # start_ts=timezone.now(), - # end_ts=timezone.now(), - # created_at=timezone.now(), - # modified_at=timezone.now(), - # created_by_id=1, - # ) - # for _ in range(100) - # ]) - # created += 100 - # if created % 1000 == 0: - # print(f'[blue]Created {created} ArchiveResults...[/blue]') - # time.sleep(25) - # except Exception as err: - # print(err) - # db.connections.close_all() - # except BaseException as err: - # print(err) - # break +# snap = Snapshot.objects.last() +# assert snap is not None +# created = 0 +# while True: +# time.sleep(0.05) +# # try: +# # ArchiveResult.objects.bulk_create([ +# # ArchiveResult( +# # id=uuid.uuid4(), +# # snapshot=snap, +# # status='failed', +# # extractor='favicon', +# # cmd=['echo', '"hello"'], +# # cmd_version='1.0', +# # pwd='.', +# # start_ts=timezone.now(), +# # end_ts=timezone.now(), +# # created_at=timezone.now(), +# # modified_at=timezone.now(), +# # created_by_id=1, +# # ) +# # for _ in range(100) +# # ]) +# # created += 100 +# # if created % 1000 == 0: +# # print(f'[blue]Created {created} ArchiveResults...[/blue]') +# # time.sleep(25) +# # except Exception as err: +# # print(err) +# # db.connections.close_all() +# # except BaseException as err: +# # print(err) +# # break