mirror of
synced 2025-02-16 13:28:29 +00:00
fix REST API CSRF and auth handling
This commit is contained in:
9 changed files with 164 additions and 89 deletions
@ -1,12 +1,63 @@
from django.contrib import admin
from datetime import datetime
from django.utils.html import format_html
from api.auth import get_or_create_api_token
def get_abid_info(self, obj, request=None):
return format_html(
# URL Hash: <code style="font-size: 10px; user-select: all">{}</code><br/>
<a href="{}" style="font-size: 16px; font-family: monospace; user-select: all; border-radius: 8px; background-color: #ddf; padding: 3px 5px; border: 1px solid #aaa; margin-bottom: 8px; display: inline-block; vertical-align: top;">{}</a> <a href="{}" style="color: limegreen; font-size: 0.9em; vertical-align: 1px; font-family: monospace;">📖 API DOCS</a>
<div style="opacity: 0.8">
<small style="opacity: 0.8">.abid: <code style="font-size: 10px; user-select: all">{}</code></small><br/>
<small style="opacity: 0.8">.abid.uuid: <code style="font-size: 10px; user-select: all">{}</code></small><br/>
<small style="opacity: 0.8">.id: <code style="font-size: 10px; user-select: all">{}</code></small><br/>
TS: <code style="font-size: 10px;"><b style="user-select: all">{}</b> {}</code> {}: <code style="user-select: all">{}</code><br/>
URI: <code style="font-size: 10px; "><b style="user-select: all">{}</b> {}</code> <span style="display:inline-block; vertical-align: -4px; width: 290px; white-space: nowrap; overflow: hidden; text-overflow: ellipsis;">{}: <code style="user-select: all">{}</code></span>
SALT: <code style="font-size: 10px;"><b style="display:inline-block; user-select: all; width: 50px; white-space: nowrap; overflow: hidden; text-overflow: ellipsis;">{}</b></code><br/>
SUBTYPE: <code style="font-size: 10px;"><b style="user-select: all">{}</b> {}</code> {}: <code style="user-select: all">{}</code><br/>
RAND: <code style="font-size: 10px;"><b style="user-select: all">{}</b> {}</code> {}: <code style="user-select: all">{}</code>
<small style="opacity: 0.5">.old_id: <code style="font-size: 10px; user-select: all">{}</code></small><br/>
obj.api_url + (f'?api_key={get_or_create_api_token(request.user)}' if request and request.user else ''), obj.api_url, obj.api_docs_url,
obj.ABID.ts, str(obj.ABID.uuid)[0:14], obj.abid_ts_src, obj.abid_values['ts'].isoformat() if isinstance(obj.abid_values['ts'], datetime) else obj.abid_values['ts'],
obj.ABID.uri, str(obj.ABID.uuid)[14:26], obj.abid_uri_src, str(obj.abid_values['uri']),
obj.ABID.subtype, str(obj.ABID.uuid)[26:28], obj.abid_subtype_src, str(obj.abid_values['subtype']),
obj.ABID.rand, str(obj.ABID.uuid)[28:36], obj.abid_rand_src, str(obj.abid_values['rand'])[-7:],
str(getattr(obj, 'old_id', '')),
except Exception as e:
return str(e)
class ABIDModelAdmin(admin.ModelAdmin):
list_display = ('created', 'created_by', 'abid', '__str__')
sort_fields = ('created', 'created_by', 'abid', '__str__')
readonly_fields = ('abid', 'created', '__str__')
readonly_fields = ('created', 'modified', '__str__', 'API')
def API(self, obj):
return get_abid_info(self, obj, request=self.request)
def queryset(self, request):
self.request = request
return super().queryset(request)
def change_view(self, request, object_id, form_url="", extra_context=None):
self.request = request
return super().change_view(request, object_id, form_url, extra_context)
def get_form(self, request, obj=None, **kwargs):
self.request = request
form = super().get_form(request, obj, **kwargs)
if 'created_by' in form.base_fields:
form.base_fields['created_by'].initial = request.user
@ -18,6 +18,7 @@ from django.db import models
from django.utils import timezone
from django.db.utils import OperationalError
from django.contrib.auth import get_user_model
from django.urls import reverse_lazy
from django_stubs_ext.db.models import TypedModelMeta
@ -211,6 +212,15 @@ class ABIDModel(models.Model):
return self.ABID.typeid
def api_url(self) -> str:
# /api/v1/core/any/{abid}
return reverse_lazy('api-1:get_any', args=[self.abid])
def api_docs_url(self) -> str:
return f'/api/v1/docs#/{self._meta.app_label.title()}%20Models/api_v1_{self._meta.app_label}_get_{self._meta.db_table}'
@ -1,13 +1,34 @@
__package__ = 'archivebox.api'
from typing import Optional, cast
from typing import Any, Optional, cast
from datetime import timedelta
from django.http import HttpRequest
from django.utils import timezone
from django.contrib.auth import login
from django.contrib.auth import authenticate
from django.contrib.auth.models import AbstractBaseUser
from ninja.security import HttpBearer, APIKeyQuery, APIKeyHeader, HttpBasicAuth, django_auth_superuser
from ninja.errors import HttpError
def get_or_create_api_token(user):
from api.models import APIToken
if user and user.is_superuser:
api_tokens = APIToken.objects.filter(created_by_id=user.pk, expires__gt=timezone.now())
if api_tokens.exists():
# unexpired token exists, use it
api_token = api_tokens.last()
# does not exist, create a new one
api_token = APIToken.objects.create(created_by_id=user.pk, expires=timezone.now() + timedelta(days=30))
assert api_token.is_valid(), f"API token is not valid {api_token}"
return api_token
return None
def auth_using_token(token, request: Optional[HttpRequest]=None) -> Optional[AbstractBaseUser]:
@ -16,21 +37,20 @@ def auth_using_token(token, request: Optional[HttpRequest]=None) -> Optional[Abs
user = None
submitted_empty_form = token in ('string', '', None)
if submitted_empty_form:
assert request is not None, 'No request provided for API key authentication'
user = request.user # see if user is authed via django session and use that as the default
submitted_empty_form = str(token).strip() in ('string', '', 'None', 'null')
if not submitted_empty_form:
token = APIToken.objects.get(token=token)
if token.is_valid():
user = token.created_by
request._api_token = token
except APIToken.DoesNotExist:
if not user:
print('[❌] Failed to authenticate API user using API Key:', request)
# print('[❌] Failed to authenticate API user using API Key:', request)
return None
return cast(AbstractBaseUser, user)
def auth_using_password(username, password, request: Optional[HttpRequest]=None) -> Optional[AbstractBaseUser]:
@ -38,17 +58,14 @@ def auth_using_password(username, password, request: Optional[HttpRequest]=None)
user = None
submitted_empty_form = (username, password) in (('string', 'string'), ('', ''), (None, None))
if submitted_empty_form:
assert request is not None, 'No request provided for API key authentication'
user = request.user # see if user is authed via django session and use that as the default
if not submitted_empty_form:
user = authenticate(
if not user:
print('[❌] Failed to authenticate API user using API Key:', request)
# print('[❌] Failed to authenticate API user using API Key:', request)
user = None
return cast(AbstractBaseUser | None, user)
@ -56,28 +73,41 @@ def auth_using_password(username, password, request: Optional[HttpRequest]=None)
### Base Auth Types
class APITokenAuthCheck:
"""The base class for authentication methods that use an api.models.APIToken"""
def authenticate(self, request: HttpRequest, key: Optional[str]=None) -> Optional[AbstractBaseUser]:
user = auth_using_token(
request.user = auth_using_token(
if user is not None:
login(request, user, backend='django.contrib.auth.backends.ModelBackend')
return user
if request.user and request.user.pk:
# Don't set cookie/persist login ouside this erquest, user may be accessing the API from another domain (CSRF/CORS):
# login(request, request.user, backend='django.contrib.auth.backends.ModelBackend')
request._api_auth_method = self.__class__.__name__
if not request.user.is_superuser:
raise HttpError(403, 'Valid API token but User does not have permission (make sure user.is_superuser=True)')
return request.user
class UserPassAuthCheck:
"""The base class for authentication methods that use a username & password"""
def authenticate(self, request: HttpRequest, username: Optional[str]=None, password: Optional[str]=None) -> Optional[AbstractBaseUser]:
user = auth_using_password(
request.user = auth_using_password(
if user is not None:
login(request, user, backend='django.contrib.auth.backends.ModelBackend')
return user
if request.user and request.user.pk:
# Don't set cookie/persist login ouside this erquest, user may be accessing the API from another domain (CSRF/CORS):
# login(request, request.user, backend='django.contrib.auth.backends.ModelBackend')
request._api_auth_method = self.__class__.__name__
if not request.user.is_superuser:
raise HttpError(403, 'Valid API token but User does not have permission (make sure user.is_superuser=True)')
return request.user
### Django-Ninja-Provided Auth Methods
@ -98,7 +128,6 @@ class UsernameAndPasswordAuth(UserPassAuthCheck, HttpBasicAuth):
"""Allow authenticating by passing username & password via HTTP Basic Authentication (not recommended)"""
### Enabled Auth Methods
@ -53,7 +53,26 @@ class NinjaAPIWithIOCapture(NinjaAPI):
response = super().create_temporal_response(request)
print('RESPONDING NOW', response)
# Diable caching of API responses entirely
response['Cache-Control'] = 'no-store'
# Add debug stdout and stderr headers to response
response['X-ArchiveBox-Stdout'] = str(request.stdout)[200:]
response['X-ArchiveBox-Stderr'] = str(request.stderr)[200:]
# response['X-ArchiveBox-View'] = self.get_openapi_operation_id(request) or 'Unknown'
# Add Auth Headers to response
api_token = getattr(request, '_api_token', None)
token_expiry = api_token.expires.isoformat() if api_token else 'Never'
response['X-ArchiveBox-Auth-Method'] = getattr(request, '_api_auth_method', None) or 'None'
response['X-ArchiveBox-Auth-Expires'] = token_expiry
response['X-ArchiveBox-Auth-Token-Id'] = api_token.abid if api_token else 'None'
response['X-ArchiveBox-Auth-User-Id'] = request.user.pk if request.user.pk else 'None'
response['X-ArchiveBox-Auth-User-Username'] = request.user.username if request.user.pk else 'None'
# import ipdb; ipdb.set_trace()
# print('RESPONDING NOW', response)
return response
@ -7,10 +7,10 @@ from django.utils import timezone
from datetime import timedelta
from api.models import APIToken
from api.auth import auth_using_token, auth_using_password
from api.auth import auth_using_token, auth_using_password, get_or_create_api_token
router = Router(tags=['Authentication'])
router = Router(tags=['Authentication'], auth=None)
class PasswordAuthSchema(Schema):
@ -28,14 +28,8 @@ def get_api_token(request, auth_data: PasswordAuthSchema):
if user and user.is_superuser:
api_tokens = APIToken.objects.filter(created_by_id=user.pk, expires__gt=timezone.now())
if api_tokens.exists():
api_token = api_tokens.last()
api_token = APIToken.objects.create(created_by_id=user.pk, expires=timezone.now() + timedelta(days=30))
assert api_token.is_valid(), f"API token is not valid {api_token.abid}"
api_token = get_or_create_api_token(user)
assert api_token is not None, "Failed to create API token"
return api_token.__json__()
return {"success": False, "errors": ["Invalid credentials"]}
@ -16,8 +16,10 @@ from ..util import ansi_to_html
from ..config import ONLY_NEW
from .auth import API_AUTH_METHODS
# router for API that exposes archivebox cli subcommands as REST endpoints
router = Router(tags=['ArchiveBox CLI Sub-Commands'])
router = Router(tags=['ArchiveBox CLI Sub-Commands'], auth=API_AUTH_METHODS)
# Schemas
@ -12,11 +12,15 @@ from django.contrib.auth import get_user_model
from ninja import Router, Schema, FilterSchema, Field, Query
from ninja.pagination import paginate, PaginationBase
from ninja.errors import HttpError
from core.models import Snapshot, ArchiveResult, Tag
from api.models import APIToken, OutboundWebhook
from abid_utils.abid import ABID
router = Router(tags=['Core Models'])
from .auth import API_AUTH_METHODS
router = Router(tags=['Core Models'], auth=API_AUTH_METHODS)
@ -421,4 +425,10 @@ def get_any(request, abid: str):
except Exception:
return response
if abid.startswith(APIToken.abid_prefix):
raise HttpError(403, 'APIToken objects are not accessible via REST API')
if abid.startswith(OutboundWebhook.abid_prefix):
raise HttpError(403, 'OutboundWebhook objects are not accessible via REST API')
raise HttpError(404, 'Object with given ABID not found')
@ -34,6 +34,7 @@ from core.models import Snapshot, ArchiveResult, Tag, SnapshotTag
from core.forms import AddLinkForm
from core.mixins import SearchResultsAdminMixin
from api.models import APIToken
from api.auth import get_or_create_api_token
from abid_utils.models import get_or_create_system_user_pk
from abid_utils.admin import ABIDModelAdmin
@ -268,37 +269,7 @@ class SnapshotActionForm(ActionForm):
# )
def get_abid_info(self, obj):
return format_html(
# URL Hash: <code style="font-size: 10px; user-select: all">{}</code><br/>
<a href="{}" style="font-size: 16px; font-family: monospace; user-select: all; border-radius: 8px; background-color: #ddf; padding: 3px 5px; border: 1px solid #aaa; margin-bottom: 8px; display: inline-block; vertical-align: top;">{}</a> <a href="{}" style="color: limegreen; font-size: 0.9em; vertical-align: 1px; font-family: monospace;">📖 API DOCS</a>
<div style="opacity: 0.8">
<small style="opacity: 0.8">.abid: <code style="font-size: 10px; user-select: all">{}</code></small><br/>
<small style="opacity: 0.8">.abid.uuid: <code style="font-size: 10px; user-select: all">{}</code></small><br/>
<small style="opacity: 0.8">.id: <code style="font-size: 10px; user-select: all">{}</code></small><br/>
TS: <code style="font-size: 10px;"><b style="user-select: all">{}</b> {}</code> {}: <code style="user-select: all">{}</code><br/>
URI: <code style="font-size: 10px; "><b style="user-select: all">{}</b> {}</code> <span style="display:inline-block; vertical-align: -4px; width: 290px; white-space: nowrap; overflow: hidden; text-overflow: ellipsis;">{}: <code style="user-select: all">{}</code></span>
SALT: <code style="font-size: 10px;"><b style="display:inline-block; user-select: all; width: 50px; white-space: nowrap; overflow: hidden; text-overflow: ellipsis;">{}</b></code><br/>
SUBTYPE: <code style="font-size: 10px;"><b style="user-select: all">{}</b> {}</code> {}: <code style="user-select: all">{}</code><br/>
RAND: <code style="font-size: 10px;"><b style="user-select: all">{}</b> {}</code> {}: <code style="user-select: all">{}</code>
<small style="opacity: 0.5">.old_id: <code style="font-size: 10px; user-select: all">{}</code></small><br/>
obj.api_url, obj.api_url, obj.api_docs_url,
obj.ABID.ts, str(obj.ABID.uuid)[0:14], obj.abid_ts_src, obj.abid_values['ts'].isoformat() if isinstance(obj.abid_values['ts'], datetime) else obj.abid_values['ts'],
obj.ABID.uri, str(obj.ABID.uuid)[14:26], obj.abid_uri_src, str(obj.abid_values['uri']),
obj.ABID.subtype, str(obj.ABID.uuid)[26:28], obj.abid_subtype_src, str(obj.abid_values['subtype']),
obj.ABID.rand, str(obj.ABID.uuid)[28:36], obj.abid_rand_src, str(obj.abid_values['rand'])[-7:],
str(getattr(obj, 'old_id', '')),
@admin.register(Snapshot, site=archivebox_admin)
@ -321,6 +292,7 @@ class SnapshotAdmin(SearchResultsAdminMixin, ABIDModelAdmin):
show_full_result_count = False
def changelist_view(self, request, extra_context=None):
self.request = request
extra_context = extra_context or {}
return super().changelist_view(request, extra_context | GLOBAL_CONTEXT)
@ -329,6 +301,7 @@ class SnapshotAdmin(SearchResultsAdminMixin, ABIDModelAdmin):
return super().changelist_view(request, GLOBAL_CONTEXT)
def change_view(self, request, object_id, form_url="", extra_context=None):
self.request = request
snapshot = None
@ -350,6 +323,7 @@ class SnapshotAdmin(SearchResultsAdminMixin, ABIDModelAdmin):
if snapshot:
object_id = str(snapshot.id)
return super().change_view(
@ -430,12 +404,6 @@ class SnapshotAdmin(SearchResultsAdminMixin, ABIDModelAdmin):
obj.extension or '-',
def API(self, obj):
return get_abid_info(self, obj)
except Exception as e:
return str(e)
@ -603,8 +571,6 @@ class SnapshotAdmin(SearchResultsAdminMixin, ABIDModelAdmin):
# actions = ['delete_selected']
# ordering = ['-id']
# def API(self, obj):
# return get_abid_info(self, obj)
@admin.register(Tag, site=archivebox_admin)
@ -619,11 +585,6 @@ class TagAdmin(ABIDModelAdmin):
paginator = AccelleratedPaginator
def API(self, obj):
return get_abid_info(self, obj)
except Exception as e:
return str(e)
def num_snapshots(self, tag):
return format_html(
@ -660,6 +621,10 @@ class ArchiveResultAdmin(ABIDModelAdmin):
paginator = AccelleratedPaginator
def change_view(self, request, object_id, form_url="", extra_context=None):
self.request = request
return super().change_view(request, object_id, form_url, extra_context)
description='Snapshot Info'
@ -672,12 +637,6 @@ class ArchiveResultAdmin(ABIDModelAdmin):
def API(self, obj):
return get_abid_info(self, obj)
except Exception as e:
raise e
return str(e)
description='Snapshot Tags'
@ -735,7 +694,7 @@ class ArchiveResultAdmin(ABIDModelAdmin):
class APITokenAdmin(ABIDModelAdmin):
list_display = ('created', 'abid', 'created_by', 'token_redacted', 'expires')
sort_fields = ('abid', 'created', 'created_by', 'expires')
readonly_fields = ('abid', 'created')
readonly_fields = ('created', 'modified', 'API')
search_fields = ('id', 'abid', 'created_by__username', 'token')
fields = ('created_by', 'token', 'expires', *readonly_fields)
@ -747,4 +706,4 @@ class APITokenAdmin(ABIDModelAdmin):
class CustomWebhookAdmin(WebhookAdmin, ABIDModelAdmin):
list_display = ('created', 'created_by', 'abid', *WebhookAdmin.list_display)
sort_fields = ('created', 'created_by', 'abid', 'referenced_model', 'endpoint', 'last_success', 'last_error')
readonly_fields = ('abid', 'created', *WebhookAdmin.readonly_fields)
readonly_fields = ('created', 'modified', 'API', *WebhookAdmin.readonly_fields)
@ -298,10 +298,11 @@ SECURE_REFERRER_POLICY = 'strict-origin-when-cross-origin'
SESSION_COOKIE_AGE = 1209600 # 2 weeks
SESSION_ENGINE = "django.contrib.sessions.backends.db"
Add table
Reference in a new issue