ArchiveBox/archivebox/api/v1_core.py
2024-11-18 04:27:38 -08:00

446 lines
14 KiB
Python

# Explicitly pin this module's package name.
# NOTE(review): presumably needed so the module resolves as part of
# 'archivebox.api' regardless of how it is imported — confirm before removing.
__package__ = 'archivebox.api'
import math
from uuid import UUID
from typing import List, Optional, Union, Any
from datetime import datetime
from django.db.models import Q
from django.core.exceptions import ValidationError
from django.contrib.auth import get_user_model
from django.shortcuts import redirect
from ninja import Router, Schema, FilterSchema, Field, Query
from ninja.pagination import paginate, PaginationBase
from ninja.errors import HttpError
from core.models import Snapshot, ArchiveResult, Tag
from api.models import APIToken, OutboundWebhook
from api.v1_crawls import CrawlSchema, SeedSchema
# from .auth import API_AUTH_METHODS
# Router that every endpoint in this module registers on (via @router.get),
# tagged 'Core Models'.
router = Router(tags=['Core Models'])
class CustomPagination(PaginationBase):
    """Limit/offset/page pagination that wraps the results in a metadata envelope.

    A window can be addressed either by ``offset`` or by zero-based ``page``;
    a non-zero ``offset`` takes precedence over ``page``.
    """

    # Hard upper bound on the page size, whatever the client asks for.
    MAX_LIMIT = 500

    class Input(Schema):
        # Pagination parameters accepted from the client.
        limit: int = 200
        offset: int = 0
        page: int = 0

    class Output(Schema):
        # Envelope returned around every paginated list of items.
        total_items: int
        total_pages: int
        page: int
        limit: int
        offset: int
        num_items: int
        items: List[Any]

    def paginate_queryset(self, queryset, pagination: Input, **params):
        """Slice ``queryset`` according to ``pagination`` and describe the slice.

        Args:
            queryset: the queryset to paginate (must support ``count()`` and slicing).
            pagination: the parsed ``Input`` parameters.
            **params: extra keyword arguments passed by the caller; unused.

        Returns:
            A dict with the keys declared on ``Output``.
        """
        # Clamp to 1..MAX_LIMIT: a limit of 0 would divide by zero below, and a
        # negative limit would produce a negative slice bound.
        limit = max(1, min(pagination.limit, self.MAX_LIMIT))
        # An explicit non-zero offset wins over page; never allow a negative start.
        offset = max(0, pagination.offset or (pagination.page * limit))

        total = queryset.count()
        total_pages = math.ceil(total / limit)
        # Zero-based index of the page that contains the first returned item.
        current_page = offset // limit
        items = queryset[offset : offset + limit]

        return {
            'total_items': total,
            'total_pages': total_pages,
            'page': current_page,
            'limit': limit,
            'offset': offset,
            'num_items': len(items),
            'items': items,
        }
### ArchiveResult #########################################################################
# Serialized form of an ArchiveResult without any snapshot_* fields.
# The resolve_snapshot_* helpers defined here are only used by subclasses
# that declare the corresponding fields.
class MinimalArchiveResultSchema(Schema):
    TYPE: str = 'core.models.ArchiveResult'

    id: UUID
    abid: str

    created_at: Optional[datetime]
    modified_at: Optional[datetime]
    created_by_id: str
    created_by_username: str

    status: str
    retry_at: Optional[datetime]

    extractor: str
    cmd_version: Optional[str]
    cmd: Optional[List[str]]
    pwd: Optional[str]
    output: Optional[str]
    start_ts: Optional[datetime]
    end_ts: Optional[datetime]

    @staticmethod
    def resolve_created_by_id(obj):
        # The schema declares this field as a string.
        owner_id = obj.created_by_id
        return str(owner_id)

    @staticmethod
    def resolve_created_by_username(obj) -> str:
        # Select only the username column of the owning user.
        usernames = (
            get_user_model()
            .objects
            .filter(pk=obj.created_by_id)
            .values_list('username', flat=True)
        )
        return usernames[0]

    @staticmethod
    def resolve_abid(obj):
        # Serialize the object's ABID attribute as a string.
        return str(obj.ABID)

    @staticmethod
    def resolve_snapshot_timestamp(obj):
        parent = obj.snapshot
        return parent.timestamp

    @staticmethod
    def resolve_snapshot_url(obj):
        parent = obj.snapshot
        return parent.url

    @staticmethod
    def resolve_snapshot_id(obj):
        # Read the foreign-key value directly instead of going through obj.snapshot.
        return str(obj.snapshot_id)

    @staticmethod
    def resolve_snapshot_abid(obj):
        parent = obj.snapshot
        return str(parent.ABID)

    @staticmethod
    def resolve_snapshot_tags(obj):
        # Tag names of the parent snapshot, sorted ascending.
        names = [tag.name for tag in obj.snapshot.tags.all()]
        names.sort()
        return names
# Full ArchiveResult serialization: every MinimalArchiveResultSchema field plus
# the parent snapshot's details. Each snapshot_* value is produced by the
# matching resolve_snapshot_* helper inherited from the base class.
class ArchiveResultSchema(MinimalArchiveResultSchema):
    TYPE: str = 'core.models.ArchiveResult'

    # ... Extends MinimalArchiveResultSchema fields ...

    snapshot_id: UUID
    snapshot_abid: str
    snapshot_timestamp: str
    snapshot_url: str
    snapshot_tags: list[str]
# Query-string filters for the ArchiveResult list endpoint.
# Each field's `q` names the ORM lookup(s) it maps to; fields left as None
# are not applied.
class ArchiveResultFilterSchema(FilterSchema):
    # Identifier lookups (match the result itself or its parent snapshot).
    id: str | None = Field(None, q=[
        'id__startswith',
        'abid__icontains',
        'snapshot__id__startswith',
        'snapshot__abid__icontains',
        'snapshot__timestamp__startswith',
    ])
    # Free-text search across snapshot metadata, extractor, output and identifiers.
    search: str | None = Field(None, q=[
        'snapshot__url__icontains',
        'snapshot__title__icontains',
        'snapshot__tags__name__icontains',
        'extractor',
        'output__icontains',
        'id__startswith',
        'abid__icontains',
        'snapshot__id__startswith',
        'snapshot__abid__icontains',
        'snapshot__timestamp__startswith',
    ])

    # Parent snapshot filters.
    snapshot_id: str | None = Field(None, q=[
        'snapshot__id__startswith',
        'snapshot__abid__icontains',
        'snapshot__timestamp__startswith',
    ])
    snapshot_url: str | None = Field(None, q='snapshot__url__icontains')
    snapshot_tag: str | None = Field(None, q='snapshot__tags__name__icontains')

    # Filters on the result's own columns.
    status: str | None = Field(None, q='status')
    output: str | None = Field(None, q='output__icontains')
    extractor: str | None = Field(None, q='extractor__icontains')
    cmd: str | None = Field(None, q='cmd__0__icontains')  # matches the first element of cmd
    pwd: str | None = Field(None, q='pwd__icontains')
    cmd_version: str | None = Field(None, q='cmd_version')

    # Creation time: exact match, or a [gte, lt) range.
    created_at: datetime | None = Field(None, q='created_at')
    created_at__gte: datetime | None = Field(None, q='created_at__gte')
    created_at__lt: datetime | None = Field(None, q='created_at__lt')
# NOTE(review): url_name="get_archiveresult" is also used by the single-object
# route "/archiveresult/{archiveresult_id}"; the other list routes use plural
# names ("get_snapshots", "get_tags"). Left unchanged here because renaming it
# could break existing URL reversals — confirm and rename if safe.
@router.get("/archiveresults", response=List[ArchiveResultSchema], url_name="get_archiveresult")
@paginate(CustomPagination)
def get_archiveresults(request, filters: ArchiveResultFilterSchema = Query(...)):
    """List all ArchiveResult entries matching these filters."""
    # Apply the filters, then de-duplicate the resulting rows.
    return filters.filter(ArchiveResult.objects.all()).distinct()
@router.get("/archiveresult/{archiveresult_id}", response=ArchiveResultSchema, url_name="get_archiveresult")
def get_archiveresult(request, archiveresult_id: str):
    """Get a specific ArchiveResult by id or abid."""
    # Case-insensitive substring match against either identifier column;
    # .get() raises unless exactly one row matches.
    id_match = Q(id__icontains=archiveresult_id)
    abid_match = Q(abid__icontains=archiveresult_id)
    return ArchiveResult.objects.get(id_match | abid_match)
# @router.post("/archiveresult", response=ArchiveResultSchema)
# def create_archiveresult(request, payload: ArchiveResultSchema):
# archiveresult = ArchiveResult.objects.create(**payload.dict())
# return archiveresult
#
# @router.put("/archiveresult/{archiveresult_id}", response=ArchiveResultSchema)
# def update_archiveresult(request, archiveresult_id: str, payload: ArchiveResultSchema):
# archiveresult = get_object_or_404(ArchiveResult, id=archiveresult_id)
#
# for attr, value in payload.dict().items():
# setattr(archiveresult, attr, value)
# archiveresult.save()
#
# return archiveresult
#
# @router.delete("/archiveresult/{archiveresult_id}")
# def delete_archiveresult(request, archiveresult_id: str):
# archiveresult = get_object_or_404(ArchiveResult, id=archiveresult_id)
# archiveresult.delete()
# return {"success": True}
### Snapshot #########################################################################
# Serialized form of a Snapshot, optionally embedding its ArchiveResults.
# Whether archiveresults are embedded is controlled by the per-request flag
# `request.with_archiveresults`, which the endpoints in this module set before
# returning.
class SnapshotSchema(Schema):
    TYPE: str = 'core.models.Snapshot'

    id: UUID
    abid: str

    created_by_id: str
    created_by_username: str
    created_at: datetime
    modified_at: datetime

    status: str
    retry_at: datetime | None

    bookmarked_at: datetime
    downloaded_at: Optional[datetime]

    url: str
    tags: List[str]
    title: Optional[str]
    timestamp: str
    archive_path: str

    # url_for_admin: str
    # url_for_view: str

    num_archiveresults: int
    archiveresults: List[MinimalArchiveResultSchema]

    @staticmethod
    def resolve_created_by_id(obj):
        # The schema declares this field as a string.
        return str(obj.created_by_id)

    @staticmethod
    def resolve_created_by_username(obj):
        User = get_user_model()
        return User.objects.get(id=obj.created_by_id).username

    @staticmethod
    def resolve_abid(obj):
        return str(obj.ABID)

    @staticmethod
    def resolve_tags(obj):
        # Tag names sorted ascending.
        return sorted(tag.name for tag in obj.tags.all())

    # @staticmethod
    # def resolve_url_for_admin(obj):
    #     return f"/admin/core/snapshot/{obj.id}/change/"

    # @staticmethod
    # def resolve_url_for_view(obj):
    #     return f"/{obj.archive_path}"

    @staticmethod
    def resolve_num_archiveresults(obj, context):
        return obj.archiveresult_set.all().distinct().count()

    @staticmethod
    def resolve_archiveresults(obj, context):
        # The schema may be serialized without a context, or for a request whose
        # endpoint never set the flag (e.g. when nested inside another schema).
        # Treat both cases as "do not embed" instead of raising.
        request = (context or {}).get('request')
        if getattr(request, 'with_archiveresults', False):
            return obj.archiveresult_set.all().distinct()
        return ArchiveResult.objects.none()
# Query-string filters for the Snapshot list endpoint.
# Each field's `q` names the ORM lookup(s) it maps to. Every filter defaults to
# None, so every annotation is Optional[...] to match its default.
class SnapshotFilterSchema(FilterSchema):
    # Identifier lookups.
    id: Optional[str] = Field(None, q=['id__icontains', 'abid__icontains', 'timestamp__startswith'])
    abid: Optional[str] = Field(None, q='abid__icontains')

    # Ownership and created/modified timestamps (exact match or [gte, lt) range).
    created_by_id: Optional[str] = Field(None, q='created_by_id')
    created_by_username: Optional[str] = Field(None, q='created_by__username__icontains')
    created_at__gte: Optional[datetime] = Field(None, q='created_at__gte')
    created_at__lt: Optional[datetime] = Field(None, q='created_at__lt')
    created_at: Optional[datetime] = Field(None, q='created_at')
    modified_at: Optional[datetime] = Field(None, q='modified_at')
    modified_at__gte: Optional[datetime] = Field(None, q='modified_at__gte')
    modified_at__lt: Optional[datetime] = Field(None, q='modified_at__lt')

    # Free-text search across url, title, tag names and identifiers.
    search: Optional[str] = Field(None, q=['url__icontains', 'title__icontains', 'tags__name__icontains', 'id__icontains', 'abid__icontains', 'timestamp__startswith'])

    # Content filters. Note that `url` and `tag` are exact matches.
    url: Optional[str] = Field(None, q='url')
    tag: Optional[str] = Field(None, q='tags__name')
    title: Optional[str] = Field(None, q='title__icontains')
    timestamp: Optional[str] = Field(None, q='timestamp__startswith')

    bookmarked_at__gte: Optional[datetime] = Field(None, q='bookmarked_at__gte')
    bookmarked_at__lt: Optional[datetime] = Field(None, q='bookmarked_at__lt')
@router.get("/snapshots", response=List[SnapshotSchema], url_name="get_snapshots")
@paginate(CustomPagination)
def get_snapshots(request, filters: SnapshotFilterSchema = Query(...), with_archiveresults: bool=False):
    """List all Snapshot entries matching these filters."""
    # SnapshotSchema.resolve_archiveresults reads this flag off the request to
    # decide whether to embed each snapshot's ArchiveResults.
    request.with_archiveresults = with_archiveresults

    # Apply the filters, then de-duplicate the resulting rows.
    return filters.filter(Snapshot.objects.all()).distinct()
@router.get("/snapshot/{snapshot_id}", response=SnapshotSchema, url_name="get_snapshot")
def get_snapshot(request, snapshot_id: str, with_archiveresults: bool=True):
    """Get a specific Snapshot by abid or id."""
    # SnapshotSchema.resolve_archiveresults reads this flag off the request.
    request.with_archiveresults = with_archiveresults

    # Try a prefix match first (abid / id / timestamp), then fall back to a
    # case-insensitive substring match on abid / id.
    prefix_lookup = (
        Q(abid__startswith=snapshot_id)
        | Q(id__startswith=snapshot_id)
        | Q(timestamp__startswith=snapshot_id)
    )
    substring_lookup = Q(abid__icontains=snapshot_id) | Q(id__icontains=snapshot_id)

    for lookup in (prefix_lookup, substring_lookup):
        try:
            match = Snapshot.objects.get(lookup)
        except Snapshot.DoesNotExist:
            continue
        if match:
            return match

    # Neither lookup matched.
    raise Snapshot.DoesNotExist
# @router.post("/snapshot", response=SnapshotSchema)
# def create_snapshot(request, payload: SnapshotSchema):
# snapshot = Snapshot.objects.create(**payload.dict())
# return snapshot
#
# @router.put("/snapshot/{snapshot_id}", response=SnapshotSchema)
# def update_snapshot(request, snapshot_id: str, payload: SnapshotSchema):
# snapshot = get_object_or_404(Snapshot, id=snapshot_id)
#
# for attr, value in payload.dict().items():
# setattr(snapshot, attr, value)
# snapshot.save()
#
# return snapshot
#
# @router.delete("/snapshot/{snapshot_id}")
# def delete_snapshot(request, snapshot_id: str):
# snapshot = get_object_or_404(Snapshot, id=snapshot_id)
# snapshot.delete()
# return {"success": True}
### Tag #########################################################################
# Serialized form of a Tag, optionally embedding the Snapshots that carry it.
# Whether snapshots are embedded is controlled by the per-request flag
# `request.with_snapshots`, which the endpoints in this module set before
# returning.
class TagSchema(Schema):
    TYPE: str = 'core.models.Tag'

    id: UUID
    abid: str

    modified_at: datetime
    created_at: datetime
    created_by_id: str
    created_by_username: str

    name: str
    slug: str
    num_snapshots: int
    snapshots: List[SnapshotSchema]

    @staticmethod
    def resolve_created_by_id(obj):
        # The schema declares this field as a string.
        return str(obj.created_by_id)

    @staticmethod
    def resolve_created_by_username(obj):
        User = get_user_model()
        return User.objects.get(id=obj.created_by_id).username

    @staticmethod
    def resolve_num_snapshots(obj, context):
        return obj.snapshot_set.all().distinct().count()

    @staticmethod
    def resolve_snapshots(obj, context):
        # The schema may be serialized without a context, or for a request whose
        # endpoint never set the flag. Treat both cases as "do not embed"
        # instead of raising.
        request = (context or {}).get('request')
        if getattr(request, 'with_snapshots', False):
            return obj.snapshot_set.all().distinct()
        return Snapshot.objects.none()
@router.get("/tags", response=List[TagSchema], url_name="get_tags")
@paginate(CustomPagination)
def get_tags(request):
    # List every Tag. Nested snapshots and archiveresults are never embedded
    # in the list view: both request flags read by the schema resolvers are
    # switched off.
    request.with_archiveresults = False
    request.with_snapshots = False

    all_tags = Tag.objects.all()
    return all_tags.distinct()
@router.get("/tag/{tag_id}", response=TagSchema, url_name="get_tag")
def get_tag(request, tag_id: str, with_snapshots: bool=True):
    """Get a specific Tag by abid or id.

    Tries a case-insensitive substring match on ``abid`` first, then on ``id``.

    Raises:
        HttpError: 404 when neither lookup finds a tag.
    """
    # Flags read by the TagSchema / SnapshotSchema resolvers during serialization.
    request.with_snapshots = with_snapshots
    request.with_archiveresults = False

    for lookup in ({'abid__icontains': tag_id}, {'id__icontains': tag_id}):
        try:
            tag = Tag.objects.get(**lookup)
        except (Tag.DoesNotExist, ValidationError):
            # No match for this lookup (or the value is invalid for this
            # column): fall through to the next one.
            continue
        if tag:
            return tag

    # Returning None here would fail validation against response=TagSchema,
    # so report the miss explicitly instead.
    raise HttpError(404, 'Tag with given ABID or ID not found')
@router.get("/any/{abid}", response=Union[SnapshotSchema, ArchiveResultSchema, TagSchema, SeedSchema, CrawlSchema], url_name="get_any", summary="Get any object by its ABID or ID (e.g. snapshot, archiveresult, tag, seed, crawl, etc.)")
def get_any(request, abid: str):
    """Get any object by its ABID or ID (e.g. snapshot, archiveresult, tag, seed, crawl, etc.)."""
    # Only used to locate the object; the response is a redirect to the
    # object's own endpoint, so nothing nested needs to be embedded.
    request.with_snapshots = False
    request.with_archiveresults = False

    # Refuse to resolve model types that must not be exposed over the REST API.
    forbidden = (
        (APIToken.abid_prefix, 'APIToken objects are not accessible via REST API'),
        (OutboundWebhook.abid_prefix, 'OutboundWebhook objects are not accessible via REST API'),
    )
    for prefix, message in forbidden:
        if abid.startswith(prefix):
            raise HttpError(403, message)

    # Seed/crawl lookups are imported lazily, inside the best-effort try below.
    def find_seed(req, ident):
        from api.v1_crawls import get_seed
        return get_seed(req, ident)

    def find_crawl(req, ident):
        from api.v1_crawls import get_crawl
        return get_crawl(req, ident)

    # Try each model's lookup in order; the first truthy result wins.
    found = None
    for finder in (get_snapshot, get_archiveresult, get_tag, find_seed, find_crawl):
        try:
            found = finder(request, abid)
        except Exception:
            # Deliberate best-effort: any failure for one model type just means
            # "not this type", so move on to the next.
            continue
        if found:
            break

    if not found:
        raise HttpError(404, 'Object with given ABID not found')

    app_label, model_name = found._meta.app_label, found._meta.model_name
    target = f"/api/v1/{app_label}/{model_name}/{found.abid}"
    # Forward the original query string, but do not append a dangling "?" or
    # fail when the request carries none.
    query_string = request.META.get('QUERY_STRING', '')
    if query_string:
        target = f"{target}?{query_string}"
    return redirect(target)