mirror of
https://github.com/sissbruecker/linkding
synced 2024-11-10 06:04:15 +00:00
a6f35119cd
* Replace django-background-tasks with huey * Add command for migrating tasks * Add custom admin view * fix dockerfile * fix tests * fix tests in CI * fix task tests * implement retries * improve config * workaround to avoid running singlefile tasks in parallel * properly kill single-file sub-processes * use period task for HTML snapshots * clear task lock when starting task consumer * remove obsolete cleanup task command
291 lines
9.2 KiB
Python
291 lines
9.2 KiB
Python
from django.contrib import admin, messages
|
|
from django.contrib.admin import AdminSite
|
|
from django.contrib.auth.admin import UserAdmin
|
|
from django.contrib.auth.models import User
|
|
from django.core.paginator import Paginator
|
|
from django.db.models import Count, QuerySet
|
|
from django.shortcuts import render
|
|
from django.urls import path
|
|
from django.utils.translation import ngettext, gettext
|
|
from huey.contrib.djhuey import HUEY as huey
|
|
from rest_framework.authtoken.admin import TokenAdmin
|
|
from rest_framework.authtoken.models import TokenProxy
|
|
|
|
from bookmarks.models import Bookmark, BookmarkAsset, Tag, UserProfile, Toast, FeedToken
|
|
from bookmarks.services.bookmarks import archive_bookmark, unarchive_bookmark
|
|
|
|
|
|
# Custom paginator to paginate through Huey tasks
|
|
class TaskPaginator(Paginator):
|
|
def __init__(self):
|
|
super().__init__(self, 100)
|
|
self.task_count = huey.storage.queue_size()
|
|
|
|
@property
|
|
def count(self):
|
|
return self.task_count
|
|
|
|
def page(self, number):
|
|
limit = self.per_page
|
|
offset = (number - 1) * self.per_page
|
|
return self._get_page(
|
|
self.enqueued_items(limit, offset),
|
|
number,
|
|
self,
|
|
)
|
|
|
|
# Copied from Huey's SqliteStorage with some modifications to allow pagination
|
|
def enqueued_items(self, limit, offset):
|
|
to_bytes = lambda b: bytes(b) if not isinstance(b, bytes) else b
|
|
sql = "select data from task where queue=? order by priority desc, id limit ? offset ?"
|
|
params = (huey.storage.name, limit, offset)
|
|
|
|
serialized_tasks = [
|
|
to_bytes(i) for i, in huey.storage.sql(sql, params, results=True)
|
|
]
|
|
return [huey.deserialize_task(task) for task in serialized_tasks]
|
|
|
|
|
|
# Custom view to display Huey tasks in the admin
|
|
def background_task_view(request):
|
|
page_number = int(request.GET.get("p", 1))
|
|
paginator = TaskPaginator()
|
|
page = paginator.get_page(page_number)
|
|
page_range = paginator.get_elided_page_range(page_number, on_each_side=2, on_ends=2)
|
|
context = {
|
|
**linkding_admin_site.each_context(request),
|
|
"title": "Background tasks",
|
|
"page": page,
|
|
"page_range": page_range,
|
|
"tasks": page.object_list,
|
|
}
|
|
return render(request, "admin/background_tasks.html", context)
|
|
|
|
|
|
class LinkdingAdminSite(AdminSite):
|
|
site_header = "linkding administration"
|
|
site_title = "linkding Admin"
|
|
|
|
def get_urls(self):
|
|
urls = super().get_urls()
|
|
custom_urls = [
|
|
path("tasks/", background_task_view, name="background_tasks"),
|
|
]
|
|
return custom_urls + urls
|
|
|
|
def get_app_list(self, request, app_label=None):
|
|
app_list = super().get_app_list(request, app_label)
|
|
app_list += [
|
|
{
|
|
"name": "Huey",
|
|
"app_label": "huey_app",
|
|
"models": [
|
|
{
|
|
"name": "Queued tasks",
|
|
"object_name": "background_tasks",
|
|
"admin_url": "/admin/tasks/",
|
|
"view_only": True,
|
|
}
|
|
],
|
|
}
|
|
]
|
|
return app_list
|
|
|
|
|
|
class AdminBookmark(admin.ModelAdmin):
|
|
list_display = ("resolved_title", "url", "is_archived", "owner", "date_added")
|
|
search_fields = (
|
|
"title",
|
|
"description",
|
|
"website_title",
|
|
"website_description",
|
|
"url",
|
|
"tags__name",
|
|
)
|
|
list_filter = (
|
|
"owner__username",
|
|
"is_archived",
|
|
"unread",
|
|
"tags",
|
|
)
|
|
ordering = ("-date_added",)
|
|
actions = [
|
|
"delete_selected_bookmarks",
|
|
"archive_selected_bookmarks",
|
|
"unarchive_selected_bookmarks",
|
|
"mark_as_read",
|
|
"mark_as_unread",
|
|
]
|
|
|
|
def get_actions(self, request):
|
|
actions = super().get_actions(request)
|
|
# Remove default delete action, which gets replaced by delete_selected_bookmarks below
|
|
# The default action shows a confirmation page which can fail in production when selecting all bookmarks and the
|
|
# number of objects to delete exceeds the value in DATA_UPLOAD_MAX_NUMBER_FIELDS (1000 by default)
|
|
del actions["delete_selected"]
|
|
return actions
|
|
|
|
def delete_selected_bookmarks(self, request, queryset: QuerySet):
|
|
bookmarks_count = queryset.count()
|
|
for bookmark in queryset:
|
|
bookmark.delete()
|
|
self.message_user(
|
|
request,
|
|
ngettext(
|
|
"%d bookmark was successfully deleted.",
|
|
"%d bookmarks were successfully deleted.",
|
|
bookmarks_count,
|
|
)
|
|
% bookmarks_count,
|
|
messages.SUCCESS,
|
|
)
|
|
|
|
def archive_selected_bookmarks(self, request, queryset: QuerySet):
|
|
for bookmark in queryset:
|
|
archive_bookmark(bookmark)
|
|
bookmarks_count = queryset.count()
|
|
self.message_user(
|
|
request,
|
|
ngettext(
|
|
"%d bookmark was successfully archived.",
|
|
"%d bookmarks were successfully archived.",
|
|
bookmarks_count,
|
|
)
|
|
% bookmarks_count,
|
|
messages.SUCCESS,
|
|
)
|
|
|
|
def unarchive_selected_bookmarks(self, request, queryset: QuerySet):
|
|
for bookmark in queryset:
|
|
unarchive_bookmark(bookmark)
|
|
bookmarks_count = queryset.count()
|
|
self.message_user(
|
|
request,
|
|
ngettext(
|
|
"%d bookmark was successfully unarchived.",
|
|
"%d bookmarks were successfully unarchived.",
|
|
bookmarks_count,
|
|
)
|
|
% bookmarks_count,
|
|
messages.SUCCESS,
|
|
)
|
|
|
|
def mark_as_read(self, request, queryset: QuerySet):
|
|
bookmarks_count = queryset.count()
|
|
queryset.update(unread=False)
|
|
self.message_user(
|
|
request,
|
|
ngettext(
|
|
"%d bookmark marked as read.",
|
|
"%d bookmarks marked as read.",
|
|
bookmarks_count,
|
|
)
|
|
% bookmarks_count,
|
|
messages.SUCCESS,
|
|
)
|
|
|
|
def mark_as_unread(self, request, queryset: QuerySet):
|
|
bookmarks_count = queryset.count()
|
|
queryset.update(unread=True)
|
|
self.message_user(
|
|
request,
|
|
ngettext(
|
|
"%d bookmark marked as unread.",
|
|
"%d bookmarks marked as unread.",
|
|
bookmarks_count,
|
|
)
|
|
% bookmarks_count,
|
|
messages.SUCCESS,
|
|
)
|
|
|
|
|
|
class AdminBookmarkAsset(admin.ModelAdmin):
|
|
list_display = ("display_name", "date_created", "status")
|
|
search_fields = (
|
|
"display_name",
|
|
"file",
|
|
)
|
|
list_filter = ("status",)
|
|
|
|
|
|
class AdminTag(admin.ModelAdmin):
|
|
list_display = ("name", "bookmarks_count", "owner", "date_added")
|
|
search_fields = ("name", "owner__username")
|
|
list_filter = ("owner__username",)
|
|
ordering = ("-date_added",)
|
|
actions = ["delete_unused_tags"]
|
|
|
|
def get_queryset(self, request):
|
|
queryset = super().get_queryset(request)
|
|
queryset = queryset.annotate(bookmarks_count=Count("bookmark"))
|
|
return queryset
|
|
|
|
def bookmarks_count(self, obj):
|
|
return obj.bookmarks_count
|
|
|
|
bookmarks_count.admin_order_field = "bookmarks_count"
|
|
|
|
def delete_unused_tags(self, request, queryset: QuerySet):
|
|
unused_tags = queryset.filter(bookmark__isnull=True)
|
|
unused_tags_count = unused_tags.count()
|
|
for tag in unused_tags:
|
|
tag.delete()
|
|
|
|
if unused_tags_count > 0:
|
|
self.message_user(
|
|
request,
|
|
ngettext(
|
|
"%d unused tag was successfully deleted.",
|
|
"%d unused tags were successfully deleted.",
|
|
unused_tags_count,
|
|
)
|
|
% unused_tags_count,
|
|
messages.SUCCESS,
|
|
)
|
|
else:
|
|
self.message_user(
|
|
request,
|
|
gettext(
|
|
"There were no unused tags in the selection",
|
|
),
|
|
messages.SUCCESS,
|
|
)
|
|
|
|
|
|
class AdminUserProfileInline(admin.StackedInline):
|
|
model = UserProfile
|
|
can_delete = False
|
|
verbose_name_plural = "Profile"
|
|
fk_name = "user"
|
|
readonly_fields = ("search_preferences",)
|
|
|
|
|
|
class AdminCustomUser(UserAdmin):
|
|
inlines = (AdminUserProfileInline,)
|
|
|
|
def get_inline_instances(self, request, obj=None):
|
|
if not obj:
|
|
return list()
|
|
return super(AdminCustomUser, self).get_inline_instances(request, obj)
|
|
|
|
|
|
class AdminToast(admin.ModelAdmin):
|
|
list_display = ("key", "message", "owner", "acknowledged")
|
|
search_fields = ("key", "message")
|
|
list_filter = ("owner__username",)
|
|
|
|
|
|
class AdminFeedToken(admin.ModelAdmin):
|
|
list_display = ("key", "user")
|
|
search_fields = ["key"]
|
|
list_filter = ("user__username",)
|
|
|
|
|
|
linkding_admin_site = LinkdingAdminSite()
|
|
linkding_admin_site.register(Bookmark, AdminBookmark)
|
|
linkding_admin_site.register(BookmarkAsset, AdminBookmarkAsset)
|
|
linkding_admin_site.register(Tag, AdminTag)
|
|
linkding_admin_site.register(User, AdminCustomUser)
|
|
linkding_admin_site.register(TokenProxy, TokenAdmin)
|
|
linkding_admin_site.register(Toast, AdminToast)
|
|
linkding_admin_site.register(FeedToken, AdminFeedToken)
|