diff --git a/archivebox/actors/actor.py b/archivebox/actors/actor.py index b5a82724..8dac8e44 100644 --- a/archivebox/actors/actor.py +++ b/archivebox/actors/actor.py @@ -2,6 +2,7 @@ __package__ = 'archivebox.actors' import os import time +import traceback from typing import ClassVar, Generic, TypeVar, Any, Literal, Type, Iterable, cast, get_args from datetime import timedelta from multiprocessing import Process, cpu_count @@ -336,7 +337,7 @@ class ActorType(Generic[ModelType]): while True: # Get the next object to process from the queue try: - obj_to_process = cast(ModelType, self.get_next(atomic=self.atomic)) + obj_to_process = cast(ModelType, self.get_next(atomic=self.CLAIM_ATOMIC)) except (ActorQueueIsEmpty, ActorObjectAlreadyClaimed) as err: last_error = err obj_to_process = None @@ -362,6 +363,7 @@ class ActorType(Generic[ModelType]): # print(f'[red]🏃‍♂️ {self}.tick()[/red] {obj_to_process} ERROR: [red]{type(err).__name__}: {err}[/red]') db.connections.close_all() # always reset the db connection after an exception to clear any pending transactions self.on_tick_exception(obj_to_process, err) + traceback.print_exc() finally: self.on_tick_end(obj_to_process) @@ -376,7 +378,8 @@ class ActorType(Generic[ModelType]): finally: self.on_shutdown(last_obj=obj_to_process, last_error=last_error) - def get_update_kwargs_to_claim_obj(self) -> dict[str, Any]: + @classmethod + def get_update_kwargs_to_claim_obj(cls) -> dict[str, Any]: """ Get the field values needed to mark an pending obj_to_process as being actively processing (aka claimed) by the current Actor. returned kwargs will be applied using: qs.filter(id=obj_to_process.id).update(**kwargs). @@ -384,12 +387,12 @@ class ActorType(Generic[ModelType]): Can be a defined as a normal method (instead of classmethod) on subclasses if it needs to access instance vars. """ return { - self.STATE_FIELD_NAME: self.ACTIVE_STATE, - 'retry_at': timezone.now() + timedelta(seconds=self.MAX_TICK_TIME), + cls.STATE_FIELD_NAME: cls._state_to_str(cls.ACTIVE_STATE), + 'retry_at': timezone.now() + timedelta(seconds=cls.MAX_TICK_TIME), } def get_next(self, atomic: bool | None=None) -> ModelType | None: - """get the next object from the queue, atomically locking it if self.atomic=True""" + """get the next object from the queue, atomically locking it if self.CLAIM_ATOMIC=True""" atomic = self.CLAIM_ATOMIC if atomic is None else atomic if atomic: # fetch and claim the next object from in the queue in one go atomically @@ -454,7 +457,7 @@ class ActorType(Generic[ModelType]): """ try: return self.Model.objects.raw(atomic_select_and_update_sql, (*update_params, *select_params))[0] - except KeyError: + except IndexError: if self.get_queue().exists(): raise ActorObjectAlreadyClaimed(f'Unable to lock the next {self.Model.__name__} object from {self}.get_queue().first()') else: