mirror of
https://github.com/ArchiveBox/ArchiveBox
synced 2024-11-22 20:23:12 +00:00
fix minor actor erros around CLAIM_ATOMIC
This commit is contained in:
parent
7c0e3dcc21
commit
c3d692b5d5
1 changed files with 9 additions and 6 deletions
|
@ -2,6 +2,7 @@ __package__ = 'archivebox.actors'
|
||||||
|
|
||||||
import os
|
import os
|
||||||
import time
|
import time
|
||||||
|
import traceback
|
||||||
from typing import ClassVar, Generic, TypeVar, Any, Literal, Type, Iterable, cast, get_args
|
from typing import ClassVar, Generic, TypeVar, Any, Literal, Type, Iterable, cast, get_args
|
||||||
from datetime import timedelta
|
from datetime import timedelta
|
||||||
from multiprocessing import Process, cpu_count
|
from multiprocessing import Process, cpu_count
|
||||||
|
@ -336,7 +337,7 @@ class ActorType(Generic[ModelType]):
|
||||||
while True:
|
while True:
|
||||||
# Get the next object to process from the queue
|
# Get the next object to process from the queue
|
||||||
try:
|
try:
|
||||||
obj_to_process = cast(ModelType, self.get_next(atomic=self.atomic))
|
obj_to_process = cast(ModelType, self.get_next(atomic=self.CLAIM_ATOMIC))
|
||||||
except (ActorQueueIsEmpty, ActorObjectAlreadyClaimed) as err:
|
except (ActorQueueIsEmpty, ActorObjectAlreadyClaimed) as err:
|
||||||
last_error = err
|
last_error = err
|
||||||
obj_to_process = None
|
obj_to_process = None
|
||||||
|
@ -362,6 +363,7 @@ class ActorType(Generic[ModelType]):
|
||||||
# print(f'[red]🏃♂️ {self}.tick()[/red] {obj_to_process} ERROR: [red]{type(err).__name__}: {err}[/red]')
|
# print(f'[red]🏃♂️ {self}.tick()[/red] {obj_to_process} ERROR: [red]{type(err).__name__}: {err}[/red]')
|
||||||
db.connections.close_all() # always reset the db connection after an exception to clear any pending transactions
|
db.connections.close_all() # always reset the db connection after an exception to clear any pending transactions
|
||||||
self.on_tick_exception(obj_to_process, err)
|
self.on_tick_exception(obj_to_process, err)
|
||||||
|
traceback.print_exc()
|
||||||
finally:
|
finally:
|
||||||
self.on_tick_end(obj_to_process)
|
self.on_tick_end(obj_to_process)
|
||||||
|
|
||||||
|
@ -376,7 +378,8 @@ class ActorType(Generic[ModelType]):
|
||||||
finally:
|
finally:
|
||||||
self.on_shutdown(last_obj=obj_to_process, last_error=last_error)
|
self.on_shutdown(last_obj=obj_to_process, last_error=last_error)
|
||||||
|
|
||||||
def get_update_kwargs_to_claim_obj(self) -> dict[str, Any]:
|
@classmethod
|
||||||
|
def get_update_kwargs_to_claim_obj(cls) -> dict[str, Any]:
|
||||||
"""
|
"""
|
||||||
Get the field values needed to mark an pending obj_to_process as being actively processing (aka claimed)
|
Get the field values needed to mark an pending obj_to_process as being actively processing (aka claimed)
|
||||||
by the current Actor. returned kwargs will be applied using: qs.filter(id=obj_to_process.id).update(**kwargs).
|
by the current Actor. returned kwargs will be applied using: qs.filter(id=obj_to_process.id).update(**kwargs).
|
||||||
|
@ -384,12 +387,12 @@ class ActorType(Generic[ModelType]):
|
||||||
Can be a defined as a normal method (instead of classmethod) on subclasses if it needs to access instance vars.
|
Can be a defined as a normal method (instead of classmethod) on subclasses if it needs to access instance vars.
|
||||||
"""
|
"""
|
||||||
return {
|
return {
|
||||||
self.STATE_FIELD_NAME: self.ACTIVE_STATE,
|
cls.STATE_FIELD_NAME: cls._state_to_str(cls.ACTIVE_STATE),
|
||||||
'retry_at': timezone.now() + timedelta(seconds=self.MAX_TICK_TIME),
|
'retry_at': timezone.now() + timedelta(seconds=cls.MAX_TICK_TIME),
|
||||||
}
|
}
|
||||||
|
|
||||||
def get_next(self, atomic: bool | None=None) -> ModelType | None:
|
def get_next(self, atomic: bool | None=None) -> ModelType | None:
|
||||||
"""get the next object from the queue, atomically locking it if self.atomic=True"""
|
"""get the next object from the queue, atomically locking it if self.CLAIM_ATOMIC=True"""
|
||||||
atomic = self.CLAIM_ATOMIC if atomic is None else atomic
|
atomic = self.CLAIM_ATOMIC if atomic is None else atomic
|
||||||
if atomic:
|
if atomic:
|
||||||
# fetch and claim the next object from in the queue in one go atomically
|
# fetch and claim the next object from in the queue in one go atomically
|
||||||
|
@ -454,7 +457,7 @@ class ActorType(Generic[ModelType]):
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
return self.Model.objects.raw(atomic_select_and_update_sql, (*update_params, *select_params))[0]
|
return self.Model.objects.raw(atomic_select_and_update_sql, (*update_params, *select_params))[0]
|
||||||
except KeyError:
|
except IndexError:
|
||||||
if self.get_queue().exists():
|
if self.get_queue().exists():
|
||||||
raise ActorObjectAlreadyClaimed(f'Unable to lock the next {self.Model.__name__} object from {self}.get_queue().first()')
|
raise ActorObjectAlreadyClaimed(f'Unable to lock the next {self.Model.__name__} object from {self}.get_queue().first()')
|
||||||
else:
|
else:
|
||||||
|
|
Loading…
Reference in a new issue