From 9c2eac4e47847aa9dbea08e2700824696d791cd0 Mon Sep 17 00:00:00 2001 From: Nick Sweeting Date: Thu, 31 Oct 2024 04:24:06 -0700 Subject: [PATCH] add new actors and orchestrators --- archivebox/actors/__init__.py | 0 archivebox/actors/actor.py | 144 ++++++++++++++++ archivebox/actors/admin.py | 3 + archivebox/actors/apps.py | 6 + archivebox/actors/migrations/__init__.py | 0 archivebox/actors/models.py | 3 + archivebox/actors/orchestrator.py | 207 +++++++++++++++++++++++ archivebox/actors/tests.py | 3 + archivebox/actors/views.py | 3 + 9 files changed, 369 insertions(+) create mode 100644 archivebox/actors/__init__.py create mode 100644 archivebox/actors/actor.py create mode 100644 archivebox/actors/admin.py create mode 100644 archivebox/actors/apps.py create mode 100644 archivebox/actors/migrations/__init__.py create mode 100644 archivebox/actors/models.py create mode 100644 archivebox/actors/orchestrator.py create mode 100644 archivebox/actors/tests.py create mode 100644 archivebox/actors/views.py diff --git a/archivebox/actors/__init__.py b/archivebox/actors/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/archivebox/actors/actor.py b/archivebox/actors/actor.py new file mode 100644 index 00000000..6e9d523b --- /dev/null +++ b/archivebox/actors/actor.py @@ -0,0 +1,144 @@ +__package__ = 'archivebox.actors' + +import os +import time +import psutil +from typing import ClassVar, Generic, TypeVar, Any, cast, Literal + +from django.db.models import QuerySet +from multiprocessing import Process, cpu_count +from threading import Thread, get_native_id + +# from archivebox.logging_util import TimedProgress + +ALL_SPAWNED_ACTORS: list[psutil.Process] = [] + + +LaunchKwargs = dict[str, Any] + +ObjectType = TypeVar('ObjectType') + +class ActorType(Generic[ObjectType]): + pid: int + + MAX_CONCURRENT_ACTORS: ClassVar[int] = min(max(2, int(cpu_count() * 0.7)), 8) # min 2, max 8 + MAX_TICK_TIME: ClassVar[int] = 60 + + def __init__(self, mode: Literal['thread', 'process']='process', **launch_kwargs: LaunchKwargs): + self.mode = mode + self.launch_kwargs = launch_kwargs + + @classmethod + def get_running_actors(cls) -> list[int]: + # returns a list of pids of all running actors of this type + return [ + proc.pid for proc in ALL_SPAWNED_ACTORS + if proc.is_running() and proc.status() != 'zombie' + ] + + @classmethod + def spawn_actor(cls, mode: Literal['thread', 'process']='process', **launch_kwargs: LaunchKwargs) -> int: + actor = cls(mode=mode, **launch_kwargs) + # bg_actor_proccess = Process(target=actor.runloop) + if mode == 'thread': + bg_actor_thread = Thread(target=actor.runloop) + bg_actor_thread.start() + assert bg_actor_thread.native_id is not None + return bg_actor_thread.native_id + else: + bg_actor_process = Process(target=actor.runloop) + bg_actor_process.start() + assert bg_actor_process.pid is not None + ALL_SPAWNED_ACTORS.append(psutil.Process(pid=bg_actor_process.pid)) + return bg_actor_process.pid + + @classmethod + def get_queue(cls) -> QuerySet: + # return ArchiveResult.objects.filter(status='queued', extractor__in=('pdf', 'dom', 'screenshot')) + raise NotImplementedError + + @classmethod + def get_next(cls) -> ObjectType | None: + return cls.get_queue().last() + + @classmethod + def get_actors_to_spawn(cls, queue, running_actors) -> list[LaunchKwargs]: + actors_to_spawn: list[LaunchKwargs] = [] + max_spawnable = cls.MAX_CONCURRENT_ACTORS - len(running_actors) + queue_length = queue.count() + + if not queue_length: # queue is empty, spawn 0 actors + return actors_to_spawn + elif queue_length > 10: # queue is long, spawn as many as possible + actors_to_spawn += max_spawnable * [{}] + elif queue_length > 5: # queue is medium, spawn 1 or 2 actors + actors_to_spawn += min(2, max_spawnable) * [{}] + else: # queue is short, spawn 1 actor + actors_to_spawn += min(1, max_spawnable) * [{}] + return actors_to_spawn + + def on_startup(self): + if self.mode == 'thread': + self.pid = get_native_id() + else: + self.pid = os.getpid() + print('Actor on_startup()', f'pid={self.pid}') + # abx.pm.hook.on_actor_startup(self) + + def on_shutdown(self): + print('Actor on_shutdown()', f'pid={self.pid}') + # abx.pm.hook.on_actor_shutdown(self) + + def runloop(self): + self.on_startup() + + rechecks = 30 + + while True: + obj_to_process: ObjectType | None = None + try: + obj_to_process = cast(ObjectType, self.get_next()) + except Exception: + pass + + if obj_to_process: + rechecks = 30 + else: + if rechecks == 0: + break # stop looping and exit if queue is empty + else: + # print('Actor runloop()', f'pid={self.pid}', 'queue empty, rechecking...') + rechecks -= 1 + time.sleep(1) + continue + + if not self.lock(obj_to_process): + continue + + # abx.pm.hook.on_actor_tick_start(self, obj_to_process) + try: + # timer = TimedProgress(self.MAX_TICK_TIME, prefix=' ') + + # run the tick function on the object + self.tick(obj_to_process) + except Exception as err: + # abx.pm.hook.on_actor_tick_exception(self, obj_to_process, err) + print('ERROR: actor tick failed', err) + # refresh the db connection + from django import db + db.connections.close_all() + finally: + # timer.end() + pass + # abx.pm.hook.on_actor_tick_end(self, obj_to_process) + + self.on_shutdown() + + def tick(self, obj: ObjectType) -> None: + print('Actor Processing tick()', obj) + + def lock(self, obj: ObjectType) -> bool: + print('Actor lock()', obj) + return True + + diff --git a/archivebox/actors/admin.py b/archivebox/actors/admin.py new file mode 100644 index 00000000..8c38f3f3 --- /dev/null +++ b/archivebox/actors/admin.py @@ -0,0 +1,3 @@ +from django.contrib import admin + +# Register your models here. diff --git a/archivebox/actors/apps.py b/archivebox/actors/apps.py new file mode 100644 index 00000000..2347ac3f --- /dev/null +++ b/archivebox/actors/apps.py @@ -0,0 +1,6 @@ +from django.apps import AppConfig + + +class ActorsConfig(AppConfig): + default_auto_field = "django.db.models.BigAutoField" + name = "actors" diff --git a/archivebox/actors/migrations/__init__.py b/archivebox/actors/migrations/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/archivebox/actors/models.py b/archivebox/actors/models.py new file mode 100644 index 00000000..71a83623 --- /dev/null +++ b/archivebox/actors/models.py @@ -0,0 +1,3 @@ +from django.db import models + +# Create your models here. diff --git a/archivebox/actors/orchestrator.py b/archivebox/actors/orchestrator.py new file mode 100644 index 00000000..5a404850 --- /dev/null +++ b/archivebox/actors/orchestrator.py @@ -0,0 +1,207 @@ +__package__ = 'archivebox.actors' + +import os +import time +from typing import Dict + +from multiprocessing import Process + +from django.db.models import QuerySet + +from .actor import ActorType + +class Orchestrator: + pid: int + + @classmethod + def spawn_orchestrator(cls) -> int: + orchestrator = cls() + orchestrator_bg_proc = Process(target=orchestrator.runloop) + orchestrator_bg_proc.start() + assert orchestrator_bg_proc.pid is not None + return orchestrator_bg_proc.pid + + @classmethod + def get_all_actor_types(cls) -> Dict[str, ActorType]: + # returns a Dict of all discovered {actor_type_id: ActorType} ... + # return {'Snapshot': SnapshotActorType, 'ArchiveResult_chrome': ChromeActorType, ...} + return { + 'TestActor': TestActor(), + } + + @classmethod + def get_orphaned_objects(cls, all_queues) -> list: + # returns a list of objects that are in the queues of all actor types but not in the queues of any other actor types + return [] + + def on_startup(self): + self.pid = os.getpid() + print('Orchestrator startup', self.pid) + # abx.pm.hook.on_orchestrator_startup(self) + + def on_shutdown(self, err: BaseException | None = None): + print('Orchestrator shutdown', self.pid, err) + # abx.pm.hook.on_orchestrator_shutdown(self) + + def on_tick_started(self, actor_types, all_queues): + total_pending = sum(queue.count() for queue in all_queues.values()) + print('Orchestrator tick +', self.pid, f'total_pending={total_pending}') + # abx.pm.hook.on_orchestrator_tick_started(self, actor_types, all_queues) + + def on_tick_finished(self, actor_types, all_queues): + # print('Orchestrator tick √', self.pid) + # abx.pm.hook.on_orchestrator_tick_finished(self, actor_types, all_queues) + pass + + def on_idle(self): + # print('Orchestrator idle', self.pid) + # abx.pm.hook.on_orchestrator_idle(self) + pass + + def runloop(self): + self.pid = os.getpid() + + try: + while True: + actor_types = self.get_all_actor_types() + all_queues = { + actor_type: actor_type.get_queue() + for actor_type in actor_types.values() + } + self.on_tick_started(actor_types, all_queues) + + all_existing_actors = [] + all_spawned_actors = [] + + for actor_type, queue in all_queues.items(): + existing_actors = actor_type.get_running_actors() + all_existing_actors.extend(existing_actors) + actors_to_spawn = actor_type.get_actors_to_spawn(queue, existing_actors) + for launch_kwargs in actors_to_spawn: + all_spawned_actors.append(actor_type.spawn_actor(**launch_kwargs)) + + if all_spawned_actors: + print(f'Found {len(all_existing_actors)} existing actors, Spawned {len(all_spawned_actors)} new actors') + else: + # print(f'No actors to spawn, currently_running: {len(all_existing_actors)}') + time.sleep(1) + + orphaned_objects = self.get_orphaned_objects(all_queues) + if orphaned_objects: + print('WARNING: some objects may will not be processed', orphaned_objects) + + if not any(queue.exists() for queue in all_queues.values()): + # we are idle + self.on_idle() + # time.sleep(0.250) + time.sleep(2) + + self.on_tick_finished(actor_types, all_queues) + + except (KeyboardInterrupt, SystemExit) as err: + self.on_shutdown(err) + + + +from archivebox.config.django import setup_django + +setup_django() + +from core.models import ArchiveResult, Snapshot + +from django.utils import timezone + +from django import db +from django.db import connection + +def get_next_archiveresult_atomically() -> ArchiveResult | None: + with connection.cursor() as cursor: + # select a random archiveresult out of the next 50 pending ones + # (to avoid clashing with another actor thats also selecting from the same list) + cursor.execute(""" + UPDATE core_archiveresult + SET status = 'started' + WHERE status = 'failed' and id = ( + SELECT id FROM ( + SELECT id FROM core_archiveresult + WHERE status = 'failed' + ORDER BY start_ts DESC + LIMIT 50 + ) candidates + ORDER BY RANDOM() + LIMIT 1 + ) + RETURNING *; + """) + result = cursor.fetchone() + + # If no rows were updated, return None + if result is None: + return None + + # Convert the row tuple into a dict matching column names + columns = [col[0] for col in cursor.description] + return ArchiveResult(**dict(zip(columns, result))) + + +class TestActor(ActorType[ArchiveResult]): + @classmethod + def get_queue(cls) -> QuerySet[ArchiveResult]: + return ArchiveResult.objects.filter(status='failed', extractor='favicon') + + @classmethod + def get_next(cls) -> ArchiveResult | None: + return get_next_archiveresult_atomically() + # return cls.get_queue().last() + + def tick(self, obj: ArchiveResult): + # print(f'TestActor[{self.pid}] tick({obj.id})', 'remaining:', self.get_queue().count()) + updated = ArchiveResult.objects.filter(id=obj.id, status='started').update(status='success') + if not updated: + raise Exception('Failed to update object status, likely being processed by another actor') + + def lock(self, obj: ArchiveResult) -> bool: + locked = True + # locked = ArchiveResult.objects.select_for_update(skip_locked=True).filter(id=obj.id, status='pending').update(status='started') == 1 + # if locked: + # print(f'TestActor[{self.pid}] lock({obj.id}) 🔒') + # else: + # print(f'TestActor[{self.pid}] lock({obj.id}) X') + return locked + +if __name__ == '__main__': + snap = Snapshot.objects.last() + assert snap is not None + + orchestrator = Orchestrator() + orchestrator.spawn_orchestrator() + + for _ in range(50_000): + try: + ar = ArchiveResult.objects.create( + snapshot=snap, + status='failed', + extractor='favicon', + cmd=['echo', '"hello"'], + cmd_version='1.0', + pwd='.', + start_ts=timezone.now(), + end_ts=timezone.now(), + ) + except Exception as err: + print(err) + db.connections.close_all() + if _ % 1000 == 0: + print('Created', _, 'snapshots...') + time.sleep(0.001) + # time.sleep(3) + + # test_queue = TestActor.get_queue() + # thread_actors = [] + # print('Actor queue:', test_queue) + # actors_to_spawn = TestActor.get_actors_to_spawn(test_queue, thread_actors) + # print('Actors to spawn:', actors_to_spawn) + # # thread_actors = [TestActor.spawn_actor(mode='thread') for _ in actors_to_spawn] + # # print('Thread Actors spawned:', thread_actors) + # process_actors = [TestActor.spawn_actor(mode='process') for _ in actors_to_spawn] + # print('Process Actors spawned:', process_actors) diff --git a/archivebox/actors/tests.py b/archivebox/actors/tests.py new file mode 100644 index 00000000..7ce503c2 --- /dev/null +++ b/archivebox/actors/tests.py @@ -0,0 +1,3 @@ +from django.test import TestCase + +# Create your tests here. diff --git a/archivebox/actors/views.py b/archivebox/actors/views.py new file mode 100644 index 00000000..91ea44a2 --- /dev/null +++ b/archivebox/actors/views.py @@ -0,0 +1,3 @@ +from django.shortcuts import render + +# Create your views here.