mirror of
https://github.com/ArchiveBox/ArchiveBox
synced 2024-11-23 04:33:11 +00:00
210 lines
6.2 KiB
Python
210 lines
6.2 KiB
Python
__package__ = 'archivebox.api'
|
|
|
|
from uuid import UUID
|
|
from typing import List, Optional
|
|
from datetime import datetime
|
|
|
|
from django.shortcuts import get_object_or_404
|
|
|
|
from ninja import Router, Schema, FilterSchema, Field, Query
|
|
from ninja.pagination import paginate
|
|
|
|
from core.models import Snapshot, ArchiveResult, Tag
|
|
|
|
|
|
router = Router(tags=['Core Models'])
|
|
|
|
|
|
|
|
|
|
### ArchiveResult #########################################################################
|
|
|
|
class ArchiveResultSchema(Schema):
|
|
id: UUID
|
|
|
|
snapshot_id: UUID
|
|
snapshot_url: str
|
|
snapshot_tags: str
|
|
|
|
extractor: str
|
|
cmd: List[str]
|
|
pwd: str
|
|
cmd_version: str
|
|
output: str
|
|
status: str
|
|
|
|
created: datetime
|
|
|
|
@staticmethod
|
|
def resolve_id(obj):
|
|
return obj.uuid
|
|
|
|
@staticmethod
|
|
def resolve_created(obj):
|
|
return obj.start_ts
|
|
|
|
@staticmethod
|
|
def resolve_snapshot_url(obj):
|
|
return obj.snapshot.url
|
|
|
|
@staticmethod
|
|
def resolve_snapshot_tags(obj):
|
|
return obj.snapshot.tags_str()
|
|
|
|
|
|
class ArchiveResultFilterSchema(FilterSchema):
|
|
id: Optional[UUID] = Field(None, q='uuid')
|
|
|
|
search: Optional[str] = Field(None, q=['snapshot__url__icontains', 'snapshot__title__icontains', 'snapshot__tags__name__icontains', 'extractor', 'output__icontains'])
|
|
snapshot_id: Optional[UUID] = Field(None, q='snapshot_id')
|
|
snapshot_url: Optional[str] = Field(None, q='snapshot__url')
|
|
snapshot_tag: Optional[str] = Field(None, q='snapshot__tags__name')
|
|
|
|
status: Optional[str] = Field(None, q='status')
|
|
output: Optional[str] = Field(None, q='output__icontains')
|
|
extractor: Optional[str] = Field(None, q='extractor__icontains')
|
|
cmd: Optional[str] = Field(None, q='cmd__0__icontains')
|
|
pwd: Optional[str] = Field(None, q='pwd__icontains')
|
|
cmd_version: Optional[str] = Field(None, q='cmd_version')
|
|
|
|
created: Optional[datetime] = Field(None, q='updated')
|
|
created__gte: Optional[datetime] = Field(None, q='updated__gte')
|
|
created__lt: Optional[datetime] = Field(None, q='updated__lt')
|
|
|
|
|
|
@router.get("/archiveresults", response=List[ArchiveResultSchema])
|
|
@paginate
|
|
def list_archiveresults(request, filters: ArchiveResultFilterSchema = Query(...)):
|
|
qs = ArchiveResult.objects.all()
|
|
results = filters.filter(qs)
|
|
return results
|
|
|
|
|
|
@router.get("/archiveresult/{archiveresult_id}", response=ArchiveResultSchema)
|
|
def get_archiveresult(request, archiveresult_id: str):
|
|
archiveresult = get_object_or_404(ArchiveResult, id=archiveresult_id)
|
|
return archiveresult
|
|
|
|
|
|
# @router.post("/archiveresult", response=ArchiveResultSchema)
|
|
# def create_archiveresult(request, payload: ArchiveResultSchema):
|
|
# archiveresult = ArchiveResult.objects.create(**payload.dict())
|
|
# return archiveresult
|
|
#
|
|
# @router.put("/archiveresult/{archiveresult_id}", response=ArchiveResultSchema)
|
|
# def update_archiveresult(request, archiveresult_id: str, payload: ArchiveResultSchema):
|
|
# archiveresult = get_object_or_404(ArchiveResult, id=archiveresult_id)
|
|
#
|
|
# for attr, value in payload.dict().items():
|
|
# setattr(archiveresult, attr, value)
|
|
# archiveresult.save()
|
|
#
|
|
# return archiveresult
|
|
#
|
|
# @router.delete("/archiveresult/{archiveresult_id}")
|
|
# def delete_archiveresult(request, archiveresult_id: str):
|
|
# archiveresult = get_object_or_404(ArchiveResult, id=archiveresult_id)
|
|
# archiveresult.delete()
|
|
# return {"success": True}
|
|
|
|
|
|
|
|
|
|
|
|
### Snapshot #########################################################################
|
|
|
|
|
|
class SnapshotSchema(Schema):
|
|
id: UUID
|
|
|
|
url: str
|
|
tags: str
|
|
title: Optional[str]
|
|
timestamp: str
|
|
bookmarked: datetime
|
|
added: datetime
|
|
updated: datetime
|
|
archive_path: str
|
|
|
|
archiveresults: List[ArchiveResultSchema]
|
|
|
|
# @staticmethod
|
|
# def resolve_id(obj):
|
|
# return str(obj.id)
|
|
|
|
@staticmethod
|
|
def resolve_tags(obj):
|
|
return obj.tags_str()
|
|
|
|
@staticmethod
|
|
def resolve_archiveresults(obj, context):
|
|
if context['request'].with_archiveresults:
|
|
return obj.archiveresult_set.all().distinct()
|
|
return ArchiveResult.objects.none()
|
|
|
|
|
|
class SnapshotFilterSchema(FilterSchema):
|
|
id: Optional[UUID] = Field(None, q='id')
|
|
|
|
search: Optional[str] = Field(None, q=['url__icontains', 'title__icontains', 'tags__name__icontains'])
|
|
url: Optional[str] = Field(None, q='url')
|
|
tag: Optional[str] = Field(None, q='tags__name')
|
|
title: Optional[str] = Field(None, q='title__icontains')
|
|
|
|
timestamp: Optional[str] = Field(None, q='timestamp__startswith')
|
|
|
|
added: Optional[datetime] = Field(None, q='added')
|
|
added__gte: Optional[datetime] = Field(None, q='added__gte')
|
|
added__lt: Optional[datetime] = Field(None, q='added__lt')
|
|
|
|
|
|
@router.get("/snapshots", response=List[SnapshotSchema])
|
|
@paginate
|
|
def list_snapshots(request, filters: SnapshotFilterSchema = Query(...), with_archiveresults: bool=True):
|
|
request.with_archiveresults = with_archiveresults
|
|
|
|
qs = Snapshot.objects.all()
|
|
results = filters.filter(qs)
|
|
return results
|
|
|
|
@router.get("/snapshot/{snapshot_id}", response=SnapshotSchema)
|
|
def get_snapshot(request, snapshot_id: str, with_archiveresults: bool=True):
|
|
request.with_archiveresults = with_archiveresults
|
|
snapshot = get_object_or_404(Snapshot, id=snapshot_id)
|
|
return snapshot
|
|
|
|
|
|
# @router.post("/snapshot", response=SnapshotSchema)
|
|
# def create_snapshot(request, payload: SnapshotSchema):
|
|
# snapshot = Snapshot.objects.create(**payload.dict())
|
|
# return snapshot
|
|
#
|
|
# @router.put("/snapshot/{snapshot_id}", response=SnapshotSchema)
|
|
# def update_snapshot(request, snapshot_id: str, payload: SnapshotSchema):
|
|
# snapshot = get_object_or_404(Snapshot, id=snapshot_id)
|
|
#
|
|
# for attr, value in payload.dict().items():
|
|
# setattr(snapshot, attr, value)
|
|
# snapshot.save()
|
|
#
|
|
# return snapshot
|
|
#
|
|
# @router.delete("/snapshot/{snapshot_id}")
|
|
# def delete_snapshot(request, snapshot_id: str):
|
|
# snapshot = get_object_or_404(Snapshot, id=snapshot_id)
|
|
# snapshot.delete()
|
|
# return {"success": True}
|
|
|
|
|
|
|
|
### Tag #########################################################################
|
|
|
|
|
|
class TagSchema(Schema):
|
|
name: str
|
|
slug: str
|
|
|
|
|
|
@router.get("/tags", response=List[TagSchema])
|
|
def list_tags(request):
|
|
return Tag.objects.all()
|