diff --git a/archivebox/cli/archivebox_update.py b/archivebox/cli/archivebox_update.py index 9694b6e6..97185ff7 100644 --- a/archivebox/cli/archivebox_update.py +++ b/archivebox/cli/archivebox_update.py @@ -1,13 +1,13 @@ #!/usr/bin/env python3 __package__ = 'archivebox.cli' -__command__ = 'archivebox update' -import sys -import argparse -from typing import List, Optional, IO -from archivebox.misc.util import docstring +import rich_click as click + +from typing import Iterable + +from archivebox.misc.util import enforce_types, docstring from archivebox.index import ( LINK_FILTERS, get_indexed_folders, @@ -21,8 +21,66 @@ from archivebox.index import ( get_corrupted_folders, get_unrecognized_folders, ) -from archivebox.misc.logging_util import SmartFormatter, accept_stdin -# from ..main import update + + +@enforce_types +def update(filter_patterns: Iterable[str]=(), + only_new: bool=False, + index_only: bool=False, + resume: float | None=None, + overwrite: bool=False, + before: float | None=None, + after: float | None=None, + status: str='indexed', + filter_type: str='exact', + extract: str="") -> None: + """Import any new links from subscriptions and retry any previously failed/skipped links""" + + from archivebox.config.django import setup_django + setup_django() + + from workers.orchestrator import Orchestrator + orchestrator = Orchestrator(exit_on_idle=False) + orchestrator.start() + + +@click.command() +@click.option('--only-new', is_flag=True, help="Don't attempt to retry previously skipped/failed links when updating") +@click.option('--index-only', is_flag=True, help="Update the main index without archiving any content") +@click.option('--resume', type=float, help='Resume the update process from a given timestamp') +@click.option('--overwrite', '-F', is_flag=True, help='Ignore existing archived content and overwrite with new versions (DANGEROUS)') +@click.option('--before', type=float, help="Update only links bookmarked before the given timestamp") +@click.option('--after', type=float, help="Update only links bookmarked after the given timestamp") +@click.option('--status', type=click.Choice([ + 'indexed', 'archived', 'unarchived', + 'present', 'valid', 'invalid', + 'duplicate', 'orphaned', 'corrupted', 'unrecognized' +]), default='indexed', help=f''' +Update only links or data directories that have the given status: + indexed {get_indexed_folders.__doc__} (the default) + archived {get_archived_folders.__doc__} + unarchived {get_unarchived_folders.__doc__} + + present {get_present_folders.__doc__} + valid {get_valid_folders.__doc__} + invalid {get_invalid_folders.__doc__} + + duplicate {get_duplicate_folders.__doc__} + orphaned {get_orphaned_folders.__doc__} + corrupted {get_corrupted_folders.__doc__} + unrecognized {get_unrecognized_folders.__doc__} +''') +@click.option('--filter-type', '-t', type=click.Choice([*LINK_FILTERS.keys(), 'search']), default='exact', help='Type of pattern matching to use when filtering URLs') +@click.option('--extract', '-e', default='', help='Comma-separated list of extractors to use e.g. title,favicon,screenshot,singlefile,...') +@click.argument('filter_patterns', nargs=-1) +@docstring(update.__doc__) +def main(**kwargs): + """Import any new links from subscriptions and retry any previously failed/skipped links""" + update(**kwargs) + + +if __name__ == '__main__': + main() @@ -103,127 +161,3 @@ from archivebox.misc.logging_util import SmartFormatter, accept_stdin # # Step 4: Re-write links index with updated titles, icons, and resources # all_links = load_main_index(out_dir=out_dir) # return all_links - - - - - -def update(): - """Import any new links from subscriptions and retry any previously failed/skipped links""" - from archivebox.config.django import setup_django - setup_django() - - from workers.orchestrator import Orchestrator - orchestrator = Orchestrator(exit_on_idle=False) - orchestrator.start() - - -@docstring(update.__doc__) -def main(args: Optional[List[str]]=None, stdin: Optional[IO]=None, pwd: Optional[str]=None) -> None: - parser = argparse.ArgumentParser( - prog=__command__, - description=update.__doc__, - add_help=True, - formatter_class=SmartFormatter, - ) - parser.add_argument( - '--only-new', #'-n', - action='store_true', - help="Don't attempt to retry previously skipped/failed links when updating", - ) - parser.add_argument( - '--index-only', #'-o', - action='store_true', - help="Update the main index without archiving any content", - ) - parser.add_argument( - '--resume', #'-r', - type=float, - help='Resume the update process from a given timestamp', - default=None, - ) - parser.add_argument( - '--overwrite', #'-x', - action='store_true', - help='Ignore existing archived content and overwrite with new versions (DANGEROUS)', - ) - parser.add_argument( - '--before', #'-b', - type=float, - help="Update only links bookmarked before the given timestamp.", - default=None, - ) - parser.add_argument( - '--after', #'-a', - type=float, - help="Update only links bookmarked after the given timestamp.", - default=None, - ) - parser.add_argument( - '--status', - type=str, - choices=('indexed', 'archived', 'unarchived', 'present', 'valid', 'invalid', 'duplicate', 'orphaned', 'corrupted', 'unrecognized'), - default='indexed', - help=( - 'Update only links or data directories that have the given status\n' - f' indexed {get_indexed_folders.__doc__} (the default)\n' - f' archived {get_archived_folders.__doc__}\n' - f' unarchived {get_unarchived_folders.__doc__}\n' - '\n' - f' present {get_present_folders.__doc__}\n' - f' valid {get_valid_folders.__doc__}\n' - f' invalid {get_invalid_folders.__doc__}\n' - '\n' - f' duplicate {get_duplicate_folders.__doc__}\n' - f' orphaned {get_orphaned_folders.__doc__}\n' - f' corrupted {get_corrupted_folders.__doc__}\n' - f' unrecognized {get_unrecognized_folders.__doc__}\n' - ) - ) - parser.add_argument( - '--filter-type', '-t', - type=str, - choices=(*LINK_FILTERS.keys(), 'search'), - default='exact', - help='Type of pattern matching to use when filtering URLs', - ) - parser.add_argument( - 'filter_patterns', - nargs='*', - type=str, - default=None, - help='Update only URLs matching these filter patterns.' - ) - parser.add_argument( - "--extract", - type=str, - help="Pass a list of the extractors to be used. If the method name is not correct, it will be ignored. \ - This does not take precedence over the configuration", - default="" - ) - command = parser.parse_args(args or ()) - - filter_patterns_str = None - if not command.filter_patterns: - filter_patterns_str = accept_stdin(stdin) - - update() - - # update( - # resume=command.resume, - # only_new=command.only_new, - # index_only=command.index_only, - # overwrite=command.overwrite, - # filter_patterns_str=filter_patterns_str, - # filter_patterns=command.filter_patterns, - # filter_type=command.filter_type, - # status=command.status, - # after=command.after, - # before=command.before, - # out_dir=Path(pwd) if pwd else DATA_DIR, - # extractors=command.extract, - # ) - - -if __name__ == '__main__': - main(args=sys.argv[1:], stdin=sys.stdin) diff --git a/archivebox/core/statemachines.py b/archivebox/core/statemachines.py index 6fbca260..b850fdf8 100644 --- a/archivebox/core/statemachines.py +++ b/archivebox/core/statemachines.py @@ -1,11 +1,14 @@ __package__ = 'archivebox.core' import time +import os from datetime import timedelta from typing import ClassVar from django.utils import timezone +from rich import print + from statemachine import State, StateMachine from workers.actor import ActorType @@ -39,10 +42,16 @@ class SnapshotMachine(StateMachine, strict_states=True): self.snapshot = snapshot super().__init__(snapshot, *args, **kwargs) + def __repr__(self) -> str: + return f'[grey53]Snapshot\\[{self.snapshot.ABID}] 🏃‍♂️ Worker\\[pid={os.getpid()}].tick()[/grey53] [blue]{self.snapshot.status.upper()}[/blue] ⚙️ [grey37]Machine[/grey37]' + + def __str__(self) -> str: + return self.__repr__() + def can_start(self) -> bool: can_start = bool(self.snapshot.url) if not can_start: - print(f'SnapshotMachine[{self.snapshot.ABID}].can_start() False: {self.snapshot.url} {self.snapshot.retry_at} {timezone.now()}') + print(f'{self}.can_start() [blue]QUEUED[/blue] ➡️❌ [blue]STARTED[/blue] cant start yet +{timezone.now() - self.snapshot.retry_at}s') return can_start def is_finished(self) -> bool: @@ -57,12 +66,12 @@ class SnapshotMachine(StateMachine, strict_states=True): # otherwise archiveresults exist and are all finished, so it's finished return True - def on_transition(self, event, state): - print(f'SnapshotMachine[{self.snapshot.ABID}].on_transition() {event} -> {state}') + # def on_transition(self, event, state): + # print(f'{self}.on_transition() [blue]{str(state).upper()}[/blue] ➡️ ...') @queued.enter def enter_queued(self): - print(f'SnapshotMachine[{self.snapshot.ABID}].on_queued(): snapshot.retry_at = now()') + print(f'{self}.on_queued() ↳ snapshot.retry_at = now()') self.snapshot.update_for_workers( retry_at=timezone.now(), status=Snapshot.StatusChoices.QUEUED, @@ -70,7 +79,7 @@ class SnapshotMachine(StateMachine, strict_states=True): @started.enter def enter_started(self): - print(f'SnapshotMachine[{self.snapshot.ABID}].on_started(): snapshot.create_pending_archiveresults() + snapshot.bump_retry_at(+60s)') + print(f'{self}.on_started() ↳ snapshot.create_pending_archiveresults() + snapshot.bump_retry_at(+60s)') # lock the snapshot while we create the pending archiveresults self.snapshot.update_for_workers( retry_at=timezone.now() + timedelta(seconds=30), # if failed, wait 30s before retrying @@ -86,7 +95,7 @@ class SnapshotMachine(StateMachine, strict_states=True): @sealed.enter def enter_sealed(self): - print(f'SnapshotMachine[{self.snapshot.ABID}].on_sealed(): snapshot.retry_at=None') + print(f'{self}.on_sealed() ↳ snapshot.retry_at=None') self.snapshot.update_for_workers( retry_at=None, status=Snapshot.StatusChoices.SEALED, @@ -144,11 +153,17 @@ class ArchiveResultMachine(StateMachine, strict_states=True): def __init__(self, archiveresult, *args, **kwargs): self.archiveresult = archiveresult super().__init__(archiveresult, *args, **kwargs) + + def __repr__(self) -> str: + return f'[grey53]ArchiveResult\\[{self.archiveresult.ABID}] 🏃‍♂️ Worker\\[pid={os.getpid()}].tick()[/grey53] [blue]{self.archiveresult.status.upper()}[/blue] ⚙️ [grey37]Machine[/grey37]' + + def __str__(self) -> str: + return self.__repr__() def can_start(self) -> bool: can_start = bool(self.archiveresult.snapshot.url) if not can_start: - print(f'ArchiveResultMachine[{self.archiveresult.ABID}].can_start() False: {self.archiveresult.snapshot.url} {self.archiveresult.retry_at} {timezone.now()}') + print(f'{self}.can_start() [blue]QUEUED[/blue] ➡️❌ [blue]STARTED[/blue]: cant start yet +{timezone.now() - self.archiveresult.retry_at}s') return can_start def is_succeeded(self) -> bool: @@ -172,7 +187,7 @@ class ArchiveResultMachine(StateMachine, strict_states=True): @queued.enter def enter_queued(self): - print(f'ArchiveResultMachine[{self.archiveresult.ABID}].on_queued(): archiveresult.retry_at = now()') + print(f'{self}.on_queued() ↳ archiveresult.retry_at = now()') self.archiveresult.update_for_workers( retry_at=timezone.now(), status=ArchiveResult.StatusChoices.QUEUED, @@ -181,7 +196,7 @@ class ArchiveResultMachine(StateMachine, strict_states=True): @started.enter def enter_started(self): - print(f'ArchiveResultMachine[{self.archiveresult.ABID}].on_started(): archiveresult.start_ts + create_output_dir() + bump_retry_at(+60s)') + print(f'{self}.on_started() ↳ archiveresult.start_ts + create_output_dir() + bump_retry_at(+60s)') # lock the object for the next 30sec self.archiveresult.update_for_workers( retry_at=timezone.now() + timedelta(seconds=30), @@ -205,7 +220,7 @@ class ArchiveResultMachine(StateMachine, strict_states=True): @backoff.enter def enter_backoff(self): - print(f'ArchiveResultMachine[{self.archiveresult.ABID}].on_backoff(): archiveresult.retries += 1, archiveresult.bump_retry_at(+60s), archiveresult.end_ts = None') + print(f'{self}.on_backoff() ↳ archiveresult.retries += 1, archiveresult.bump_retry_at(+60s), archiveresult.end_ts = None') self.archiveresult.update_for_workers( retry_at=timezone.now() + timedelta(seconds=60), status=ArchiveResult.StatusChoices.BACKOFF, @@ -216,7 +231,7 @@ class ArchiveResultMachine(StateMachine, strict_states=True): @succeeded.enter def enter_succeeded(self): - print(f'ArchiveResultMachine[{self.archiveresult.ABID}].on_succeeded(): archiveresult.retry_at = None, archiveresult.end_ts = now()') + print(f'{self}.on_succeeded() ↳ archiveresult.retry_at = None, archiveresult.end_ts = now()') self.archiveresult.update_for_workers( retry_at=None, status=ArchiveResult.StatusChoices.SUCCEEDED, @@ -227,7 +242,7 @@ class ArchiveResultMachine(StateMachine, strict_states=True): @failed.enter def enter_failed(self): - print(f'ArchiveResultMachine[{self.archiveresult.ABID}].on_failed(): archivebox.retry_at = None, archiveresult.end_ts = now()') + print(f'{self}.on_failed() ↳ archiveresult.retry_at = None, archiveresult.end_ts = now()') self.archiveresult.update_for_workers( retry_at=None, status=ArchiveResult.StatusChoices.FAILED, diff --git a/archivebox/crawls/statemachines.py b/archivebox/crawls/statemachines.py index a62c4b40..4af5054c 100644 --- a/archivebox/crawls/statemachines.py +++ b/archivebox/crawls/statemachines.py @@ -1,9 +1,12 @@ __package__ = 'archivebox.crawls' +import os from typing import ClassVar from datetime import timedelta from django.utils import timezone +from rich import print + from statemachine import State, StateMachine from workers.actor import ActorType @@ -31,6 +34,12 @@ class CrawlMachine(StateMachine, strict_states=True): def __init__(self, crawl, *args, **kwargs): self.crawl = crawl super().__init__(crawl, *args, **kwargs) + + def __repr__(self) -> str: + return f'[grey53]Crawl\\[{self.crawl.ABID}] 🏃‍♂️ Worker\\[pid={os.getpid()}].tick()[/grey53] [blue]{self.crawl.status.upper()}[/blue] ⚙️ [grey37]Machine[/grey37]' + + def __str__(self) -> str: + return self.__repr__() def can_start(self) -> bool: return bool(self.crawl.seed and self.crawl.seed.uri) @@ -64,7 +73,7 @@ class CrawlMachine(StateMachine, strict_states=True): @started.enter def enter_started(self): - print(f'CrawlMachine[{self.crawl.ABID}].on_started(): crawl.create_root_snapshot() + crawl.bump_retry_at(+10s)') + print(f'{self}.on_started(): [blue]↳ STARTED[/blue] crawl.create_root_snapshot() + crawl.bump_retry_at(+10s)') # lock the crawl object for 2s while we create the root snapshot self.crawl.update_for_workers( retry_at=timezone.now() + timedelta(seconds=5), @@ -80,7 +89,7 @@ class CrawlMachine(StateMachine, strict_states=True): @sealed.enter def enter_sealed(self): - print(f'CrawlMachine[{self.crawl.ABID}].on_sealed(): crawl.retry_at=None') + print(f'{self}.on_sealed(): [blue]↳ SEALED[/blue] crawl.retry_at=None') self.crawl.update_for_workers( retry_at=None, status=Crawl.StatusChoices.SEALED, diff --git a/archivebox/misc/logging_util.py b/archivebox/misc/logging_util.py index 5e688961..36f5ffbb 100644 --- a/archivebox/misc/logging_util.py +++ b/archivebox/misc/logging_util.py @@ -468,7 +468,7 @@ def log_list_started(filter_patterns: Optional[List[str]], filter_type: str): print(' {}'.format(' '.join(filter_patterns or ()))) def log_list_finished(links): - from ..index.csv import links_to_csv + from archivebox.index.csv import links_to_csv print() print('---------------------------------------------------------------------------------------------------') print(links_to_csv(links, cols=['timestamp', 'is_archived', 'num_outputs', 'url'], header=True, ljust=16, separator=' | ')) @@ -492,7 +492,7 @@ def log_removal_started(links: List["Link"], yes: bool, delete: bool): if not yes: print() - print('[yellow3][?] Do you want to proceed with removing these {len(links)} links?[/]') + print(f'[yellow3][?] Do you want to proceed with removing these {len(links)} links?[/]') try: assert input(' y/[n]: ').lower() == 'y' except (KeyboardInterrupt, EOFError, AssertionError): diff --git a/archivebox/workers/actor.py b/archivebox/workers/actor.py index 920203a3..16f53931 100644 --- a/archivebox/workers/actor.py +++ b/archivebox/workers/actor.py @@ -110,7 +110,8 @@ class ActorType(Generic[ModelType]): def __repr__(self) -> str: """-> FaviconActor[pid=1234]""" label = 'pid' if self.mode == 'process' else 'tid' - return f'[underline]{self.name}[/underline]\\[{label}={self.pid}]' + # return f'[underline]{self.name}[/underline]\\[{label}={self.pid}]' + return f'[underline]Worker[/underline]\\[{label}={self.pid}]' @staticmethod def _state_to_str(state: ObjectState) -> str: @@ -210,6 +211,10 @@ class ActorType(Generic[ModelType]): cls._SPAWNED_ACTOR_PIDS.append(psutil.Process(pid=bg_actor_process.pid)) return bg_actor_process.pid + @classmethod + def _obj_repr(cls, obj: ModelType | Any) -> str: + """Get a string representation of the given django Model instance""" + return f'[grey53]{type(obj).__name__}\\[{obj.ABID}][/grey53]' ### Class Methods: Called by Orchestrator on ActorType class before it has been spawned @@ -328,7 +333,7 @@ class ActorType(Generic[ModelType]): if self.idle_count >= 3: break # stop looping and exit if queue is empty and we have idled for 30sec else: - print('Actor runloop()', f'pid={self.pid}', 'queue empty, rechecking...') + # print('Actor runloop()', f'pid={self.pid}', 'queue empty, rechecking...') self.idle_count += 1 time.sleep(1) continue @@ -339,7 +344,7 @@ class ActorType(Generic[ModelType]): self.tick(obj_to_process) except Exception as err: last_error = err - print(f'[red]🏃‍♂️ {self}.tick()[/red] {obj_to_process} ERROR: [red]{type(err).__name__}: {err}[/red]') + print(f'[red]{self._obj_repr(obj_to_process)} 🏃‍♂️ {self}.tick()[/red] ERROR: [red]{type(err).__name__}: {err}[/red]') db.connections.close_all() # always reset the db connection after an exception to clear any pending transactions self.on_tick_exception(obj_to_process, err) traceback.print_exc() @@ -351,7 +356,7 @@ class ActorType(Generic[ModelType]): if isinstance(err, KeyboardInterrupt): print() else: - print(f'\n[red]🏃‍♂️ {self}.runloop() FATAL:[/red] {type(err).__name__}: {err}') + print(f'\n[red]{self._obj_repr(obj_to_process)} 🏃‍♂️ {self}.runloop() FATAL:[/red] {type(err).__name__}: {err}') print(f' Last processed object: {obj_to_process}') raise finally: @@ -449,7 +454,7 @@ class ActorType(Generic[ModelType]): def tick(self, obj_to_process: ModelType) -> None: """Call the object.sm.tick() method to process the object""" - print(f'[blue]🏃‍♂️ {self}.tick()[/blue] {obj_to_process.ABID} {obj_to_process.status} {obj_to_process.retry_at}') + print(f'\n[grey53]{self._obj_repr(obj_to_process)} 🏃‍♂️ {self}.tick()[/grey53] [blue]{obj_to_process.status.upper()}[/blue] ➡️ ... +{(obj_to_process.retry_at - timezone.now()).total_seconds() if obj_to_process.retry_at else "-"}s') # get the StateMachine instance from the object obj_statemachine = self._get_state_machine_instance(obj_to_process) @@ -477,17 +482,18 @@ class ActorType(Generic[ModelType]): # abx.pm.hook.on_actor_startup(actor=self) def on_shutdown(self, last_obj: ModelType | None=None, last_error: BaseException | None=None) -> None: - if isinstance(last_error, KeyboardInterrupt) or last_error is None: - last_error_str = '[green](CTRL-C)[/green]' - elif isinstance(last_error, ActorQueueIsEmpty): - last_error_str = '[green](queue empty)[/green]' - elif isinstance(last_error, ActorObjectAlreadyClaimed): - last_error_str = '[green](queue race)[/green]' - else: - last_error_str = f'[red]{type(last_error).__name__}: {last_error}[/red]' + # if isinstance(last_error, KeyboardInterrupt) or last_error is None: + # last_error_str = '[green](CTRL-C)[/green]' + # elif isinstance(last_error, ActorQueueIsEmpty): + # last_error_str = '[green](queue empty)[/green]' + # elif isinstance(last_error, ActorObjectAlreadyClaimed): + # last_error_str = '[green](queue race)[/green]' + # else: + # last_error_str = f'[red]{type(last_error).__name__}: {last_error}[/red]' - print(f'[grey53]🏃‍♂️ {self}.on_shutdown() SHUTTING DOWN[/grey53] {last_error_str}') + # print(f'[grey53]🏃‍♂️ {self}.on_shutdown() SHUTTING DOWN[/grey53] {last_error_str}') # abx.pm.hook.on_actor_shutdown(actor=self, last_obj=last_obj, last_error=last_error) + pass def on_tick_start(self, obj_to_process: ModelType) -> None: # print(f'🏃‍♂️ {self}.on_tick_start() {obj_to_process.ABID} {obj_to_process.status} {obj_to_process.retry_at}') @@ -505,11 +511,11 @@ class ActorType(Generic[ModelType]): def on_tick_exception(self, obj_to_process: ModelType, error: Exception) -> None: - print(f'[red]🏃‍♂️ {self}.on_tick_exception()[/red] {obj_to_process.ABID} {obj_to_process.status} {obj_to_process.retry_at}: [red]{type(error).__name__}: {error}[/red]') + print(f'[red]{self._obj_repr(obj_to_process)} 🏃‍♂️ {self}.on_tick_exception()[/red] [blue]{obj_to_process.status}[/blue] +{(obj_to_process.retry_at - timezone.now()).total_seconds() if obj_to_process.retry_at else "-"}s: [red]{type(error).__name__}: {error}[/red]') # abx.pm.hook.on_actor_tick_exception(actor=self, obj_to_process=obj_to_process, error=error) def on_state_change(self, obj_to_process: ModelType, starting_state, ending_state) -> None: - print(f'🏃‍♂️ {self}.on_state_change() {obj_to_process.ABID} {starting_state} ➡️ {ending_state}') + print(f'[blue]{self._obj_repr(obj_to_process)} 🏃‍♂️ {self}.on_state_change() {starting_state} ➡️ {ending_state}[/blue] +{(obj_to_process.retry_at - timezone.now()).total_seconds() if obj_to_process.retry_at else "-"}s') # abx.pm.hook.on_actor_state_change(actor=self, obj_to_process=obj_to_process, starting_state=starting_state, ending_state=ending_state) diff --git a/archivebox/workers/orchestrator.py b/archivebox/workers/orchestrator.py index b2da179f..686d0664 100644 --- a/archivebox/workers/orchestrator.py +++ b/archivebox/workers/orchestrator.py @@ -2,6 +2,7 @@ __package__ = 'archivebox.workers' import os import time +import sys import itertools from typing import Dict, Type, Literal, TYPE_CHECKING from django.utils.functional import classproperty @@ -122,9 +123,9 @@ class Orchestrator: # abx.pm.hook.on_orchestrator_shutdown(self) def on_tick_started(self, all_queues): - total_pending = sum(queue.count() for queue in all_queues.values()) - if total_pending: - print(f'👨‍✈️ {self}.on_tick_started()', f'total_pending={total_pending}') + # total_pending = sum(queue.count() for queue in all_queues.values()) + # if total_pending: + # print(f'👨‍✈️ {self}.on_tick_started()', f'total_pending={total_pending}') # abx.pm.hook.on_orchestrator_tick_started(self, actor_types, all_queues) pass @@ -136,7 +137,8 @@ class Orchestrator: pass def on_idle(self, all_queues): - print(f'👨‍✈️ {self}.on_idle()', f'idle_count={self.idle_count}') + # print(f'👨‍✈️ {self}.on_idle()', f'idle_count={self.idle_count}') + print('.', end='', flush=True, file=sys.stderr) # abx.pm.hook.on_orchestrator_idle(self) # check for orphaned objects left behind if self.idle_count == 60: @@ -170,6 +172,7 @@ class Orchestrator: continue next_obj = queue.first() + print() print(f'🏃‍♂️ {self}.runloop() {actor_type.__name__.ljust(20)} queue={str(queue.count()).ljust(3)} next={next_obj.abid if next_obj else "None"} {next_obj.status if next_obj else "None"} {(timezone.now() - next_obj.retry_at).total_seconds() if next_obj and next_obj.retry_at else "None"}') self.idle_count = 0 try: