linkding/bookmarks/services/bookmarks.py
Viacheslav Slinko fa5f78cf71
Automatically add tags to bookmarks based on URL pattern (#736)
* [WIP] DSL

* upd

* upd

* upd

* upd

* upd

* upd

* upd

* upd

* upd

* upd

* upd

* dsl2

* full feature

* upd

* upd

* upd

* upd

* rename to auto_tagging_rules

* update migration after rebase

* add REST API tests

* improve settings view

---------

Co-authored-by: Sascha Ißbrücker <sascha.issbruecker@gmail.com>
2024-05-17 09:39:46 +02:00

261 lines
8.9 KiB
Python

import logging
import os
from typing import Union
from django.conf import settings
from django.contrib.auth.models import User
from django.core.files.uploadedfile import UploadedFile
from django.utils import timezone
from bookmarks.models import Bookmark, BookmarkAsset, parse_tag_string
from bookmarks.services import tasks
from bookmarks.services import website_loader
from bookmarks.services import auto_tagging
from bookmarks.services.tags import get_or_create_tags
logger = logging.getLogger(__name__)
def create_bookmark(bookmark: Bookmark, tag_string: str, current_user: User):
# If URL is already bookmarked, then update it
existing_bookmark: Bookmark = Bookmark.objects.filter(
owner=current_user, url=bookmark.url
).first()
if existing_bookmark is not None:
_merge_bookmark_data(bookmark, existing_bookmark)
return update_bookmark(existing_bookmark, tag_string, current_user)
# Update website info
_update_website_metadata(bookmark)
# Set currently logged in user as owner
bookmark.owner = current_user
# Set dates
bookmark.date_added = timezone.now()
bookmark.date_modified = timezone.now()
bookmark.save()
# Update tag list
_update_bookmark_tags(bookmark, tag_string, current_user)
bookmark.save()
# Create snapshot on web archive
tasks.create_web_archive_snapshot(current_user, bookmark, False)
# Load favicon
tasks.load_favicon(current_user, bookmark)
# Load preview image
tasks.load_preview_image(current_user, bookmark)
# Create HTML snapshot
if current_user.profile.enable_automatic_html_snapshots:
tasks.create_html_snapshot(bookmark)
return bookmark
def update_bookmark(bookmark: Bookmark, tag_string, current_user: User):
# Detect URL change
original_bookmark = Bookmark.objects.get(id=bookmark.id)
has_url_changed = original_bookmark.url != bookmark.url
# Update tag list
_update_bookmark_tags(bookmark, tag_string, current_user)
# Update dates
bookmark.date_modified = timezone.now()
bookmark.save()
# Update favicon
tasks.load_favicon(current_user, bookmark)
# Update preview image
tasks.load_preview_image(current_user, bookmark)
if has_url_changed:
# Update web archive snapshot, if URL changed
tasks.create_web_archive_snapshot(current_user, bookmark, True)
# Only update website metadata if URL changed
_update_website_metadata(bookmark)
bookmark.save()
return bookmark
def archive_bookmark(bookmark: Bookmark):
bookmark.is_archived = True
bookmark.date_modified = timezone.now()
bookmark.save()
return bookmark
def archive_bookmarks(bookmark_ids: [Union[int, str]], current_user: User):
sanitized_bookmark_ids = _sanitize_id_list(bookmark_ids)
Bookmark.objects.filter(owner=current_user, id__in=sanitized_bookmark_ids).update(
is_archived=True, date_modified=timezone.now()
)
def unarchive_bookmark(bookmark: Bookmark):
bookmark.is_archived = False
bookmark.date_modified = timezone.now()
bookmark.save()
return bookmark
def unarchive_bookmarks(bookmark_ids: [Union[int, str]], current_user: User):
sanitized_bookmark_ids = _sanitize_id_list(bookmark_ids)
Bookmark.objects.filter(owner=current_user, id__in=sanitized_bookmark_ids).update(
is_archived=False, date_modified=timezone.now()
)
def delete_bookmarks(bookmark_ids: [Union[int, str]], current_user: User):
sanitized_bookmark_ids = _sanitize_id_list(bookmark_ids)
Bookmark.objects.filter(owner=current_user, id__in=sanitized_bookmark_ids).delete()
def tag_bookmarks(bookmark_ids: [Union[int, str]], tag_string: str, current_user: User):
sanitized_bookmark_ids = _sanitize_id_list(bookmark_ids)
owned_bookmark_ids = Bookmark.objects.filter(
owner=current_user, id__in=sanitized_bookmark_ids
).values_list("id", flat=True)
tag_names = parse_tag_string(tag_string)
tags = get_or_create_tags(tag_names, current_user)
BookmarkToTagRelationShip = Bookmark.tags.through
relationships = []
for tag in tags:
for bookmark_id in owned_bookmark_ids:
relationships.append(
BookmarkToTagRelationShip(bookmark_id=bookmark_id, tag=tag)
)
# Insert all bookmark -> tag associations at once, should ignore errors if association already exists
BookmarkToTagRelationShip.objects.bulk_create(relationships, ignore_conflicts=True)
Bookmark.objects.filter(id__in=owned_bookmark_ids).update(
date_modified=timezone.now()
)
def untag_bookmarks(
bookmark_ids: [Union[int, str]], tag_string: str, current_user: User
):
sanitized_bookmark_ids = _sanitize_id_list(bookmark_ids)
owned_bookmark_ids = Bookmark.objects.filter(
owner=current_user, id__in=sanitized_bookmark_ids
).values_list("id", flat=True)
tag_names = parse_tag_string(tag_string)
tags = get_or_create_tags(tag_names, current_user)
BookmarkToTagRelationShip = Bookmark.tags.through
for tag in tags:
# Remove all bookmark -> tag associations for the owned bookmarks and the current tag
BookmarkToTagRelationShip.objects.filter(
bookmark_id__in=owned_bookmark_ids, tag=tag
).delete()
Bookmark.objects.filter(id__in=owned_bookmark_ids).update(
date_modified=timezone.now()
)
def mark_bookmarks_as_read(bookmark_ids: [Union[int, str]], current_user: User):
sanitized_bookmark_ids = _sanitize_id_list(bookmark_ids)
Bookmark.objects.filter(owner=current_user, id__in=sanitized_bookmark_ids).update(
unread=False, date_modified=timezone.now()
)
def mark_bookmarks_as_unread(bookmark_ids: [Union[int, str]], current_user: User):
sanitized_bookmark_ids = _sanitize_id_list(bookmark_ids)
Bookmark.objects.filter(owner=current_user, id__in=sanitized_bookmark_ids).update(
unread=True, date_modified=timezone.now()
)
def share_bookmarks(bookmark_ids: [Union[int, str]], current_user: User):
sanitized_bookmark_ids = _sanitize_id_list(bookmark_ids)
Bookmark.objects.filter(owner=current_user, id__in=sanitized_bookmark_ids).update(
shared=True, date_modified=timezone.now()
)
def unshare_bookmarks(bookmark_ids: [Union[int, str]], current_user: User):
sanitized_bookmark_ids = _sanitize_id_list(bookmark_ids)
Bookmark.objects.filter(owner=current_user, id__in=sanitized_bookmark_ids).update(
shared=False, date_modified=timezone.now()
)
def _generate_upload_asset_filename(asset: BookmarkAsset, filename: str):
formatted_datetime = asset.date_created.strftime("%Y-%m-%d_%H%M%S")
return f"{asset.asset_type}_{formatted_datetime}_{filename}"
def upload_asset(bookmark: Bookmark, upload_file: UploadedFile) -> BookmarkAsset:
asset = BookmarkAsset(
bookmark=bookmark,
asset_type=BookmarkAsset.TYPE_UPLOAD,
content_type=upload_file.content_type,
display_name=upload_file.name,
status=BookmarkAsset.STATUS_PENDING,
gzip=False,
)
asset.save()
try:
filename = _generate_upload_asset_filename(asset, upload_file.name)
filepath = os.path.join(settings.LD_ASSET_FOLDER, filename)
with open(filepath, "wb") as f:
for chunk in upload_file.chunks():
f.write(chunk)
asset.status = BookmarkAsset.STATUS_COMPLETE
asset.file = filename
asset.file_size = upload_file.size
logger.info(
f"Successfully uploaded asset file. bookmark={bookmark} file={upload_file.name}"
)
except Exception as e:
logger.error(
f"Failed to upload asset file. bookmark={bookmark} file={upload_file.name}",
exc_info=e,
)
asset.status = BookmarkAsset.STATUS_FAILURE
asset.save()
return asset
def _merge_bookmark_data(from_bookmark: Bookmark, to_bookmark: Bookmark):
to_bookmark.title = from_bookmark.title
to_bookmark.description = from_bookmark.description
to_bookmark.notes = from_bookmark.notes
to_bookmark.unread = from_bookmark.unread
to_bookmark.shared = from_bookmark.shared
def _update_website_metadata(bookmark: Bookmark):
metadata = website_loader.load_website_metadata(bookmark.url)
bookmark.website_title = metadata.title
bookmark.website_description = metadata.description
def _update_bookmark_tags(bookmark: Bookmark, tag_string: str, user: User):
tag_names = parse_tag_string(tag_string)
if user.profile.auto_tagging_rules:
auto_tag_names = auto_tagging.get_tags(
user.profile.auto_tagging_rules, bookmark.url
)
for auto_tag_name in auto_tag_names:
if auto_tag_name not in tag_names:
tag_names.append(auto_tag_name)
tags = get_or_create_tags(tag_names, user)
bookmark.tags.set(tags)
def _sanitize_id_list(bookmark_ids: [Union[int, str]]) -> [int]:
# Convert string ids to int if necessary
return [int(bm_id) if isinstance(bm_id, str) else bm_id for bm_id in bookmark_ids]