diff --git a/archivebox/actors/actor.py b/archivebox/actors/actor.py index 974b44ec..a8aa8be1 100644 --- a/archivebox/actors/actor.py +++ b/archivebox/actors/actor.py @@ -5,13 +5,12 @@ import time import traceback from typing import ClassVar, Generic, TypeVar, Any, Literal, Type, Iterable, cast, get_args from datetime import timedelta +import multiprocessing from multiprocessing import Process, cpu_count -from threading import Thread, get_native_id import psutil from rich import print -from statemachine import State, StateMachine, registry -from statemachine.mixins import MachineMixin +from statemachine import State, StateMachine from django import db from django.db.models import QuerySet, sql, Q @@ -19,9 +18,13 @@ from django.db.models import Model as DjangoModel from django.utils import timezone from django.utils.functional import classproperty +# from archivebox.logging_util import TimedProgress + from .models import ModelWithStateMachine -# from archivebox.logging_util import TimedProgress + +multiprocessing.set_start_method('fork', force=True) + class ActorObjectAlreadyClaimed(Exception): """Raised when the Actor tries to claim the next object from the queue but it's already been claimed by another Actor""" @@ -48,7 +51,6 @@ class ActorType(Generic[ModelType]): Base class for all actors. Usage: class FaviconActor(ActorType[FaviconArchiveResult]): - FINAL_STATES: ClassVar[tuple[str, ...]] = ('succeeded', 'failed', 'skipped') ACTIVE_STATE: ClassVar[str] = 'started' @classmethod @@ -60,9 +62,7 @@ class ActorType(Generic[ModelType]): Model: Type[ModelType] StateMachineClass: Type[StateMachine] - STATE_FIELD_NAME: ClassVar[str] = 'status' ACTIVE_STATE: ClassVar[ObjectState] = 'started' - FINAL_STATES: ClassVar[ObjectStateList] # e.g. ['succeeded', 'failed', 'skipped'] or ['sealed'] EVENT_NAME: ClassVar[str] = 'tick' # the event name to trigger on the obj.sm: StateMachine (usually 'tick') CLAIM_ORDER: ClassVar[tuple[str, ...]] = ('-retry_at',) # the .order(*args) to claim the queue objects in, use ('?',) for random order @@ -144,17 +144,7 @@ class ActorType(Generic[ModelType]): assert issubclass(Model, DjangoModel), f'{cls.__name__}.Model must be a valid django Model' return cast(Type[ModelType], Model) - @staticmethod - def _get_state_machine_cls(Model: Type[ModelType]) -> Type[StateMachine]: - """Get the StateMachine class for the given django Model that inherits from MachineMixin""" - assert issubclass(Model, MachineMixin), f'{Model.__name__} must inherit from MachineMixin and define a .state_machine_name: str' - model_state_machine_name = getattr(Model, 'state_machine_name', None) - if model_state_machine_name: - StateMachine = registry.get_machine_cls(model_state_machine_name) - assert issubclass(StateMachine, StateMachine) - return StateMachine - raise NotImplementedError(f'ActorType[{Model.__name__}] must define .state_machine_name: str that points to a valid StateMachine') - + @classmethod def _get_state_machine_instance(cls, obj: ModelType) -> StateMachine: """Get the StateMachine instance for the given django Model instance (and check that it is a valid instance of cls.StateMachineClass)""" @@ -180,7 +170,7 @@ class ActorType(Generic[ModelType]): raise ValueError(f'{cls.__name__}.Model must be set to the same Model as the Generic[ModelType] parameter in the class definition') # check that Model has a valid StateMachine with the required event defined on it - cls.StateMachineClass = getattr(cls, 'StateMachineClass', None) or cls._get_state_machine_cls(cls.Model) + cls.StateMachineClass = getattr(cls, 'StateMachineClass', None) # type: ignore assert isinstance(cls.EVENT_NAME, str), f'{cls.__name__}.EVENT_NAME must be a str, got: {type(cls.EVENT_NAME).__name__} instead' assert hasattr(cls.StateMachineClass, cls.EVENT_NAME), f'StateMachine {cls.StateMachineClass.__name__} must define a {cls.EVENT_NAME} event ({cls.__name__}.EVENT_NAME = {cls.EVENT_NAME})' @@ -189,26 +179,10 @@ class ActorType(Generic[ModelType]): if primary_key_field != 'id': raise NotImplementedError(f'Actors currently only support models that use .id as their primary key field ({cls.__name__} uses {cls.__name__}.{primary_key_field} as primary key)') - # check for STATE_FIELD_NAME classvar or set it from the model's state_field_name attr - if not getattr(cls, 'STATE_FIELD_NAME', None): - if hasattr(cls.Model, 'state_field_name'): - cls.STATE_FIELD_NAME = getattr(cls.Model, 'state_field_name') - else: - raise NotImplementedError(f'{cls.__name__} must define a STATE_FIELD_NAME: ClassVar[str] (e.g. "status") or have a .state_field_name attr on its Model') - assert isinstance(cls.STATE_FIELD_NAME, str), f'{cls.__name__}.STATE_FIELD_NAME must be a str, got: {type(cls.STATE_FIELD_NAME).__name__} instead' - - # check for FINAL_STATES classvar or set it from the model's final_states attr - if not getattr(cls, 'FINAL_STATES', None): - cls.FINAL_STATES = cls.StateMachineClass.final_states - if not cls.FINAL_STATES: - raise NotImplementedError(f'{cls.__name__} must define a non-empty FINAL_STATES: ClassVar[list[str]] (e.g. ["sealed"]) or have a {cls.Model.__name__}.state_machine_name pointing to a StateMachine that provides .final_states') - cls.FINAL_STATES = [cls._state_to_str(state) for state in cls.FINAL_STATES] - assert all(isinstance(state, str) for state in cls.FINAL_STATES), f'{cls.__name__}.FINAL_STATES must be a list[str], got: {type(cls.FINAL_STATES).__name__} instead' - - # check for ACTIVE_STATE classvar or set it from the model's active_state attr + # check that ACTIVE_STATE is defined and that it exists on the StateMachineClass if not getattr(cls, 'ACTIVE_STATE', None): - raise NotImplementedError(f'{cls.__name__} must define an ACTIVE_STATE: ClassVar[State] (e.g. SnapshotMachine.started) ({cls.Model.__name__}.{cls.STATE_FIELD_NAME} gets set to this value to mark objects as actively processing)') - assert isinstance(cls.ACTIVE_STATE, (State, str)), f'{cls.__name__}.ACTIVE_STATE must be a statemachine.State | str, got: {type(cls.ACTIVE_STATE).__name__} instead' + raise NotImplementedError(f'{cls.__name__} must define an ACTIVE_STATE: ClassVar[State] (e.g. SnapshotMachine.started) ({cls.Model.__name__}.{cls.Model.state_field_name} gets set to this value to mark objects as actively processing)') + assert isinstance(cls.ACTIVE_STATE, (State, str)) and hasattr(cls.StateMachineClass, cls._state_to_str(cls.ACTIVE_STATE)), f'{cls.__name__}.ACTIVE_STATE must be a statemachine.State | str that exists on {cls.StateMachineClass.__name__}, got: {type(cls.ACTIVE_STATE).__name__} instead' # check the other ClassVar attributes for valid values assert cls.CLAIM_ORDER and isinstance(cls.CLAIM_ORDER, tuple) and all(isinstance(order, str) for order in cls.CLAIM_ORDER), f'{cls.__name__}.CLAIM_ORDER must be a non-empty tuple[str, ...], got: {type(cls.CLAIM_ORDER).__name__} instead' @@ -217,14 +191,14 @@ class ActorType(Generic[ModelType]): assert cls.MAX_CONCURRENT_ACTORS >= 1, f'{cls.__name__}.MAX_CONCURRENT_ACTORS must be a positive int >=1, got: {cls.MAX_CONCURRENT_ACTORS} instead' assert isinstance(cls.CLAIM_ATOMIC, bool), f'{cls.__name__}.CLAIM_ATOMIC must be a bool, got: {cls.CLAIM_ATOMIC} instead' - @classmethod - def _fork_actor_as_thread(cls, **launch_kwargs: LaunchKwargs) -> int: - """Spawn a new background thread running the actor's runloop""" - actor = cls(mode='thread', **launch_kwargs) - bg_actor_thread = Thread(target=actor.runloop) - bg_actor_thread.start() - assert bg_actor_thread.native_id is not None - return bg_actor_thread.native_id + # @classmethod + # def _fork_actor_as_thread(cls, **launch_kwargs: LaunchKwargs) -> int: + # """Spawn a new background thread running the actor's runloop""" + # actor = cls(mode='thread', **launch_kwargs) + # bg_actor_thread = Thread(target=actor.runloop) + # bg_actor_thread.start() + # assert bg_actor_thread.native_id is not None + # return bg_actor_thread.native_id @classmethod def _fork_actor_as_process(cls, **launch_kwargs: LaunchKwargs) -> int: @@ -278,7 +252,8 @@ class ActorType(Generic[ModelType]): @classmethod def start(cls, mode: Literal['thread', 'process']='process', **launch_kwargs: LaunchKwargs) -> int: if mode == 'thread': - return cls._fork_actor_as_thread(**launch_kwargs) + raise NotImplementedError('Thread-based actors are disabled to reduce codebase complexity. Please use processes for everything') + # return cls._fork_actor_as_thread(**launch_kwargs) elif mode == 'process': return cls._fork_actor_as_process(**launch_kwargs) raise ValueError(f'Invalid actor mode: {mode} must be "thread" or "process"') @@ -295,12 +270,12 @@ class ActorType(Generic[ModelType]): @classproperty def final_q(cls) -> Q: """Get the filter for objects that are already completed / in a final state""" - return Q(**{f'{cls.STATE_FIELD_NAME}__in': [cls._state_to_str(s) for s in cls.FINAL_STATES]}) + return Q(**{f'{cls.Model.state_field_name}__in': [cls._state_to_str(s) for s in cls.StateMachineClass.final_states]}) @classproperty def active_q(cls) -> Q: """Get the filter for objects that are actively processing right now""" - return Q(**{cls.STATE_FIELD_NAME: cls._state_to_str(cls.ACTIVE_STATE)}) # e.g. Q(status='started') + return Q(**{cls.Model.state_field_name: cls._state_to_str(cls.ACTIVE_STATE)}) # e.g. Q(status='started') @classproperty def stalled_q(cls) -> Q: @@ -346,7 +321,7 @@ class ActorType(Generic[ModelType]): if obj_to_process: self.idle_count = 0 # reset idle count if we got an object else: - if self.idle_count >= 30: + if self.idle_count >= 3: break # stop looping and exit if queue is empty and we have idled for 30sec else: # print('Actor runloop()', f'pid={self.pid}', 'queue empty, rechecking...') @@ -387,7 +362,7 @@ class ActorType(Generic[ModelType]): Can be a defined as a normal method (instead of classmethod) on subclasses if it needs to access instance vars. """ return { - cls.STATE_FIELD_NAME: cls._state_to_str(cls.ACTIVE_STATE), + cls.Model.state_field_name: cls._state_to_str(cls.ACTIVE_STATE), 'retry_at': timezone.now() + timedelta(seconds=cls.MAX_TICK_TIME), } @@ -474,18 +449,24 @@ class ActorType(Generic[ModelType]): # get the StateMachine instance from the object obj_statemachine = self._get_state_machine_instance(obj_to_process) + starting_state = obj_statemachine.current_state # trigger the event on the StateMachine instance obj_tick_method = getattr(obj_statemachine, self.EVENT_NAME) # e.g. obj_statemachine.tick() obj_tick_method() + ending_state = obj_statemachine.current_state + if starting_state != ending_state: + self.on_state_change(obj_to_process, starting_state, ending_state) + # save the object to persist any state changes obj_to_process.save() def on_startup(self) -> None: if self.mode == 'thread': - self.pid = get_native_id() # thread id - print(f'[green]🏃‍♂️ {self}.on_startup() STARTUP (THREAD)[/green]') + # self.pid = get_native_id() # thread id + # print(f'[green]🏃‍♂️ {self}.on_startup() STARTUP (THREAD)[/green]') + raise NotImplementedError('Thread-based actors are disabled to reduce codebase complexity. Please use processes for everything') else: self.pid = os.getpid() # process id print(f'[green]🏃‍♂️ {self}.on_startup() STARTUP (PROCESS)[/green]') @@ -505,13 +486,13 @@ class ActorType(Generic[ModelType]): # abx.pm.hook.on_actor_shutdown(actor=self, last_obj=last_obj, last_error=last_error) def on_tick_start(self, obj_to_process: ModelType) -> None: - print(f'🏃‍♂️ {self}.on_tick_start() {obj_to_process.ABID} {obj_to_process.status} {obj_to_process.retry_at}') + # print(f'🏃‍♂️ {self}.on_tick_start() {obj_to_process.ABID} {obj_to_process.status} {obj_to_process.retry_at}') # abx.pm.hook.on_actor_tick_start(actor=self, obj_to_process=obj) # self.timer = TimedProgress(self.MAX_TICK_TIME, prefix=' ') pass def on_tick_end(self, obj_to_process: ModelType) -> None: - print(f'🏃‍♂️ {self}.on_tick_end() {obj_to_process.ABID} {obj_to_process.status} {obj_to_process.retry_at}') + # print(f'🏃‍♂️ {self}.on_tick_end() {obj_to_process.ABID} {obj_to_process.status} {obj_to_process.retry_at}') # abx.pm.hook.on_actor_tick_end(actor=self, obj_to_process=obj_to_process) # self.timer.end() pass @@ -523,6 +504,9 @@ class ActorType(Generic[ModelType]): print(f'[red]🏃‍♂️ {self}.on_tick_exception()[/red] {obj_to_process.ABID} {obj_to_process.status} {obj_to_process.retry_at}: [red]{type(error).__name__}: {error}[/red]') # abx.pm.hook.on_actor_tick_exception(actor=self, obj_to_process=obj_to_process, error=error) + def on_state_change(self, obj_to_process: ModelType, starting_state, ending_state) -> None: + print(f'🏃‍♂️ {self}.on_state_change() {obj_to_process.ABID} {starting_state} ➡️ {ending_state}') + # abx.pm.hook.on_actor_state_change(actor=self, obj_to_process=obj_to_process, starting_state=starting_state, ending_state=ending_state) def compile_sql_select(queryset: QuerySet, filter_kwargs: dict[str, Any] | None=None, order_args: tuple[str, ...]=(), limit: int | None=None) -> tuple[str, tuple[Any, ...]]: diff --git a/archivebox/actors/orchestrator.py b/archivebox/actors/orchestrator.py index 7457372b..9585ad2b 100644 --- a/archivebox/actors/orchestrator.py +++ b/archivebox/actors/orchestrator.py @@ -10,8 +10,6 @@ from django.utils import timezone import multiprocessing -from threading import Thread, get_native_id - from rich import print @@ -32,11 +30,13 @@ class Orchestrator: actor_types: Dict[str, Type['ActorType']] = {} mode: Literal['thread', 'process'] = 'process' exit_on_idle: bool = True + max_concurrent_actors: int = 20 - def __init__(self, actor_types: Dict[str, Type['ActorType']] | None = None, mode: Literal['thread', 'process'] | None=None, exit_on_idle: bool=True): + def __init__(self, actor_types: Dict[str, Type['ActorType']] | None = None, mode: Literal['thread', 'process'] | None=None, exit_on_idle: bool=True, max_concurrent_actors: int=max_concurrent_actors): self.actor_types = actor_types or self.actor_types or self.autodiscover_actor_types() self.mode = mode or self.mode self.exit_on_idle = exit_on_idle + self.max_concurrent_actors = max_concurrent_actors def __repr__(self) -> str: label = 'tid' if self.mode == 'thread' else 'pid' @@ -49,13 +49,13 @@ class Orchestrator: def name(cls) -> str: return cls.__name__ # type: ignore - def fork_as_thread(self): - self.thread = Thread(target=self.runloop) - self.thread.start() - assert self.thread.native_id is not None - return self.thread.native_id + # def _fork_as_thread(self): + # self.thread = Thread(target=self.runloop) + # self.thread.start() + # assert self.thread.native_id is not None + # return self.thread.native_id - def fork_as_process(self): + def _fork_as_process(self): self.process = multiprocessing.Process(target=self.runloop) self.process.start() assert self.process.pid is not None @@ -63,9 +63,10 @@ class Orchestrator: def start(self) -> int: if self.mode == 'thread': - return self.fork_as_thread() + # return self._fork_as_thread() + raise NotImplementedError('Thread-based orchestrators are disabled for now to reduce codebase complexity') elif self.mode == 'process': - return self.fork_as_process() + return self._fork_as_process() raise ValueError(f'Invalid orchestrator mode: {self.mode}') @classmethod @@ -108,8 +109,9 @@ class Orchestrator: def on_startup(self): if self.mode == 'thread': - self.pid = get_native_id() - print(f'[green]👨‍✈️ {self}.on_startup() STARTUP (THREAD)[/green]') + # self.pid = get_native_id() + # print(f'[green]👨‍✈️ {self}.on_startup() STARTUP (THREAD)[/green]') + raise NotImplementedError('Thread-based orchestrators are disabled for now to reduce codebase complexity') elif self.mode == 'process': self.pid = os.getpid() print(f'[green]👨‍✈️ {self}.on_startup() STARTUP (PROCESS)[/green]') @@ -120,16 +122,18 @@ class Orchestrator: # abx.pm.hook.on_orchestrator_shutdown(self) def on_tick_started(self, all_queues): - total_pending = sum(queue.count() for queue in all_queues.values()) - print(f'👨‍✈️ {self}.on_tick_started()', f'total_pending={total_pending}') + # total_pending = sum(queue.count() for queue in all_queues.values()) + # if total_pending: + # print(f'👨‍✈️ {self}.on_tick_started()', f'total_pending={total_pending}') # abx.pm.hook.on_orchestrator_tick_started(self, actor_types, all_queues) pass def on_tick_finished(self, all_queues, all_existing_actors, all_spawned_actors): - if all_spawned_actors: - total_queue_length = sum(queue.count() for queue in all_queues.values()) - print(f'[grey53]👨‍✈️ {self}.on_tick_finished() queue={total_queue_length} existing_actors={len(all_existing_actors)} spawned_actors={len(all_spawned_actors)}[/grey53]') + # if all_spawned_actors: + # total_queue_length = sum(queue.count() for queue in all_queues.values()) + # print(f'[grey53]👨‍✈️ {self}.on_tick_finished() queue={total_queue_length} existing_actors={len(all_existing_actors)} spawned_actors={len(all_spawned_actors)}[/grey53]') # abx.pm.hook.on_orchestrator_tick_finished(self, actor_types, all_queues) + pass def on_idle(self, all_queues): print(f'👨‍✈️ {self}.on_idle()', f'idle_count={self.idle_count}') @@ -162,13 +166,18 @@ class Orchestrator: all_spawned_actors = [] for actor_type, queue in all_queues.items(): - next_obj = queue.first() - print(f'🏃‍♂️ {self}.runloop() {actor_type.__name__.ljust(20)} queue={str(queue.count()).ljust(3)} next={next_obj.abid if next_obj else "None"} {next_obj.status if next_obj else "None"} {(timezone.now() - next_obj.retry_at).total_seconds() if next_obj and next_obj.retry_at else "None"}') + if not queue.exists(): + continue + + # next_obj = queue.first() + # print(f'🏃‍♂️ {self}.runloop() {actor_type.__name__.ljust(20)} queue={str(queue.count()).ljust(3)} next={next_obj.abid if next_obj else "None"} {next_obj.status if next_obj else "None"} {(timezone.now() - next_obj.retry_at).total_seconds() if next_obj and next_obj.retry_at else "None"}') + self.idle_count = 0 try: existing_actors = actor_type.get_running_actors() all_existing_actors.extend(existing_actors) actors_to_spawn = actor_type.get_actors_to_spawn(queue, existing_actors) - for launch_kwargs in actors_to_spawn: + can_spawn_num_remaining = self.max_concurrent_actors - len(all_existing_actors) # set max_concurrent_actors=1 to disable multitasking + for launch_kwargs in actors_to_spawn[:can_spawn_num_remaining]: new_actor_pid = actor_type.start(mode='process', **launch_kwargs) all_spawned_actors.append(new_actor_pid) except Exception as err: @@ -192,98 +201,3 @@ class Orchestrator: else: print(f'\n[red]🏃‍♂️ {self}.runloop() FATAL:[/red]', err.__class__.__name__, err) self.on_shutdown(err=err) - - - -# from archivebox.config.django import setup_django - -# setup_django() - -# from core.models import ArchiveResult, Snapshot - -# from django.utils import timezone - -# from django import db -# from django.db import connection - - -# from crawls.actors import CrawlActor -# from core.actors import SnapshotActor, ArchiveResultActor - -# class ArchivingOrchestrator(Orchestrator): -# actor_types = { -# 'CrawlActor': CrawlActor, -# 'SnapshotActor': SnapshotActor, -# 'ArchiveResultActor': ArchiveResultActor, -# # 'FaviconActor': FaviconActor, -# # 'SinglefileActor': SinglefileActor, -# } - -# from abx_plugin_singlefile.actors import SinglefileActor - - -# class FaviconActor(ActorType[ArchiveResult]): -# CLAIM_ORDER: ClassVar[str] = 'created_at DESC' -# CLAIM_WHERE: ClassVar[str] = 'status = "queued" AND extractor = "favicon"' -# CLAIM_SET: ClassVar[str] = 'status = "started"' - -# @classproperty -# def QUERYSET(cls) -> QuerySet: -# return ArchiveResult.objects.filter(status='failed', extractor='favicon') - -# def tick(self, obj: ArchiveResult): -# print(f'[grey53]{self}.tick({obj.abid or obj.id}, status={obj.status}) remaining:[/grey53]', self.get_queue().count()) -# updated = ArchiveResult.objects.filter(id=obj.id, status='started').update(status='success') == 1 -# if not updated: -# raise Exception(f'Failed to update {obj.abid or obj.id}, interrupted by another actor writing to the same object') -# obj.refresh_from_db() -# obj.save() - - -# class ArchivingOrchestrator(Orchestrator): -# actor_types = { -# 'CrawlActor': CrawlActor, -# 'SnapshotActor': SnapshotActor, -# 'ArchiveResultActor': ArchiveResultActor, -# # 'FaviconActor': FaviconActor, -# # 'SinglefileActor': SinglefileActor, -# } - - -# if __name__ == '__main__': -# orchestrator = ExtractorsOrchestrator() -# orchestrator.start() - -# snap = Snapshot.objects.last() -# assert snap is not None -# created = 0 -# while True: -# time.sleep(0.05) -# # try: -# # ArchiveResult.objects.bulk_create([ -# # ArchiveResult( -# # id=uuid.uuid4(), -# # snapshot=snap, -# # status='failed', -# # extractor='favicon', -# # cmd=['echo', '"hello"'], -# # cmd_version='1.0', -# # pwd='.', -# # start_ts=timezone.now(), -# # end_ts=timezone.now(), -# # created_at=timezone.now(), -# # modified_at=timezone.now(), -# # created_by_id=1, -# # ) -# # for _ in range(100) -# # ]) -# # created += 100 -# # if created % 1000 == 0: -# # print(f'[blue]Created {created} ArchiveResults...[/blue]') -# # time.sleep(25) -# # except Exception as err: -# # print(err) -# # db.connections.close_all() -# # except BaseException as err: -# # print(err) -# # break