Add new plugin system to separate out ArchiveBox components (#1432)

This commit is contained in:
Nick Sweeting 2024-06-02 17:55:49 -07:00 committed by GitHub
commit 8eac7f09ce
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
33 changed files with 2302 additions and 4 deletions

View file

@ -16,6 +16,7 @@ from django import forms
from signal_webhooks.admin import WebhookAdmin, get_webhook_model
# from plugantic.admin import CustomPlugin
from ..util import htmldecode, urldecode, ansi_to_html
@ -37,6 +38,7 @@ from config import (
CAN_UPGRADE
)
GLOBAL_CONTEXT = {'VERSION': VERSION, 'VERSIONS_AVAILABLE': VERSIONS_AVAILABLE, 'CAN_UPGRADE': CAN_UPGRADE}
# Admin URLs
@ -109,8 +111,10 @@ archivebox_admin.register(APIToken)
archivebox_admin.register(get_webhook_model(), WebhookAdmin)
archivebox_admin.disable_action('delete_selected')
# archivebox_admin.register(CustomPlugin)
# patch admin with methods to add data views (implemented by admin_data_views package)
############### Additional sections are defined in settings.ADMIN_DATA_VIEWS #########
from admin_data_views.admin import get_app_list, admin_data_index_view, get_admin_data_urls, get_urls
archivebox_admin.get_app_list = get_app_list.__get__(archivebox_admin, ArchiveBoxAdmin)

View file

@ -55,6 +55,26 @@ APPEND_SLASH = True
DEBUG = DEBUG or ('--debug' in sys.argv)
# add plugins folders to system path, and load plugins in installed_apps
BUILTIN_PLUGINS_DIR = PACKAGE_DIR / 'plugins'
USER_PLUGINS_DIR = OUTPUT_DIR / 'plugins'
sys.path.insert(0, str(BUILTIN_PLUGINS_DIR))
sys.path.insert(0, str(USER_PLUGINS_DIR))
def find_plugins(plugins_dir):
return {
# plugin_entrypoint.parent.name: import_module(plugin_entrypoint.parent.name).METADATA
plugin_entrypoint.parent.name: plugin_entrypoint.parent
for plugin_entrypoint in plugins_dir.glob('*/apps.py')
}
INSTALLED_PLUGINS = {
**find_plugins(BUILTIN_PLUGINS_DIR),
**find_plugins(USER_PLUGINS_DIR),
}
INSTALLED_APPS = [
'django.contrib.auth',
'django.contrib.contenttypes',
@ -62,12 +82,16 @@ INSTALLED_APPS = [
'django.contrib.messages',
'django.contrib.staticfiles',
'django.contrib.admin',
'django_jsonform',
'signal_webhooks',
'abid_utils',
'plugantic',
'core',
'api',
*INSTALLED_PLUGINS.keys(),
'admin_data_views',
'django_extensions',
@ -441,16 +465,36 @@ SIGNAL_WEBHOOKS = {
ADMIN_DATA_VIEWS = {
"NAME": "configuration",
"NAME": "Environment",
"URLS": [
{
"route": "live/",
"route": "config/",
"view": "core.views.live_config_list_view",
"name": "live",
"name": "Configuration",
"items": {
"route": "<str:key>/",
"view": "core.views.live_config_value_view",
"name": "live_config_value",
"name": "config_val",
},
},
{
"route": "binaries/",
"view": "plugantic.views.binaries_list_view",
"name": "Binaries",
"items": {
"route": "<str:key>/",
"view": "plugantic.views.binary_detail_view",
"name": "binary",
},
},
{
"route": "plugins/",
"view": "plugantic.views.plugins_list_view",
"name": "Plugins",
"items": {
"route": "<str:key>/",
"view": "plugantic.views.plugin_detail_view",
"name": "plugin",
},
},
],

View file

@ -0,0 +1,17 @@
__package__ = 'archivebox.plugantic'
from .binproviders import BinProvider
from .binaries import Binary
from .extractors import Extractor
from .replayers import Replayer
from .configs import ConfigSet
from .plugins import Plugin
# __all__ = [
# 'BinProvider',
# 'Binary',
# 'Extractor',
# 'Replayer',
# 'ConfigSet',
# 'Plugin',
# ]

View file

@ -0,0 +1,26 @@
# from django.contrib import admin
# from django import forms
# from django_jsonform.widgets import JSONFormWidget
# from django_pydantic_field.v2.fields import PydanticSchemaField
# from .models import CustomPlugin
# class PluginForm(forms.ModelForm):
# class Meta:
# model = CustomPlugin
# fields = '__all__'
# widgets = {
# 'items': JSONFormWidget(schema=PluginSchema),
# }
# class PluginAdmin(admin.ModelAdmin):
# formfield_overrides = {
# PydanticSchemaField: {"widget": JSONFormWidget},
# }
# form = PluginForm

View file

@ -0,0 +1,6 @@
from django.apps import AppConfig
class PluganticConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'plugantic'

View file

@ -0,0 +1,323 @@
__package__ = 'archivebox.plugantic'
import sys
import inspect
import importlib
from pathlib import Path
from typing import Any, Optional, Dict, List
from typing_extensions import Self
from subprocess import run, PIPE
from pydantic_core import ValidationError
from pydantic import BaseModel, Field, model_validator, computed_field, field_validator, validate_call, field_serializer
from .binproviders import (
SemVer,
BinName,
BinProviderName,
HostBinPath,
BinProvider,
EnvProvider,
AptProvider,
BrewProvider,
PipProvider,
ProviderLookupDict,
bin_name,
bin_abspath,
path_is_script,
path_is_executable,
)
class Binary(BaseModel):
name: BinName
description: str = Field(default='')
providers_supported: List[BinProvider] = Field(default=[EnvProvider()], alias='providers')
provider_overrides: Dict[BinProviderName, ProviderLookupDict] = Field(default={}, alias='overrides')
loaded_provider: Optional[BinProviderName] = Field(default=None, alias='provider')
loaded_abspath: Optional[HostBinPath] = Field(default=None, alias='abspath')
loaded_version: Optional[SemVer] = Field(default=None, alias='version')
# bin_filename: see below
# is_executable: see below
# is_script
# is_valid: see below
@model_validator(mode='after')
def validate(self):
self.loaded_abspath = bin_abspath(self.name) or self.name
self.description = self.description or self.name
assert self.providers_supported, f'No providers were given for package {self.name}'
# pull in any overrides from the binproviders
for provider in self.providers_supported:
overrides_by_provider = provider.get_providers_for_bin(self.name)
if overrides_by_provider:
self.provider_overrides[provider.name] = {
**overrides_by_provider,
**self.provider_overrides.get(provider.name, {}),
}
return self
@field_validator('loaded_abspath', mode='before')
def parse_abspath(cls, value: Any):
return bin_abspath(value)
@field_validator('loaded_version', mode='before')
def parse_version(cls, value: Any):
return value and SemVer(value)
@field_serializer('provider_overrides', when_used='json')
def serialize_overrides(self, provider_overrides: Dict[BinProviderName, ProviderLookupDict]) -> Dict[BinProviderName, Dict[str, str]]:
return {
provider_name: {
key: str(val)
for key, val in overrides.items()
}
for provider_name, overrides in provider_overrides.items()
}
@computed_field # type: ignore[misc] # see mypy issue #1362
@property
def bin_filename(self) -> BinName:
if self.is_script:
# e.g. '.../Python.framework/Versions/3.11/lib/python3.11/sqlite3/__init__.py' -> sqlite
name = self.name
elif self.loaded_abspath:
# e.g. '/opt/homebrew/bin/wget' -> wget
name = bin_name(self.loaded_abspath)
else:
# e.g. 'ytdlp' -> 'yt-dlp'
name = bin_name(self.name)
return name
@computed_field # type: ignore[misc] # see mypy issue #1362
@property
def is_executable(self) -> bool:
try:
assert self.loaded_abspath and path_is_executable(self.loaded_abspath)
return True
except (ValidationError, AssertionError):
return False
@computed_field # type: ignore[misc] # see mypy issue #1362
@property
def is_script(self) -> bool:
try:
assert self.loaded_abspath and path_is_script(self.loaded_abspath)
return True
except (ValidationError, AssertionError):
return False
@computed_field # type: ignore[misc] # see mypy issue #1362
@property
def is_valid(self) -> bool:
return bool(
self.name
and self.loaded_abspath
and self.loaded_version
and (self.is_executable or self.is_script)
)
@validate_call
def install(self) -> Self:
if not self.providers_supported:
return self
exc = Exception('No providers were able to install binary', self.name, self.providers_supported)
for provider in self.providers_supported:
try:
installed_bin = provider.install(self.name, overrides=self.provider_overrides.get(provider.name))
if installed_bin:
# print('INSTALLED', self.name, installed_bin)
return self.model_copy(update={
'loaded_provider': provider.name,
'loaded_abspath': installed_bin.abspath,
'loaded_version': installed_bin.version,
})
except Exception as err:
print(err)
exc = err
raise exc
@validate_call
def load(self, cache=True) -> Self:
if self.is_valid:
return self
if not self.providers_supported:
return self
exc = Exception('No providers were able to install binary', self.name, self.providers_supported)
for provider in self.providers_supported:
try:
installed_bin = provider.load(self.name, cache=cache, overrides=self.provider_overrides.get(provider.name))
if installed_bin:
# print('LOADED', provider, self.name, installed_bin)
return self.model_copy(update={
'loaded_provider': provider.name,
'loaded_abspath': installed_bin.abspath,
'loaded_version': installed_bin.version,
})
except Exception as err:
print(err)
exc = err
raise exc
@validate_call
def load_or_install(self, cache=True) -> Self:
if self.is_valid:
return self
if not self.providers_supported:
return self
exc = Exception('No providers were able to install binary', self.name, self.providers_supported)
for provider in self.providers_supported:
try:
installed_bin = provider.load_or_install(self.name, overrides=self.provider_overrides.get(provider.name), cache=cache)
if installed_bin:
# print('LOADED_OR_INSTALLED', self.name, installed_bin)
return self.model_copy(update={
'loaded_provider': provider.name,
'loaded_abspath': installed_bin.abspath,
'loaded_version': installed_bin.version,
})
except Exception as err:
print(err)
exc = err
raise exc
@validate_call
def exec(self, args=(), pwd='.'):
assert self.loaded_abspath
assert self.loaded_version
return run([self.loaded_abspath, *args], stdout=PIPE, stderr=PIPE, pwd=pwd)
class SystemPythonHelpers:
@staticmethod
def get_subdeps() -> str:
return 'python3 python3-minimal python3-pip python3-virtualenv'
@staticmethod
def get_abspath() -> str:
return sys.executable
@staticmethod
def get_version() -> str:
return '{}.{}.{}'.format(*sys.version_info[:3])
class SqliteHelpers:
@staticmethod
def get_abspath() -> Path:
import sqlite3
importlib.reload(sqlite3)
return Path(inspect.getfile(sqlite3))
@staticmethod
def get_version() -> SemVer:
import sqlite3
importlib.reload(sqlite3)
version = sqlite3.version
assert version
return SemVer(version)
class DjangoHelpers:
@staticmethod
def get_django_abspath() -> str:
import django
return inspect.getfile(django)
@staticmethod
def get_django_version() -> str:
import django
return '{}.{}.{} {} ({})'.format(*django.VERSION)
class YtdlpHelpers:
@staticmethod
def get_ytdlp_subdeps() -> str:
return 'yt-dlp ffmpeg'
@staticmethod
def get_ytdlp_version() -> str:
import yt_dlp
importlib.reload(yt_dlp)
version = yt_dlp.version.__version__
assert version
return version
class PythonBinary(Binary):
name: BinName = 'python'
providers_supported: List[BinProvider] = [
EnvProvider(
subdeps_provider={'python': 'plugantic.binaries.SystemPythonHelpers.get_subdeps'},
abspath_provider={'python': 'plugantic.binaries.SystemPythonHelpers.get_abspath'},
version_provider={'python': 'plugantic.binaries.SystemPythonHelpers.get_version'},
),
]
class SqliteBinary(Binary):
name: BinName = 'sqlite'
providers_supported: List[BinProvider] = [
EnvProvider(
version_provider={'sqlite': 'plugantic.binaries.SqliteHelpers.get_version'},
abspath_provider={'sqlite': 'plugantic.binaries.SqliteHelpers.get_abspath'},
),
]
class DjangoBinary(Binary):
name: BinName = 'django'
providers_supported: List[BinProvider] = [
EnvProvider(
abspath_provider={'django': 'plugantic.binaries.DjangoHelpers.get_django_abspath'},
version_provider={'django': 'plugantic.binaries.DjangoHelpers.get_django_version'},
),
]
class YtdlpBinary(Binary):
name: BinName = 'yt-dlp'
providers_supported: List[BinProvider] = [
# EnvProvider(),
PipProvider(version_provider={'yt-dlp': 'plugantic.binaries.YtdlpHelpers.get_ytdlp_version'}),
BrewProvider(subdeps_provider={'yt-dlp': 'plugantic.binaries.YtdlpHelpers.get_ytdlp_subdeps'}),
# AptProvider(subdeps_provider={'yt-dlp': lambda: 'yt-dlp ffmpeg'}),
]
class WgetBinary(Binary):
name: BinName = 'wget'
providers_supported: List[BinProvider] = [EnvProvider(), AptProvider()]
# if __name__ == '__main__':
# PYTHON_BINARY = PythonBinary()
# SQLITE_BINARY = SqliteBinary()
# DJANGO_BINARY = DjangoBinary()
# WGET_BINARY = WgetBinary()
# YTDLP_BINARY = YtdlpPBinary()
# print('-------------------------------------DEFINING BINARIES---------------------------------')
# print(PYTHON_BINARY)
# print(SQLITE_BINARY)
# print(DJANGO_BINARY)
# print(WGET_BINARY)
# print(YTDLP_BINARY)

View file

@ -0,0 +1,561 @@
__package__ = 'archivebox.plugantic'
import os
import shutil
import operator
from typing import Callable, Any, Optional, Type, Dict, Annotated, ClassVar, Literal, cast, TYPE_CHECKING
from typing_extensions import Self
from abc import ABC, abstractmethod
from collections import namedtuple
from pathlib import Path
from subprocess import run, PIPE
from pydantic_core import core_schema, ValidationError
from pydantic import BaseModel, Field, TypeAdapter, AfterValidator, validate_call, GetCoreSchemaHandler
def func_takes_args_or_kwargs(lambda_func: Callable[..., Any]) -> bool:
"""returns True if a lambda func takes args/kwargs of any kind, otherwise false if it's pure/argless"""
code = lambda_func.__code__
has_args = code.co_argcount > 0
has_varargs = code.co_flags & 0x04 != 0
has_varkw = code.co_flags & 0x08 != 0
return has_args or has_varargs or has_varkw
def is_semver_str(semver: Any) -> bool:
if isinstance(semver, str):
return (semver.count('.') == 2 and semver.replace('.', '').isdigit())
return False
def semver_to_str(semver: tuple[int, int, int] | str) -> str:
if isinstance(semver, (list, tuple)):
return '.'.join(str(chunk) for chunk in semver)
if is_semver_str(semver):
return semver
raise ValidationError('Tried to convert invalid SemVer: {}'.format(semver))
SemVerTuple = namedtuple('SemVerTuple', ('major', 'minor', 'patch'), defaults=(0, 0, 0))
SemVerParsableTypes = str | tuple[str | int, ...] | list[str | int]
class SemVer(SemVerTuple):
major: int
minor: int = 0
patch: int = 0
if TYPE_CHECKING:
full_text: str | None = ''
def __new__(cls, *args, full_text=None, **kwargs):
# '1.1.1'
if len(args) == 1 and is_semver_str(args[0]):
result = SemVer.parse(args[0])
# ('1', '2', '3')
elif len(args) == 1 and isinstance(args[0], (tuple, list)):
result = SemVer.parse(args[0])
# (1, '2', None)
elif not all(isinstance(arg, (int, type(None))) for arg in args):
result = SemVer.parse(args)
# (None)
elif all(chunk in ('', 0, None) for chunk in (*args, *kwargs.values())):
result = None
# 1, 2, 3
else:
result = SemVerTuple.__new__(cls, *args, **kwargs)
if result is not None:
# add first line as extra hidden metadata so it can be logged without having to re-run version cmd
result.full_text = full_text or str(result)
return result
@classmethod
def parse(cls, version_stdout: SemVerParsableTypes) -> Self | None:
"""
parses a version tag string formatted like into (major, minor, patch) ints
'Google Chrome 124.0.6367.208' -> (124, 0, 6367)
'GNU Wget 1.24.5 built on darwin23.2.0.' -> (1, 24, 5)
'curl 8.4.0 (x86_64-apple-darwin23.0) ...' -> (8, 4, 0)
'2024.04.09' -> (2024, 4, 9)
"""
# print('INITIAL_VALUE', type(version_stdout).__name__, version_stdout)
if isinstance(version_stdout, (tuple, list)):
version_stdout = '.'.join(str(chunk) for chunk in version_stdout)
elif isinstance(version_stdout, bytes):
version_stdout = version_stdout.decode()
elif not isinstance(version_stdout, str):
version_stdout = str(version_stdout)
# no text to work with, return None immediately
if not version_stdout.strip():
# raise Exception('Tried to parse semver from empty version output (is binary installed and available?)')
return None
just_numbers = lambda col: col.lower().strip('v').split('+')[0].split('-')[0].split('_')[0]
contains_semver = lambda col: (
col.count('.') in (1, 2, 3)
and all(chunk.isdigit() for chunk in col.split('.')[:3]) # first 3 chunks can only be nums
)
full_text = version_stdout.split('\n')[0].strip()
first_line_columns = full_text.split()[:4]
version_columns = list(filter(contains_semver, map(just_numbers, first_line_columns)))
# could not find any column of first line that looks like a version number, despite there being some text
if not version_columns:
# raise Exception('Failed to parse semver from version command output: {}'.format(' '.join(first_line_columns)))
return None
# take first col containing a semver, and truncate it to 3 chunks (e.g. 2024.04.09.91) -> (2024, 04, 09)
first_version_tuple = version_columns[0].split('.', 3)[:3]
# print('FINAL_VALUE', first_version_tuple)
return cls(*(int(chunk) for chunk in first_version_tuple), full_text=full_text)
def __str__(self):
return '.'.join(str(chunk) for chunk in self)
# @classmethod
# def __get_pydantic_core_schema__(cls, source: Type[Any], handler: GetCoreSchemaHandler) -> core_schema.CoreSchema:
# default_schema = handler(source)
# return core_schema.no_info_after_validator_function(
# cls.parse,
# default_schema,
# serialization=core_schema.plain_serializer_function_ser_schema(
# lambda semver: str(semver),
# info_arg=False,
# return_schema=core_schema.str_schema(),
# ),
# )
assert SemVer(None) == None
assert SemVer('') == None
assert SemVer.parse('') == None
assert SemVer(1) == (1, 0, 0)
assert SemVer(1, 2) == (1, 2, 0)
assert SemVer('1.2+234234') == (1, 2, 0)
assert SemVer((1, 2, 3)) == (1, 2, 3)
assert getattr(SemVer((1, 2, 3)), 'full_text') == '1.2.3'
assert SemVer(('1', '2', '3')) == (1, 2, 3)
assert SemVer.parse('5.6.7') == (5, 6, 7)
assert SemVer.parse('124.0.6367.208') == (124, 0, 6367)
assert SemVer.parse('Google Chrome 124.1+234.234') == (124, 1, 0)
assert SemVer.parse('Google Ch1rome 124.0.6367.208') == (124, 0, 6367)
assert SemVer.parse('Google Chrome 124.0.6367.208+beta_234. 234.234.123\n123.456.324') == (124, 0, 6367)
assert getattr(SemVer.parse('Google Chrome 124.0.6367.208+beta_234. 234.234.123\n123.456.324'), 'full_text') == 'Google Chrome 124.0.6367.208+beta_234. 234.234.123'
assert SemVer.parse('Google Chrome') == None
@validate_call
def bin_name(bin_path_or_name: str | Path) -> str:
name = Path(bin_path_or_name).name
assert len(name) > 1
assert name.replace('-', '').replace('_', '').replace('.', '').isalnum(), (
f'Binary name can only contain a-Z0-9-_.: {name}')
return name
BinName = Annotated[str, AfterValidator(bin_name)]
@validate_call
def path_is_file(path: Path | str) -> Path:
path = Path(path) if isinstance(path, str) else path
assert path.is_file(), f'Path is not a file: {path}'
return path
HostExistsPath = Annotated[Path, AfterValidator(path_is_file)]
@validate_call
def path_is_executable(path: HostExistsPath) -> HostExistsPath:
assert os.access(path, os.X_OK), f'Path is not executable (fix by running chmod +x {path})'
return path
@validate_call
def path_is_script(path: HostExistsPath) -> HostExistsPath:
SCRIPT_EXTENSIONS = ('.py', '.js', '.sh')
assert path.suffix.lower() in SCRIPT_EXTENSIONS, 'Path is not a script (does not end in {})'.format(', '.join(SCRIPT_EXTENSIONS))
return path
HostExecutablePath = Annotated[HostExistsPath, AfterValidator(path_is_executable)]
@validate_call
def path_is_abspath(path: Path) -> Path:
return path.resolve()
HostAbsPath = Annotated[HostExistsPath, AfterValidator(path_is_abspath)]
HostBinPath = Annotated[Path, AfterValidator(path_is_abspath), AfterValidator(path_is_file)]
@validate_call
def bin_abspath(bin_path_or_name: BinName | Path) -> HostBinPath | None:
assert bin_path_or_name
if str(bin_path_or_name).startswith('/'):
# already a path, get its absolute form
abspath = Path(bin_path_or_name).resolve()
else:
# not a path yet, get path using os.which
binpath = shutil.which(bin_path_or_name)
if not binpath:
return None
abspath = Path(binpath).resolve()
try:
return TypeAdapter(HostBinPath).validate_python(abspath)
except ValidationError:
return None
@validate_call
def bin_version(bin_path: HostBinPath, args=('--version',)) -> SemVer | None:
return SemVer(run([bin_path, *args], stdout=PIPE).stdout.strip().decode())
class InstalledBin(BaseModel):
abspath: HostBinPath
version: SemVer
def is_valid_install_string(pkgs_str: str) -> str:
"""Make sure a string is a valid install string for a package manager, e.g. 'yt-dlp ffmpeg'"""
assert pkgs_str
assert all(len(pkg) > 1 for pkg in pkgs_str.split(' '))
return pkgs_str
def is_valid_python_dotted_import(import_str: str) -> str:
assert import_str and import_str.replace('.', '').replace('_', '').isalnum()
return import_str
InstallStr = Annotated[str, AfterValidator(is_valid_install_string)]
LazyImportStr = Annotated[str, AfterValidator(is_valid_python_dotted_import)]
ProviderHandler = Callable[..., Any] | Callable[[], Any] # must take no args [], or [bin_name: str, **kwargs]
#ProviderHandlerStr = Annotated[str, AfterValidator(lambda s: s.startswith('self.'))]
ProviderHandlerRef = LazyImportStr | ProviderHandler
ProviderLookupDict = Dict[str, LazyImportStr]
ProviderType = Literal['abspath', 'version', 'subdeps', 'install']
# class Host(BaseModel):
# machine: str
# system: str
# platform: str
# in_docker: bool
# in_qemu: bool
# python: str
BinProviderName = Literal['env', 'pip', 'apt', 'brew', 'npm', 'vendor']
class BinProvider(ABC, BaseModel):
name: BinProviderName
abspath_provider: ProviderLookupDict = Field(default={'*': 'self.on_get_abspath'}, exclude=True)
version_provider: ProviderLookupDict = Field(default={'*': 'self.on_get_version'}, exclude=True)
subdeps_provider: ProviderLookupDict = Field(default={'*': 'self.on_get_subdeps'}, exclude=True)
install_provider: ProviderLookupDict = Field(default={'*': 'self.on_install'}, exclude=True)
_abspath_cache: ClassVar = {}
_version_cache: ClassVar = {}
_install_cache: ClassVar = {}
# def provider_version(self) -> SemVer | None:
# """Version of the actual underlying package manager (e.g. pip v20.4.1)"""
# if self.name in ('env', 'vendor'):
# return SemVer('0.0.0')
# installer_binpath = Path(shutil.which(self.name)).resolve()
# return bin_version(installer_binpath)
# def provider_host(self) -> Host:
# """Information about the host env, archictecture, and OS needed to select & build packages"""
# p = platform.uname()
# return Host(
# machine=p.machine,
# system=p.system,
# platform=platform.platform(),
# python=sys.implementation.name,
# in_docker=os.environ.get('IN_DOCKER', '').lower() == 'true',
# in_qemu=os.environ.get('IN_QEMU', '').lower() == 'true',
# )
def get_default_providers(self):
return self.get_providers_for_bin('*')
def resolve_provider_func(self, provider_func: ProviderHandlerRef | None) -> ProviderHandler | None:
if provider_func is None:
return None
# if provider_func is a dotted path to a function on self, swap it for the actual function
if isinstance(provider_func, str) and provider_func.startswith('self.'):
provider_func = getattr(self, provider_func.split('self.', 1)[-1])
# if provider_func is a dot-formatted import string, import the function
if isinstance(provider_func, str):
from django.utils.module_loading import import_string
package_name, module_name, classname, path = provider_func.split('.', 3) # -> abc, def, ghi.jkl
# get .ghi.jkl nested attr present on module abc.def
imported_module = import_string(f'{package_name}.{module_name}.{classname}')
provider_func = operator.attrgetter(path)(imported_module)
# # abc.def.ghi.jkl -> 1, 2, 3
# for idx in range(1, len(path)):
# parent_path = '.'.join(path[:-idx]) # abc.def.ghi
# try:
# parent_module = import_string(parent_path)
# provider_func = getattr(parent_module, path[-idx])
# except AttributeError, ImportError:
# continue
assert TypeAdapter(ProviderHandler).validate_python(provider_func), (
f'{self.__class__.__name__} provider func for {bin_name} was not a function or dotted-import path: {provider_func}')
return provider_func
@validate_call
def get_providers_for_bin(self, bin_name: str) -> ProviderLookupDict:
providers_for_bin = {
'abspath': self.abspath_provider.get(bin_name),
'version': self.version_provider.get(bin_name),
'subdeps': self.subdeps_provider.get(bin_name),
'install': self.install_provider.get(bin_name),
}
only_set_providers_for_bin = {k: v for k, v in providers_for_bin.items() if v is not None}
return only_set_providers_for_bin
@validate_call
def get_provider_for_action(self, bin_name: BinName, provider_type: ProviderType, default_provider: Optional[ProviderHandlerRef]=None, overrides: Optional[ProviderLookupDict]=None) -> ProviderHandler:
"""
Get the provider func for a given key + Dict of provider callbacks + fallback default provider.
e.g. get_provider_for_action(bin_name='yt-dlp', 'install', default_provider=self.on_install, ...) -> Callable
"""
provider_func_ref = (
(overrides or {}).get(provider_type)
or self.get_providers_for_bin(bin_name).get(provider_type)
or self.get_default_providers().get(provider_type)
or default_provider
)
# print('getting provider for action', bin_name, provider_type, provider_func)
provider_func = self.resolve_provider_func(provider_func_ref)
assert provider_func, f'No {self.name} provider func was found for {bin_name} in: {self.__class__.__name__}.'
return provider_func
@validate_call
def call_provider_for_action(self, bin_name: BinName, provider_type: ProviderType, default_provider: Optional[ProviderHandlerRef]=None, overrides: Optional[ProviderLookupDict]=None, **kwargs) -> Any:
provider_func: ProviderHandler = self.get_provider_for_action(
bin_name=bin_name,
provider_type=provider_type,
default_provider=default_provider,
overrides=overrides,
)
if not func_takes_args_or_kwargs(provider_func):
# if it's a pure argless lambdas, dont pass bin_path and other **kwargs
provider_func_without_args = cast(Callable[[], Any], provider_func)
return provider_func_without_args()
provider_func = cast(Callable[..., Any], provider_func)
return provider_func(bin_name, **kwargs)
def on_get_abspath(self, bin_name: BinName, **_) -> HostBinPath | None:
print(f'[*] {self.__class__.__name__}: Getting abspath for {bin_name}...')
try:
return bin_abspath(bin_name)
except ValidationError:
return None
def on_get_version(self, bin_name: BinName, abspath: Optional[HostBinPath]=None, **_) -> SemVer | None:
abspath = abspath or self._abspath_cache.get(bin_name) or self.get_abspath(bin_name)
if not abspath: return None
print(f'[*] {self.__class__.__name__}: Getting version for {bin_name}...')
try:
return bin_version(abspath)
except ValidationError:
return None
def on_get_subdeps(self, bin_name: BinName, **_) -> InstallStr:
print(f'[*] {self.__class__.__name__}: Getting subdependencies for {bin_name}')
# ... subdependency calculation logic here
return TypeAdapter(InstallStr).validate_python(bin_name)
@abstractmethod
def on_install(self, bin_name: BinName, subdeps: Optional[InstallStr]=None, **_):
subdeps = subdeps or self.get_subdeps(bin_name)
print(f'[*] {self.__class__.__name__}: Installing subdependencies for {bin_name} ({subdeps})')
# ... install logic here
assert True
@validate_call
def get_abspath(self, bin_name: BinName, overrides: Optional[ProviderLookupDict]=None) -> HostBinPath | None:
abspath = self.call_provider_for_action(
bin_name=bin_name,
provider_type='abspath',
default_provider=self.on_get_abspath,
overrides=overrides,
)
if not abspath:
return None
result = TypeAdapter(HostBinPath).validate_python(abspath)
self._abspath_cache[bin_name] = result
return result
@validate_call
def get_version(self, bin_name: BinName, abspath: Optional[HostBinPath]=None, overrides: Optional[ProviderLookupDict]=None) -> SemVer | None:
version = self.call_provider_for_action(
bin_name=bin_name,
provider_type='version',
default_provider=self.on_get_version,
overrides=overrides,
abspath=abspath,
)
if not version:
return None
result = SemVer(version)
self._version_cache[bin_name] = result
return result
@validate_call
def get_subdeps(self, bin_name: BinName, overrides: Optional[ProviderLookupDict]=None) -> InstallStr:
subdeps = self.call_provider_for_action(
bin_name=bin_name,
provider_type='subdeps',
default_provider=self.on_get_subdeps,
overrides=overrides,
)
if not subdeps:
subdeps = bin_name
result = TypeAdapter(InstallStr).validate_python(subdeps)
return result
@validate_call
def install(self, bin_name: BinName, overrides: Optional[ProviderLookupDict]=None) -> InstalledBin | None:
subdeps = self.get_subdeps(bin_name, overrides=overrides)
self.call_provider_for_action(
bin_name=bin_name,
provider_type='install',
default_provider=self.on_install,
overrides=overrides,
subdeps=subdeps,
)
installed_abspath = self.get_abspath(bin_name)
assert installed_abspath, f'Unable to find {bin_name} abspath after installing with {self.name}'
installed_version = self.get_version(bin_name, abspath=installed_abspath)
assert installed_version, f'Unable to find {bin_name} version after installing with {self.name}'
result = InstalledBin(abspath=installed_abspath, version=installed_version)
self._install_cache[bin_name] = result
return result
@validate_call
def load(self, bin_name: BinName, overrides: Optional[ProviderLookupDict]=None, cache: bool=False) -> InstalledBin | None:
installed_abspath = None
installed_version = None
if cache:
installed_bin = self._install_cache.get(bin_name)
if installed_bin:
return installed_bin
installed_abspath = self._abspath_cache.get(bin_name)
installed_version = self._version_cache.get(bin_name)
installed_abspath = installed_abspath or self.get_abspath(bin_name, overrides=overrides)
if not installed_abspath:
return None
installed_version = installed_version or self.get_version(bin_name, abspath=installed_abspath, overrides=overrides)
if not installed_version:
return None
return InstalledBin(abspath=installed_abspath, version=installed_version)
@validate_call
def load_or_install(self, bin_name: BinName, overrides: Optional[ProviderLookupDict]=None, cache: bool=True) -> InstalledBin | None:
installed = self.load(bin_name, overrides=overrides, cache=cache)
if not installed:
installed = self.install(bin_name, overrides=overrides)
return installed
class PipProvider(BinProvider):
name: BinProviderName = 'pip'
def on_install(self, bin_name: str, subdeps: Optional[InstallStr]=None, **_):
subdeps = subdeps or self.on_get_subdeps(bin_name)
print(f'[*] {self.__class__.__name__}: Installing subdependencies for {bin_name} ({subdeps})')
proc = run(['pip', 'install', '--upgrade', *subdeps.split(' ')], stdout=PIPE, stderr=PIPE)
if proc.returncode != 0:
print(proc.stdout.strip().decode())
print(proc.stderr.strip().decode())
raise Exception(f'{self.__class__.__name__}: install got returncode {proc.returncode} while installing {subdeps}: {subdeps}')
class AptProvider(BinProvider):
name: BinProviderName = 'apt'
subdeps_provider: ProviderLookupDict = {
'yt-dlp': lambda: 'yt-dlp ffmpeg',
}
def on_install(self, bin_name: BinName, subdeps: Optional[InstallStr]=None, **_):
subdeps = subdeps or self.on_get_subdeps(bin_name)
print(f'[*] {self.__class__.__name__}: Installing subdependencies for {bin_name} ({subdeps})')
run(['apt-get', 'update', '-qq'])
proc = run(['apt-get', 'install', '-y', *subdeps.split(' ')], stdout=PIPE, stderr=PIPE)
if proc.returncode != 0:
print(proc.stdout.strip().decode())
print(proc.stderr.strip().decode())
raise Exception(f'{self.__class__.__name__} install got returncode {proc.returncode} while installing {subdeps}: {subdeps}')
class BrewProvider(BinProvider):
name: BinProviderName = 'brew'
def on_install(self, bin_name: str, subdeps: Optional[InstallStr]=None, **_):
subdeps = subdeps or self.on_get_subdeps(bin_name)
print(f'[*] {self.__class__.__name__}: Installing subdependencies for {bin_name} ({subdeps})')
proc = run(['brew', 'install', *subdeps.split(' ')], stdout=PIPE, stderr=PIPE)
if proc.returncode != 0:
print(proc.stdout.strip().decode())
print(proc.stderr.strip().decode())
raise Exception(f'{self.__class__.__name__} install got returncode {proc.returncode} while installing {subdeps}: {subdeps}')
class EnvProvider(BinProvider):
name: BinProviderName = 'env'
abspath_provider: ProviderLookupDict = {
# 'python': lambda: Path('/opt/homebrew/Cellar/python@3.10/3.10.14/Frameworks/Python.framework/Versions/3.10/bin/python3.10'),
}
version_provider: ProviderLookupDict = {
# 'python': lambda: '{}.{}.{}'.format(*sys.version_info[:3]),
}
def on_install(self, bin_name: BinName, subdeps: Optional[InstallStr]=None, **_):
"""The env provider is ready-only and does not install any packages, so this is a no-op"""
pass

View file

@ -0,0 +1,53 @@
__package__ = 'archivebox.plugantic'
from typing import Optional, List, Literal
from pathlib import Path
from pydantic import BaseModel, Field
ConfigSectionName = Literal['GENERAL_CONFIG', 'ARCHIVE_METHOD_TOGGLES', 'ARCHIVE_METHOD_OPTIONS', 'DEPENDENCY_CONFIG']
class ConfigSet(BaseModel):
section: ConfigSectionName = 'GENERAL_CONFIG'
class WgetToggleConfig(ConfigSet):
section: ConfigSectionName = 'ARCHIVE_METHOD_TOGGLES'
SAVE_WGET: bool = True
SAVE_WARC: bool = True
class WgetDependencyConfig(ConfigSet):
section: ConfigSectionName = 'DEPENDENCY_CONFIG'
WGET_BINARY: str = Field(default='wget')
WGET_ARGS: Optional[List[str]] = Field(default=None)
WGET_EXTRA_ARGS: List[str] = []
WGET_DEFAULT_ARGS: List[str] = ['--timeout={TIMEOUT-10}']
class WgetOptionsConfig(ConfigSet):
section: ConfigSectionName = 'ARCHIVE_METHOD_OPTIONS'
# loaded from shared config
WGET_AUTO_COMPRESSION: bool = Field(default=True)
SAVE_WGET_REQUISITES: bool = Field(default=True)
WGET_USER_AGENT: str = Field(default='', alias='USER_AGENT')
WGET_TIMEOUT: int = Field(default=60, alias='TIMEOUT')
WGET_CHECK_SSL_VALIDITY: bool = Field(default=True, alias='CHECK_SSL_VALIDITY')
WGET_RESTRICT_FILE_NAMES: str = Field(default='windows', alias='RESTRICT_FILE_NAMES')
WGET_COOKIES_FILE: Optional[Path] = Field(default=None, alias='COOKIES_FILE')
CONFIG = {
'CHECK_SSL_VALIDITY': False,
'SAVE_WARC': False,
'TIMEOUT': 999,
}
WGET_CONFIG = [
WgetToggleConfig(**CONFIG),
WgetDependencyConfig(**CONFIG),
WgetOptionsConfig(**CONFIG),
]

View file

@ -0,0 +1,118 @@
__package__ = 'archivebox.plugantic'
from typing import Optional, List, Literal, Annotated, Dict, Any
from typing_extensions import Self
from abc import ABC
from pathlib import Path
from pydantic import BaseModel, model_validator, field_serializer, AfterValidator
from .binaries import (
Binary,
YtdlpBinary,
WgetBinary,
)
# stubs
class Snapshot:
pass
class ArchiveResult:
pass
def get_wget_output_path(*args, **kwargs) -> Path:
return Path('.').resolve()
def no_empty_args(args: List[str]) -> List[str]:
assert all(len(arg) for arg in args)
return args
ExtractorName = Literal['wget', 'warc', 'media']
HandlerFuncStr = Annotated[str, AfterValidator(lambda s: s.startswith('self.'))]
CmdArgsList = Annotated[List[str], AfterValidator(no_empty_args)]
class Extractor(ABC, BaseModel):
name: ExtractorName
binary: Binary
output_path_func: HandlerFuncStr = 'self.get_output_path'
should_extract_func: HandlerFuncStr = 'self.should_extract'
extract_func: HandlerFuncStr = 'self.extract'
exec_func: HandlerFuncStr = 'self.exec'
default_args: CmdArgsList = []
extra_args: CmdArgsList = []
args: Optional[CmdArgsList] = None
@model_validator(mode='after')
def validate_model(self) -> Self:
if self.args is None:
self.args = [*self.default_args, *self.extra_args]
return self
@field_serializer('binary', when_used='json')
def dump_binary(binary) -> str:
return binary.name
def get_output_path(self, snapshot) -> Path:
return Path(self.name)
def should_extract(self, snapshot) -> bool:
output_dir = self.get_output_path(snapshot)
if output_dir.glob('*.*'):
return False
return True
def extract(self, url: str, **kwargs) -> Dict[str, Any]:
output_dir = self.get_output_path(url, **kwargs)
cmd = [url, *self.args] if self.args is not None else [url, *self.default_args, *self.extra_args]
proc = self.exec(cmd, pwd=output_dir)
return {
'status': 'succeeded' if proc.returncode == 0 else 'failed',
'output': proc.stdout.decode().strip().split('\n')[-1],
'output_files': list(output_dir.glob('*.*')),
'stdout': proc.stdout.decode().strip(),
'stderr': proc.stderr.decode().strip(),
'returncode': proc.returncode,
}
def exec(self, args: CmdArgsList, pwd: Optional[Path]=None):
pwd = pwd or Path('.')
assert self.binary.loaded_provider
return self.binary.exec(args, pwd=pwd)
class YtdlpExtractor(Extractor):
name: ExtractorName = 'media'
binary: Binary = YtdlpBinary()
def get_output_path(self, snapshot) -> Path:
return Path(self.name)
class WgetExtractor(Extractor):
name: ExtractorName = 'wget'
binary: Binary = WgetBinary()
def get_output_path(self, snapshot) -> Path:
return get_wget_output_path(snapshot)
class WarcExtractor(Extractor):
name: ExtractorName = 'warc'
binary: Binary = WgetBinary()
def get_output_path(self, snapshot) -> Path:
return get_wget_output_path(snapshot)

View file

@ -0,0 +1,396 @@
from typing import Dict, Any, List
import configparser
import json
import ast
JSONValue = str | bool | int | None | List['JSONValue']
def load_ini_value(val: str) -> JSONValue:
"""Convert lax INI values into strict TOML-compliant (JSON) values"""
if val.lower() in ('true', 'yes', '1'):
return True
if val.lower() in ('false', 'no', '0'):
return False
if val.isdigit():
return int(val)
try:
return ast.literal_eval(val)
except Exception:
pass
try:
return json.loads(val)
except Exception as err:
pass
return val
def convert(ini_str: str) -> str:
"""Convert a string of INI config into its TOML equivalent (warning: strips comments)"""
config = configparser.ConfigParser()
config.optionxform = str # capitalize key names
config.read_string(ini_str)
# Initialize an empty dictionary to store the TOML representation
toml_dict = {}
# Iterate over each section in the INI configuration
for section in config.sections():
toml_dict[section] = {}
# Iterate over each key-value pair in the section
for key, value in config.items(section):
parsed_value = load_ini_value(value)
# Convert the parsed value to its TOML-compatible JSON representation
toml_dict[section.upper()][key.upper()] = json.dumps(parsed_value)
# Build the TOML string
toml_str = ""
for section, items in toml_dict.items():
toml_str += f"[{section}]\n"
for key, value in items.items():
toml_str += f"{key} = {value}\n"
toml_str += "\n"
return toml_str.strip()
### Basic Assertions
test_input = """
[SERVER_CONFIG]
IS_TTY=False
USE_COLOR=False
SHOW_PROGRESS=False
IN_DOCKER=False
IN_QEMU=False
PUID=501
PGID=20
OUTPUT_DIR=/opt/archivebox/data
CONFIG_FILE=/opt/archivebox/data/ArchiveBox.conf
ONLY_NEW=True
TIMEOUT=60
MEDIA_TIMEOUT=3600
OUTPUT_PERMISSIONS=644
RESTRICT_FILE_NAMES=windows
URL_DENYLIST=\.(css|js|otf|ttf|woff|woff2|gstatic\.com|googleapis\.com/css)(\?.*)?$
URL_ALLOWLIST=None
ADMIN_USERNAME=None
ADMIN_PASSWORD=None
ENFORCE_ATOMIC_WRITES=True
TAG_SEPARATOR_PATTERN=[,]
SECRET_KEY=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
BIND_ADDR=127.0.0.1:8000
ALLOWED_HOSTS=*
DEBUG=False
PUBLIC_INDEX=True
PUBLIC_SNAPSHOTS=True
PUBLIC_ADD_VIEW=False
FOOTER_INFO=Content is hosted for personal archiving purposes only. Contact server owner for any takedown requests.
SNAPSHOTS_PER_PAGE=40
CUSTOM_TEMPLATES_DIR=None
TIME_ZONE=UTC
TIMEZONE=UTC
REVERSE_PROXY_USER_HEADER=Remote-User
REVERSE_PROXY_WHITELIST=
LOGOUT_REDIRECT_URL=/
PREVIEW_ORIGINALS=True
LDAP=False
LDAP_SERVER_URI=None
LDAP_BIND_DN=None
LDAP_BIND_PASSWORD=None
LDAP_USER_BASE=None
LDAP_USER_FILTER=None
LDAP_USERNAME_ATTR=None
LDAP_FIRSTNAME_ATTR=None
LDAP_LASTNAME_ATTR=None
LDAP_EMAIL_ATTR=None
LDAP_CREATE_SUPERUSER=False
SAVE_TITLE=True
SAVE_FAVICON=True
SAVE_WGET=True
SAVE_WGET_REQUISITES=True
SAVE_SINGLEFILE=True
SAVE_READABILITY=True
SAVE_MERCURY=True
SAVE_HTMLTOTEXT=True
SAVE_PDF=True
SAVE_SCREENSHOT=True
SAVE_DOM=True
SAVE_HEADERS=True
SAVE_WARC=True
SAVE_GIT=True
SAVE_MEDIA=True
SAVE_ARCHIVE_DOT_ORG=True
RESOLUTION=1440,2000
GIT_DOMAINS=github.com,bitbucket.org,gitlab.com,gist.github.com,codeberg.org,gitea.com,git.sr.ht
CHECK_SSL_VALIDITY=True
MEDIA_MAX_SIZE=750m
USER_AGENT=None
CURL_USER_AGENT=Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36 ArchiveBox/0.8.0 (+https://github.com/ArchiveBox/ArchiveBox/) curl/curl 8.4.0 (x86_64-apple-darwin23.0)
WGET_USER_AGENT=Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36 ArchiveBox/0.8.0 (+https://github.com/ArchiveBox/ArchiveBox/) wget/GNU Wget 1.24.5
CHROME_USER_AGENT=Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36 ArchiveBox/0.8.0 (+https://github.com/ArchiveBox/ArchiveBox/)
COOKIES_FILE=None
CHROME_USER_DATA_DIR=None
CHROME_TIMEOUT=0
CHROME_HEADLESS=True
CHROME_SANDBOX=True
CHROME_EXTRA_ARGS=[]
YOUTUBEDL_ARGS=['--restrict-filenames', '--trim-filenames', '128', '--write-description', '--write-info-json', '--write-annotations', '--write-thumbnail', '--no-call-home', '--write-sub', '--write-auto-subs', '--convert-subs=srt', '--yes-playlist', '--continue', '--no-abort-on-error', '--ignore-errors', '--geo-bypass', '--add-metadata', '--format=(bv*+ba/b)[filesize<=750m][filesize_approx<=?750m]/(bv*+ba/b)']
YOUTUBEDL_EXTRA_ARGS=[]
WGET_ARGS=['--no-verbose', '--adjust-extension', '--convert-links', '--force-directories', '--backup-converted', '--span-hosts', '--no-parent', '-e', 'robots=off']
WGET_EXTRA_ARGS=[]
CURL_ARGS=['--silent', '--location', '--compressed']
CURL_EXTRA_ARGS=[]
GIT_ARGS=['--recursive']
SINGLEFILE_ARGS=[]
SINGLEFILE_EXTRA_ARGS=[]
MERCURY_ARGS=['--format=text']
MERCURY_EXTRA_ARGS=[]
FAVICON_PROVIDER=https://www.google.com/s2/favicons?domain={}
USE_INDEXING_BACKEND=True
USE_SEARCHING_BACKEND=True
SEARCH_BACKEND_ENGINE=ripgrep
SEARCH_BACKEND_HOST_NAME=localhost
SEARCH_BACKEND_PORT=1491
SEARCH_BACKEND_PASSWORD=SecretPassword
SEARCH_PROCESS_HTML=True
SONIC_COLLECTION=archivebox
SONIC_BUCKET=snapshots
SEARCH_BACKEND_TIMEOUT=90
FTS_SEPARATE_DATABASE=True
FTS_TOKENIZERS=porter unicode61 remove_diacritics 2
FTS_SQLITE_MAX_LENGTH=1000000000
USE_CURL=True
USE_WGET=True
USE_SINGLEFILE=True
USE_READABILITY=True
USE_MERCURY=True
USE_GIT=True
USE_CHROME=True
USE_NODE=True
USE_YOUTUBEDL=True
USE_RIPGREP=True
CURL_BINARY=curl
GIT_BINARY=git
WGET_BINARY=wget
SINGLEFILE_BINARY=single-file
READABILITY_BINARY=readability-extractor
MERCURY_BINARY=postlight-parser
YOUTUBEDL_BINARY=yt-dlp
NODE_BINARY=node
RIPGREP_BINARY=rg
CHROME_BINARY=chrome
POCKET_CONSUMER_KEY=None
USER=squash
PACKAGE_DIR=/opt/archivebox/archivebox
TEMPLATES_DIR=/opt/archivebox/archivebox/templates
ARCHIVE_DIR=/opt/archivebox/data/archive
SOURCES_DIR=/opt/archivebox/data/sources
LOGS_DIR=/opt/archivebox/data/logs
PERSONAS_DIR=/opt/archivebox/data/personas
URL_DENYLIST_PTN=re.compile('\\.(css|js|otf|ttf|woff|woff2|gstatic\\.com|googleapis\\.com/css)(\\?.*)?$', re.IGNORECASE|re.MULTILINE)
URL_ALLOWLIST_PTN=None
DIR_OUTPUT_PERMISSIONS=755
ARCHIVEBOX_BINARY=/opt/archivebox/.venv/bin/archivebox
VERSION=0.8.0
COMMIT_HASH=102e87578c6036bb0132dd1ebd17f8f05ffc880f
BUILD_TIME=2024-05-15 03:28:05 1715768885
VERSIONS_AVAILABLE=None
CAN_UPGRADE=False
PYTHON_BINARY=/opt/archivebox/.venv/bin/python3.10
PYTHON_ENCODING=UTF-8
PYTHON_VERSION=3.10.14
DJANGO_BINARY=/opt/archivebox/.venv/lib/python3.10/site-packages/django/__init__.py
DJANGO_VERSION=5.0.6 final (0)
SQLITE_BINARY=/opt/homebrew/Cellar/python@3.10/3.10.14/Frameworks/Python.framework/Versions/3.10/lib/python3.10/sqlite3/dbapi2.py
SQLITE_VERSION=2.6.0
CURL_VERSION=curl 8.4.0 (x86_64-apple-darwin23.0)
WGET_VERSION=GNU Wget 1.24.5
WGET_AUTO_COMPRESSION=True
RIPGREP_VERSION=ripgrep 14.1.0
SINGLEFILE_VERSION=None
READABILITY_VERSION=None
MERCURY_VERSION=None
GIT_VERSION=git version 2.44.0
YOUTUBEDL_VERSION=2024.04.09
CHROME_VERSION=Google Chrome 124.0.6367.207
NODE_VERSION=v21.7.3
"""
expected_output = '''[SERVER_CONFIG]
IS_TTY = false
USE_COLOR = false
SHOW_PROGRESS = false
IN_DOCKER = false
IN_QEMU = false
PUID = 501
PGID = 20
OUTPUT_DIR = "/opt/archivebox/data"
CONFIG_FILE = "/opt/archivebox/data/ArchiveBox.conf"
ONLY_NEW = true
TIMEOUT = 60
MEDIA_TIMEOUT = 3600
OUTPUT_PERMISSIONS = 644
RESTRICT_FILE_NAMES = "windows"
URL_DENYLIST = "\\\\.(css|js|otf|ttf|woff|woff2|gstatic\\\\.com|googleapis\\\\.com/css)(\\\\?.*)?$"
URL_ALLOWLIST = null
ADMIN_USERNAME = null
ADMIN_PASSWORD = null
ENFORCE_ATOMIC_WRITES = true
TAG_SEPARATOR_PATTERN = "[,]"
SECRET_KEY = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
BIND_ADDR = "127.0.0.1:8000"
ALLOWED_HOSTS = "*"
DEBUG = false
PUBLIC_INDEX = true
PUBLIC_SNAPSHOTS = true
PUBLIC_ADD_VIEW = false
FOOTER_INFO = "Content is hosted for personal archiving purposes only. Contact server owner for any takedown requests."
SNAPSHOTS_PER_PAGE = 40
CUSTOM_TEMPLATES_DIR = null
TIME_ZONE = "UTC"
TIMEZONE = "UTC"
REVERSE_PROXY_USER_HEADER = "Remote-User"
REVERSE_PROXY_WHITELIST = ""
LOGOUT_REDIRECT_URL = "/"
PREVIEW_ORIGINALS = true
LDAP = false
LDAP_SERVER_URI = null
LDAP_BIND_DN = null
LDAP_BIND_PASSWORD = null
LDAP_USER_BASE = null
LDAP_USER_FILTER = null
LDAP_USERNAME_ATTR = null
LDAP_FIRSTNAME_ATTR = null
LDAP_LASTNAME_ATTR = null
LDAP_EMAIL_ATTR = null
LDAP_CREATE_SUPERUSER = false
SAVE_TITLE = true
SAVE_FAVICON = true
SAVE_WGET = true
SAVE_WGET_REQUISITES = true
SAVE_SINGLEFILE = true
SAVE_READABILITY = true
SAVE_MERCURY = true
SAVE_HTMLTOTEXT = true
SAVE_PDF = true
SAVE_SCREENSHOT = true
SAVE_DOM = true
SAVE_HEADERS = true
SAVE_WARC = true
SAVE_GIT = true
SAVE_MEDIA = true
SAVE_ARCHIVE_DOT_ORG = true
RESOLUTION = [1440, 2000]
GIT_DOMAINS = "github.com,bitbucket.org,gitlab.com,gist.github.com,codeberg.org,gitea.com,git.sr.ht"
CHECK_SSL_VALIDITY = true
MEDIA_MAX_SIZE = "750m"
USER_AGENT = null
CURL_USER_AGENT = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36 ArchiveBox/0.8.0 (+https://github.com/ArchiveBox/ArchiveBox/) curl/curl 8.4.0 (x86_64-apple-darwin23.0)"
WGET_USER_AGENT = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36 ArchiveBox/0.8.0 (+https://github.com/ArchiveBox/ArchiveBox/) wget/GNU Wget 1.24.5"
CHROME_USER_AGENT = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36 ArchiveBox/0.8.0 (+https://github.com/ArchiveBox/ArchiveBox/)"
COOKIES_FILE = null
CHROME_USER_DATA_DIR = null
CHROME_TIMEOUT = false
CHROME_HEADLESS = true
CHROME_SANDBOX = true
CHROME_EXTRA_ARGS = []
YOUTUBEDL_ARGS = ["--restrict-filenames", "--trim-filenames", "128", "--write-description", "--write-info-json", "--write-annotations", "--write-thumbnail", "--no-call-home", "--write-sub", "--write-auto-subs", "--convert-subs=srt", "--yes-playlist", "--continue", "--no-abort-on-error", "--ignore-errors", "--geo-bypass", "--add-metadata", "--format=(bv*+ba/b)[filesize<=750m][filesize_approx<=?750m]/(bv*+ba/b)"]
YOUTUBEDL_EXTRA_ARGS = []
WGET_ARGS = ["--no-verbose", "--adjust-extension", "--convert-links", "--force-directories", "--backup-converted", "--span-hosts", "--no-parent", "-e", "robots=off"]
WGET_EXTRA_ARGS = []
CURL_ARGS = ["--silent", "--location", "--compressed"]
CURL_EXTRA_ARGS = []
GIT_ARGS = ["--recursive"]
SINGLEFILE_ARGS = []
SINGLEFILE_EXTRA_ARGS = []
MERCURY_ARGS = ["--format=text"]
MERCURY_EXTRA_ARGS = []
FAVICON_PROVIDER = "https://www.google.com/s2/favicons?domain={}"
USE_INDEXING_BACKEND = true
USE_SEARCHING_BACKEND = true
SEARCH_BACKEND_ENGINE = "ripgrep"
SEARCH_BACKEND_HOST_NAME = "localhost"
SEARCH_BACKEND_PORT = 1491
SEARCH_BACKEND_PASSWORD = "SecretPassword"
SEARCH_PROCESS_HTML = true
SONIC_COLLECTION = "archivebox"
SONIC_BUCKET = "snapshots"
SEARCH_BACKEND_TIMEOUT = 90
FTS_SEPARATE_DATABASE = true
FTS_TOKENIZERS = "porter unicode61 remove_diacritics 2"
FTS_SQLITE_MAX_LENGTH = 1000000000
USE_CURL = true
USE_WGET = true
USE_SINGLEFILE = true
USE_READABILITY = true
USE_MERCURY = true
USE_GIT = true
USE_CHROME = true
USE_NODE = true
USE_YOUTUBEDL = true
USE_RIPGREP = true
CURL_BINARY = "curl"
GIT_BINARY = "git"
WGET_BINARY = "wget"
SINGLEFILE_BINARY = "single-file"
READABILITY_BINARY = "readability-extractor"
MERCURY_BINARY = "postlight-parser"
YOUTUBEDL_BINARY = "yt-dlp"
NODE_BINARY = "node"
RIPGREP_BINARY = "rg"
CHROME_BINARY = "chrome"
POCKET_CONSUMER_KEY = null
USER = "squash"
PACKAGE_DIR = "/opt/archivebox/archivebox"
TEMPLATES_DIR = "/opt/archivebox/archivebox/templates"
ARCHIVE_DIR = "/opt/archivebox/data/archive"
SOURCES_DIR = "/opt/archivebox/data/sources"
LOGS_DIR = "/opt/archivebox/data/logs"
PERSONAS_DIR = "/opt/archivebox/data/personas"
URL_DENYLIST_PTN = "re.compile(\'\\\\.(css|js|otf|ttf|woff|woff2|gstatic\\\\.com|googleapis\\\\.com/css)(\\\\?.*)?$\', re.IGNORECASE|re.MULTILINE)"
URL_ALLOWLIST_PTN = null
DIR_OUTPUT_PERMISSIONS = 755
ARCHIVEBOX_BINARY = "/opt/archivebox/.venv/bin/archivebox"
VERSION = "0.8.0"
COMMIT_HASH = "102e87578c6036bb0132dd1ebd17f8f05ffc880f"
BUILD_TIME = "2024-05-15 03:28:05 1715768885"
VERSIONS_AVAILABLE = null
CAN_UPGRADE = false
PYTHON_BINARY = "/opt/archivebox/.venv/bin/python3.10"
PYTHON_ENCODING = "UTF-8"
PYTHON_VERSION = "3.10.14"
DJANGO_BINARY = "/opt/archivebox/.venv/lib/python3.10/site-packages/django/__init__.py"
DJANGO_VERSION = "5.0.6 final (0)"
SQLITE_BINARY = "/opt/homebrew/Cellar/python@3.10/3.10.14/Frameworks/Python.framework/Versions/3.10/lib/python3.10/sqlite3/dbapi2.py"
SQLITE_VERSION = "2.6.0"
CURL_VERSION = "curl 8.4.0 (x86_64-apple-darwin23.0)"
WGET_VERSION = "GNU Wget 1.24.5"
WGET_AUTO_COMPRESSION = true
RIPGREP_VERSION = "ripgrep 14.1.0"
SINGLEFILE_VERSION = null
READABILITY_VERSION = null
MERCURY_VERSION = null
GIT_VERSION = "git version 2.44.0"
YOUTUBEDL_VERSION = "2024.04.09"
CHROME_VERSION = "Google Chrome 124.0.6367.207"
NODE_VERSION = "v21.7.3"'''
first_output = convert(test_input) # make sure ini -> toml parses correctly
second_output = convert(first_output) # make sure toml -> toml parses/dumps consistently
assert first_output == second_output == expected_output # make sure parsing is indempotent
# # DEBUGGING
# import sys
# import difflib
# sys.stdout.writelines(difflib.context_diff(first_output, second_output, fromfile='first', tofile='second'))
# print(repr(second_output))

View file

@ -0,0 +1,38 @@
# Generated by Django 5.0.6 on 2024-05-18 00:16
import abid_utils.models
import archivebox.plugantic.plugins
import charidfield.fields
import django.core.serializers.json
import django.db.models.deletion
import django_pydantic_field.fields
import uuid
from django.conf import settings
from django.db import migrations, models
class Migration(migrations.Migration):
initial = True
dependencies = [
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
]
operations = [
migrations.CreateModel(
name='Plugin',
fields=[
('created', models.DateTimeField(auto_now_add=True)),
('modified', models.DateTimeField(auto_now=True)),
('id', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)),
('uuid', models.UUIDField(blank=True, null=True, unique=True)),
('abid', charidfield.fields.CharIDField(blank=True, db_index=True, default=None, help_text='ABID-format identifier for this entity (e.g. snp_01BJQMF54D093DXEAWZ6JYRPAQ)', max_length=30, null=True, prefix='plg_', unique=True)),
('schema', django_pydantic_field.fields.PydanticSchemaField(config=None, encoder=django.core.serializers.json.DjangoJSONEncoder, schema=archivebox.plugantic.plugins.Plugin)),
('created_by', models.ForeignKey(default=abid_utils.models.get_or_create_system_user_pk, on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL)),
],
options={
'abstract': False,
},
),
]

View file

@ -0,0 +1,21 @@
# Generated by Django 5.0.6 on 2024-05-18 01:16
import archivebox.plugantic.plugins
import django.core.serializers.json
import django_pydantic_field.fields
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
('plugantic', '0001_initial'),
]
operations = [
migrations.AlterField(
model_name='plugin',
name='schema',
field=django_pydantic_field.fields.PydanticSchemaField(config=None, default=None, encoder=django.core.serializers.json.DjangoJSONEncoder, schema=archivebox.plugantic.plugins.Plugin),
),
]

View file

@ -0,0 +1,21 @@
# Generated by Django 5.0.6 on 2024-05-18 01:25
import archivebox.plugantic.replayers
import django.core.serializers.json
import django_pydantic_field.fields
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
('plugantic', '0002_alter_plugin_schema'),
]
operations = [
migrations.AlterField(
model_name='plugin',
name='schema',
field=django_pydantic_field.fields.PydanticSchemaField(config=None, default={'embed_template': 'plugins/generic_replayer/templates/embed.html', 'fullpage_template': 'plugins/generic_replayer/templates/fullpage.html', 'name': 'GenericReplayer', 'row_template': 'plugins/generic_replayer/templates/row.html', 'url_pattern': '*'}, encoder=django.core.serializers.json.DjangoJSONEncoder, schema=archivebox.plugantic.replayers.Replayer),
),
]

View file

@ -0,0 +1,32 @@
# Generated by Django 5.0.6 on 2024-05-18 01:28
import archivebox.plugantic.configs
import django.core.serializers.json
import django_pydantic_field.compat.django
import django_pydantic_field.fields
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('plugantic', '0003_alter_plugin_schema'),
]
operations = [
migrations.RemoveField(
model_name='plugin',
name='schema',
),
migrations.AddField(
model_name='plugin',
name='configs',
field=django_pydantic_field.fields.PydanticSchemaField(config=None, default=[], encoder=django.core.serializers.json.DjangoJSONEncoder, schema=django_pydantic_field.compat.django.GenericContainer(list, (archivebox.plugantic.configs.ConfigSet,))),
),
migrations.AddField(
model_name='plugin',
name='name',
field=models.CharField(default='name', max_length=64, unique=True),
preserve_default=False,
),
]

View file

@ -0,0 +1,39 @@
# Generated by Django 5.0.6 on 2024-05-18 01:42
import abid_utils.models
import charidfield.fields
import django.db.models.deletion
import pathlib
import uuid
from django.conf import settings
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('plugantic', '0004_remove_plugin_schema_plugin_configs_plugin_name'),
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
]
operations = [
migrations.CreateModel(
name='CustomPlugin',
fields=[
('created', models.DateTimeField(auto_now_add=True)),
('modified', models.DateTimeField(auto_now=True)),
('id', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)),
('uuid', models.UUIDField(blank=True, null=True, unique=True)),
('abid', charidfield.fields.CharIDField(blank=True, db_index=True, default=None, help_text='ABID-format identifier for this entity (e.g. snp_01BJQMF54D093DXEAWZ6JYRPAQ)', max_length=30, null=True, prefix='plg_', unique=True)),
('name', models.CharField(max_length=64, unique=True)),
('path', models.FilePathField(path=pathlib.PurePosixPath('/Volumes/NVME/Users/squash/Local/Code/archiveboxes/ArchiveBox/archivebox/plugins'))),
('created_by', models.ForeignKey(default=abid_utils.models.get_or_create_system_user_pk, on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL)),
],
options={
'abstract': False,
},
),
migrations.DeleteModel(
name='Plugin',
),
]

View file

@ -0,0 +1,19 @@
# Generated by Django 5.0.6 on 2024-05-18 01:45
import pathlib
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('plugantic', '0005_customplugin_delete_plugin'),
]
operations = [
migrations.AlterField(
model_name='customplugin',
name='path',
field=models.FilePathField(allow_files=False, allow_folders=True, path=pathlib.PurePosixPath('/Volumes/NVME/Users/squash/Local/Code/archiveboxes/ArchiveBox/archivebox/plugins'), recursive=True),
),
]

View file

@ -0,0 +1,19 @@
# Generated by Django 5.0.6 on 2024-05-18 01:46
import pathlib
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('plugantic', '0006_alter_customplugin_path'),
]
operations = [
migrations.AlterField(
model_name='customplugin',
name='path',
field=models.FilePathField(allow_files=False, allow_folders=True, path=pathlib.PurePosixPath('/Volumes/NVME/Users/squash/Local/Code/archiveboxes/ArchiveBox/data/plugins'), recursive=True),
),
]

View file

@ -0,0 +1,19 @@
# Generated by Django 5.0.6 on 2024-05-18 01:47
import pathlib
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('plugantic', '0007_alter_customplugin_path'),
]
operations = [
migrations.AlterField(
model_name='customplugin',
name='path',
field=models.FilePathField(allow_files=False, allow_folders=True, path=pathlib.PurePosixPath('/Volumes/NVME/Users/squash/Local/Code/archiveboxes/ArchiveBox/data'), recursive=True),
),
]

View file

@ -0,0 +1,18 @@
# Generated by Django 5.0.6 on 2024-05-18 01:48
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('plugantic', '0008_alter_customplugin_path'),
]
operations = [
migrations.AlterField(
model_name='customplugin',
name='path',
field=models.FilePathField(allow_files=False, allow_folders=True, path='/Volumes/NVME/Users/squash/Local/Code/archiveboxes/ArchiveBox/data', recursive=True),
),
]

View file

@ -0,0 +1,18 @@
# Generated by Django 5.0.6 on 2024-05-18 01:48
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('plugantic', '0009_alter_customplugin_path'),
]
operations = [
migrations.AlterField(
model_name='customplugin',
name='path',
field=models.FilePathField(allow_files=False, allow_folders=True, match='/plugins/*', path='/Volumes/NVME/Users/squash/Local/Code/archiveboxes/ArchiveBox/data', recursive=True),
),
]

View file

@ -0,0 +1,18 @@
# Generated by Django 5.0.6 on 2024-05-18 01:48
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('plugantic', '0010_alter_customplugin_path'),
]
operations = [
migrations.AlterField(
model_name='customplugin',
name='path',
field=models.FilePathField(allow_files=False, allow_folders=True, match='plugins/*', path='/Volumes/NVME/Users/squash/Local/Code/archiveboxes/ArchiveBox/data', recursive=True),
),
]

View file

@ -0,0 +1,18 @@
# Generated by Django 5.0.6 on 2024-05-18 01:49
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('plugantic', '0011_alter_customplugin_path'),
]
operations = [
migrations.AlterField(
model_name='customplugin',
name='path',
field=models.FilePathField(allow_files=False, allow_folders=True, default='example_plugin', match='plugins/*', path='/Volumes/NVME/Users/squash/Local/Code/archiveboxes/ArchiveBox/data', recursive=True),
),
]

View file

@ -0,0 +1,18 @@
# Generated by Django 5.0.6 on 2024-05-18 01:49
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('plugantic', '0012_alter_customplugin_path'),
]
operations = [
migrations.AlterField(
model_name='customplugin',
name='path',
field=models.FilePathField(allow_files=False, allow_folders=True, default='/plugins/example_plugin', match='plugins/*', path='/Volumes/NVME/Users/squash/Local/Code/archiveboxes/ArchiveBox/data', recursive=True),
),
]

View file

@ -0,0 +1,18 @@
# Generated by Django 5.0.6 on 2024-05-18 01:50
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('plugantic', '0013_alter_customplugin_path'),
]
operations = [
migrations.AlterField(
model_name='customplugin',
name='path',
field=models.FilePathField(allow_files=False, allow_folders=True, default='/plugins/example_plugin', match='*', path='/Volumes/NVME/Users/squash/Local/Code/archiveboxes/ArchiveBox/data/plugins', recursive=True),
),
]

View file

@ -0,0 +1,18 @@
# Generated by Django 5.0.6 on 2024-05-18 01:51
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('plugantic', '0014_alter_customplugin_path'),
]
operations = [
migrations.AlterField(
model_name='customplugin',
name='path',
field=models.FilePathField(allow_files=False, allow_folders=True, match='*', path='/Volumes/NVME/Users/squash/Local/Code/archiveboxes/ArchiveBox/data/plugins', recursive=True),
),
]

View file

@ -0,0 +1,16 @@
# Generated by Django 5.0.6 on 2024-05-18 01:57
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
('plugantic', '0015_alter_customplugin_path'),
]
operations = [
migrations.DeleteModel(
name='CustomPlugin',
),
]

View file

@ -0,0 +1,50 @@
__package__ = 'archivebox.plugantic'
# import uuid
# from django.db import models
# from typing_extensions import Self
# from django_pydantic_field import SchemaField
# from django.conf import settings
# from abid_utils.models import ABIDModel, ABIDField
# # from .plugins import Plugin as PluginSchema, CORE_PLUGIN
# from .binproviders import BinProvider
# from .binaries import Binary
# from .configs import WgetOptionsConfig
# from .extractors import Extractor
# from .replayers import Replayer
# PLUGINS_ROOT = settings.CONFIG['OUTPUT_DIR'] / 'plugins'
# PLUGINS_ROOT.mkdir(exist_ok=True)
# class CustomPlugin(ABIDModel):
# abid_prefix = 'plg_'
# abid_ts_src = 'self.added'
# abid_uri_src = 'self.name'
# abid_subtype_src = '"09"'
# abid_rand_src = 'self.id'
# id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False) # legacy pk
# uuid = models.UUIDField(blank=True, null=True, editable=True, unique=True)
# abid = ABIDField(prefix=abid_prefix)
# name = models.CharField(max_length=64, blank=False, unique=True)
# path = models.FilePathField(path=str(PLUGINS_ROOT), match='*', recursive=True, allow_folders=True, allow_files=False)
# # replayers: list[Replayer] = SchemaField()
# # binaries: list[Replayer] = SchemaField()
# # extractors: list[Replayer] = SchemaField()
# # @classmethod
# # def from_loaded_plugin(cls, plugin: PluginSchema) -> Self:
# # new_obj = cls(
# # schema=plugin,
# # )
# # return new_obj

View file

@ -0,0 +1,134 @@
__package__ = 'archivebox.plugantic'
from typing import List
from typing_extensions import Self
from pydantic import (
BaseModel,
ConfigDict,
Field,
model_validator,
validate_call,
SerializeAsAny,
)
from .binaries import (
Binary,
PythonBinary,
SqliteBinary,
DjangoBinary,
WgetBinary,
YtdlpBinary,
)
from .extractors import (
Extractor,
YtdlpExtractor,
WgetExtractor,
WarcExtractor,
)
from .replayers import (
Replayer,
GENERIC_REPLAYER,
MEDIA_REPLAYER,
)
from .configs import (
ConfigSet,
WGET_CONFIG,
)
class Plugin(BaseModel):
model_config = ConfigDict(arbitrary_types_allowed=True, extra='ignore', populate_by_name=True)
name: str = Field(default='baseplugin') # e.g. media
description: str = Field(default='') # e.g. get media using yt-dlp
configs: List[SerializeAsAny[ConfigSet]] = Field(default=[])
binaries: List[SerializeAsAny[Binary]] = Field(default=[]) # e.g. [Binary(name='yt-dlp')]
extractors: List[SerializeAsAny[Extractor]] = Field(default=[])
replayers: List[SerializeAsAny[Replayer]] = Field(default=[])
@model_validator(mode='after')
def validate(self):
self.description = self.description or self.name
@validate_call
def install(self) -> Self:
new_binaries = []
for idx, binary in enumerate(self.binaries):
new_binaries.append(binary.install() or binary)
return self.model_copy(update={
'binaries': new_binaries,
})
@validate_call
def load(self, cache=True) -> Self:
new_binaries = []
for idx, binary in enumerate(self.binaries):
new_binaries.append(binary.load(cache=cache) or binary)
return self.model_copy(update={
'binaries': new_binaries,
})
@validate_call
def load_or_install(self, cache=True) -> Self:
new_binaries = []
for idx, binary in enumerate(self.binaries):
new_binaries.append(binary.load_or_install(cache=cache) or binary)
return self.model_copy(update={
'binaries': new_binaries,
})
class CorePlugin(Plugin):
name: str = 'core'
configs: List[SerializeAsAny[ConfigSet]] = []
binaries: List[SerializeAsAny[Binary]] = [PythonBinary(), SqliteBinary(), DjangoBinary()]
extractors: List[SerializeAsAny[Extractor]] = []
replayers: List[SerializeAsAny[Replayer]] = [GENERIC_REPLAYER]
class YtdlpPlugin(Plugin):
name: str = 'ytdlp'
configs: List[SerializeAsAny[ConfigSet]] = []
binaries: List[SerializeAsAny[Binary]] = [YtdlpBinary()]
extractors: List[SerializeAsAny[Extractor]] = [YtdlpExtractor()]
replayers: List[SerializeAsAny[Replayer]] = [MEDIA_REPLAYER]
class WgetPlugin(Plugin):
name: str = 'wget'
configs: List[SerializeAsAny[ConfigSet]] = [*WGET_CONFIG]
binaries: List[SerializeAsAny[Binary]] = [WgetBinary()]
extractors: List[SerializeAsAny[Extractor]] = [WgetExtractor(), WarcExtractor()]
CORE_PLUGIN = CorePlugin()
YTDLP_PLUGIN = YtdlpPlugin()
WGET_PLUGIN = WgetPlugin()
PLUGINS = [
CORE_PLUGIN,
YTDLP_PLUGIN,
WGET_PLUGIN,
]
LOADED_PLUGINS = PLUGINS
import json
for plugin in PLUGINS:
try:
json.dumps(plugin.model_json_schema(), indent=4)
# print(json.dumps(plugin.model_json_schema(), indent=4))
except Exception as err:
print(f'Failed to generate JSON schema for {plugin.name}')
raise
# print('-------------------------------------BEFORE INSTALL---------------------------------')
# for plugin in PLUGINS:
# print(plugin.model_dump_json(indent=4))
# print('-------------------------------------DURING LOAD/INSTALL---------------------------------')
# for plugin in PLUGINS:
# LOADED_PLUGINS.append(plugin.install())
# print('-------------------------------------AFTER INSTALL---------------------------------')
# for plugin in LOADED_PLUGINS:
# print(plugin.model_dump_json(indent=4))

View file

@ -0,0 +1,26 @@
__package__ = 'archivebox.plugantic'
from pydantic import BaseModel
# from .binproviders import LazyImportStr
class Replayer(BaseModel):
"""Describes how to render an ArchiveResult in several contexts"""
name: str = 'GenericReplayer'
url_pattern: str = '*'
row_template: str = 'plugins/generic_replayer/templates/row.html'
embed_template: str = 'plugins/generic_replayer/templates/embed.html'
fullpage_template: str = 'plugins/generic_replayer/templates/fullpage.html'
# row_view: LazyImportStr = 'plugins.generic_replayer.views.row_view'
# embed_view: LazyImportStr = 'plugins.generic_replayer.views.embed_view'
# fullpage_view: LazyImportStr = 'plugins.generic_replayer.views.fullpage_view'
# icon_view: LazyImportStr = 'plugins.generic_replayer.views.get_icon'
# thumbnail_view: LazyImportStr = 'plugins.generic_replayer.views.get_icon'
GENERIC_REPLAYER = Replayer(name='generic')
MEDIA_REPLAYER = Replayer(name='media')

View file

@ -0,0 +1,3 @@
from django.test import TestCase
# Create your tests here.

View file

@ -0,0 +1,169 @@
__package__ = 'archivebox.plugantic'
from django.http import HttpRequest
from django.utils.html import format_html, mark_safe
from admin_data_views.typing import TableContext, ItemContext
from admin_data_views.utils import render_with_table_view, render_with_item_view, ItemLink
from plugantic.plugins import LOADED_PLUGINS
from django.conf import settings
@render_with_table_view
def binaries_list_view(request: HttpRequest, **kwargs) -> TableContext:
assert request.user.is_superuser, 'Must be a superuser to view configuration settings.'
rows = {
"Binary": [],
"From Plugin": [],
"Found Version": [],
"Provided By": [],
"Found Abspath": [],
"Related Configuration": [],
"Overrides": [],
"Description": [],
}
relevant_configs = {
key: val
for key, val in settings.CONFIG.items()
if '_BINARY' in key or '_VERSION' in key
}
for plugin in LOADED_PLUGINS:
for binary in plugin.binaries:
binary = binary.load_or_install()
rows['Binary'].append(ItemLink(binary.name, key=binary.name))
rows['From Plugin'].append(plugin.name)
rows['Found Version'].append(binary.loaded_version)
rows['Provided By'].append(binary.loaded_provider)
rows['Found Abspath'].append(binary.loaded_abspath)
rows['Related Configuration'].append(mark_safe(', '.join(
f'<a href="/admin/environment/config/{config_key}/">{config_key}</a>'
for config_key, config_value in relevant_configs.items()
if binary.name.lower().replace('-', '').replace('_', '').replace('ytdlp', 'youtubedl') in config_key.lower()
# or binary.name.lower().replace('-', '').replace('_', '') in str(config_value).lower()
)))
rows['Overrides'].append(str(binary.provider_overrides))
rows['Description'].append(binary.description)
return TableContext(
title="Binaries",
table=rows,
)
@render_with_item_view
def binary_detail_view(request: HttpRequest, key: str, **kwargs) -> ItemContext:
assert request.user.is_superuser, 'Must be a superuser to view configuration settings.'
binary = None
plugin = None
for loaded_plugin in LOADED_PLUGINS:
for loaded_binary in loaded_plugin.binaries:
if loaded_binary.name == key:
binary = loaded_binary
plugin = loaded_plugin
assert plugin and binary, f'Could not find a binary matching the specified name: {key}'
binary = binary.load_or_install()
return ItemContext(
slug=key,
title=key,
data=[
{
"name": binary.name,
"description": binary.description,
"fields": {
'plugin': plugin.name,
'binprovider': binary.loaded_provider,
'abspath': binary.loaded_abspath,
'version': binary.loaded_version,
'overrides': str(binary.provider_overrides),
'providers': str(binary.providers_supported),
},
"help_texts": {
# TODO
},
},
],
)
@render_with_table_view
def plugins_list_view(request: HttpRequest, **kwargs) -> TableContext:
assert request.user.is_superuser, 'Must be a superuser to view configuration settings.'
rows = {
"Name": [],
"binaries": [],
"extractors": [],
"replayers": [],
"configs": [],
"description": [],
}
for plugin in LOADED_PLUGINS:
plugin = plugin.load_or_install()
rows['Name'].append(ItemLink(plugin.name, key=plugin.name))
rows['binaries'].append(mark_safe(', '.join(
f'<a href="/admin/environment/binaries/{binary.name}/">{binary.name}</a>'
for binary in plugin.binaries
)))
rows['extractors'].append(', '.join(extractor.name for extractor in plugin.extractors))
rows['replayers'].append(', '.join(replayer.name for replayer in plugin.replayers))
rows['configs'].append(mark_safe(', '.join(
f'<a href="/admin/environment/config/{config_key}/">{config_key}</a>'
for configset in plugin.configs
for config_key in configset.__fields__.keys()
if config_key != 'section' and config_key in settings.CONFIG
)))
rows['description'].append(str(plugin.description))
return TableContext(
title="Installed plugins",
table=rows,
)
@render_with_item_view
def plugin_detail_view(request: HttpRequest, key: str, **kwargs) -> ItemContext:
assert request.user.is_superuser, 'Must be a superuser to view configuration settings.'
plugin = None
for loaded_plugin in LOADED_PLUGINS:
if loaded_plugin.name == key:
plugin = loaded_plugin
assert plugin, f'Could not find a plugin matching the specified name: {key}'
plugin = plugin.load_or_install()
return ItemContext(
slug=key,
title=key,
data=[
{
"name": plugin.name,
"description": plugin.description,
"fields": {
'configs': plugin.configs,
'binaries': plugin.binaries,
'extractors': plugin.extractors,
'replayers': plugin.replayers,
},
"help_texts": {
# TODO
},
},
],
)

View file

@ -40,6 +40,8 @@ dependencies = [
"ulid-py>=1.1.0",
"typeid-python>=0.3.0",
"django-charid-field>=0.4",
"django-pydantic-field>=0.3.9",
"django-jsonform>=2.22.0",
]
homepage = "https://github.com/ArchiveBox/ArchiveBox"
@ -158,6 +160,22 @@ plugins = ["mypy_django_plugin.main"]
[tool.django-stubs]
django_settings_module = "core.settings"
[tool.pyright]
include = ["archivebox"]
exclude = ["**/node_modules",
"**/__pycache__",
"**/migrations",
"archivebox/vendor",
]
# ignore = ["src/oldstuff"]
# defineConstant = { DEBUG = true }
reportMissingImports = true
reportMissingTypeStubs = false
pythonVersion = "3.10"
pythonPlatform = "Linux"
[project.urls]
Homepage = "https://github.com/ArchiveBox/ArchiveBox"