add ABIDModel.update_for_workers to update-in-place + bump retry_at time

This commit is contained in:
Nick Sweeting 2024-11-18 02:23:50 -08:00
parent b852442efc
commit c8b830b8dc
No known key found for this signature in database

View file

@ -7,6 +7,7 @@ from typing import Any, Dict, Union, List, Set, cast
from uuid import uuid4
from functools import partial
from pathlib import Path
from charidfield import CharIDField # type: ignore[import-untyped]
from django.contrib import admin
@ -73,7 +74,7 @@ class ABIDError(Exception):
class ABIDModel(models.Model):
"""
Abstract Base Model for other models to depend on. Provides ArchiveBox ID (ABID) interface.
Abstract Base Model for other models to depend on. Provides ArchiveBox ID (ABID) interface and other helper methods.
"""
abid_prefix: str = DEFAULT_ABID_PREFIX # e.g. 'tag_'
abid_ts_src = 'self.created_at' # e.g. 'self.created_at'
@ -86,10 +87,28 @@ class ABIDModel(models.Model):
# id = models.UUIDField(primary_key=True, default=None, null=False, editable=False, unique=True, verbose_name='ID')
# abid = ABIDField(prefix=abid_prefix)
# created_by = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE, default=None, null=False)
# created_by = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE, default=None, null=False, db_index=True)
# created_at = AutoDateTimeField(default=None, null=False, db_index=True)
# modified_at = models.DateTimeField(auto_now=True)
# if ModelWithNotesMixin model:
# notes = models.TextField(blank=True, null=False, default='', help_text='Any extra notes this snapshot should have')
# if StateMachineMixin model:
# retry_at = models.DateTimeField(default=None, null=True, db_index=True)
# status = models.CharField(max_length=16, choices=StatusChoices.choices, default=StatusChoices.QUEUED)
#
# StatusChoices: ClassVar[Type[DefaultStatusChoices]] = DefaultStatusChoices
# state_machine_attr: ClassVar[str] = 'sm'
# state_machine_name: ClassVar[str] = 'core.statemachines.ArchiveResultMachine'
# retry_at_field_name: ClassVar[str] = 'retry_at'
# state_field_name: ClassVar[str] = 'status'
# active_state: ClassVar[str] = StatusChoices.STARTED
# if ModelWithHealthStats model:
# num_uses_failed = models.PositiveIntegerField(default=0)
# num_uses_succeeded = models.PositiveIntegerField(default=0)
_prefetched_objects_cache: Dict[str, Any]
class Meta(TypedModelMeta):
@ -329,7 +348,12 @@ class ABIDModel(models.Model):
def get_absolute_url(self):
return self.api_docs_url
def update_for_workers(self, **update_kwargs) -> bool:
"""Immediately update the **kwargs on the object in DB, and reset the retry_at to now()"""
updated = bool(self._meta.model.objects.filter(pk=self.pk).update(retry_at=timezone.now(), **update_kwargs))
self.refresh_from_db()
return updated
class ModelWithHealthStats(models.Model):