new plugin loading system

This commit is contained in:
Nick Sweeting 2024-08-23 02:02:34 -07:00
parent 34389e5e7c
commit 5fe3edd79a
No known key found for this signature in database
28 changed files with 450 additions and 874 deletions

View file

View file

@ -0,0 +1,3 @@
from django.contrib import admin
# Register your models here.

View file

@ -0,0 +1,82 @@
import sys
import inspect
from typing import List, Dict, Any, Optional
from pathlib import Path
import django
from django.apps import AppConfig
from django.core.checks import Tags, Warning, register
from django.db.backends.sqlite3.base import Database as sqlite3
from pydantic import (
Field,
SerializeAsAny,
)
from pydantic_pkgr import BinProvider, BinProviderName, ProviderLookupDict, BinName, Binary, EnvProvider, NpmProvider
from plugantic.extractors import Extractor, ExtractorName
from plugantic.plugins import Plugin
from plugantic.configs import ConfigSet, ConfigSectionName
from plugantic.replayers import Replayer
class PythonBinary(Binary):
name: BinName = 'python'
providers_supported: List[BinProvider] = [EnvProvider()]
provider_overrides: Dict[str, Any] = {
'env': {
'subdeps': \
lambda: 'python3 python3-minimal python3-pip python3-virtualenv',
'abspath': \
lambda: sys.executable,
'version': \
lambda: '{}.{}.{}'.format(*sys.version_info[:3]),
},
}
class SqliteBinary(Binary):
name: BinName = 'sqlite'
providers_supported: List[BinProvider] = [EnvProvider()]
provider_overrides: Dict[BinProviderName, ProviderLookupDict] = {
'env': {
'abspath': \
lambda: inspect.getfile(sqlite3),
'version': \
lambda: sqlite3.version,
},
}
class DjangoBinary(Binary):
name: BinName = 'django'
providers_supported: List[BinProvider] = [EnvProvider()]
provider_overrides: Dict[BinProviderName, ProviderLookupDict] = {
'env': {
'abspath': \
lambda: inspect.getfile(django),
'version': \
lambda: django.VERSION[:3],
},
}
class BasicReplayer(Replayer):
name: str = 'basic'
class BasePlugin(Plugin):
name: str = 'base'
configs: List[SerializeAsAny[ConfigSet]] = []
binaries: List[SerializeAsAny[Binary]] = [PythonBinary(), SqliteBinary(), DjangoBinary()]
extractors: List[SerializeAsAny[Extractor]] = []
replayers: List[SerializeAsAny[Replayer]] = [BasicReplayer()]
PLUGINS = [BasePlugin()]
class BaseConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'builtin_plugins.base'

View file

@ -0,0 +1,3 @@
from django.db import models
# Create your models here.

View file

@ -0,0 +1,3 @@
from django.test import TestCase
# Create your tests here.

View file

@ -0,0 +1,3 @@
from django.shortcuts import render
# Create your views here.

View file

@ -0,0 +1,94 @@
from typing import List, Optional
from pathlib import Path
from django.apps import AppConfig
from django.core.checks import Tags, Warning, register
from pydantic import (
Field,
SerializeAsAny,
)
from pydantic_pkgr import BinProvider, BinName, Binary, EnvProvider, NpmProvider
from plugantic.extractors import Extractor, ExtractorName
from plugantic.plugins import Plugin
from plugantic.configs import ConfigSet, ConfigSectionName
###################### Config ##########################
class SinglefileToggleConfig(ConfigSet):
section: ConfigSectionName = 'ARCHIVE_METHOD_TOGGLES'
SAVE_SINGLEFILE: bool = True
class SinglefileDependencyConfig(ConfigSet):
section: ConfigSectionName = 'DEPENDENCY_CONFIG'
SINGLEFILE_BINARY: str = Field(default='wget')
SINGLEFILE_ARGS: Optional[List[str]] = Field(default=None)
SINGLEFILE_EXTRA_ARGS: List[str] = []
SINGLEFILE_DEFAULT_ARGS: List[str] = ['--timeout={TIMEOUT-10}']
class SinglefileOptionsConfig(ConfigSet):
section: ConfigSectionName = 'ARCHIVE_METHOD_OPTIONS'
# loaded from shared config
SINGLEFILE_USER_AGENT: str = Field(default='', alias='USER_AGENT')
SINGLEFILE_TIMEOUT: int = Field(default=60, alias='TIMEOUT')
SINGLEFILE_CHECK_SSL_VALIDITY: bool = Field(default=True, alias='CHECK_SSL_VALIDITY')
SINGLEFILE_RESTRICT_FILE_NAMES: str = Field(default='windows', alias='RESTRICT_FILE_NAMES')
SINGLEFILE_COOKIES_FILE: Optional[Path] = Field(default=None, alias='COOKIES_FILE')
DEFAULT_CONFIG = {
'CHECK_SSL_VALIDITY': False,
'SAVE_SINGLEFILE': True,
'TIMEOUT': 120,
}
PLUGIN_CONFIG = [
SinglefileToggleConfig(**DEFAULT_CONFIG),
SinglefileDependencyConfig(**DEFAULT_CONFIG),
SinglefileOptionsConfig(**DEFAULT_CONFIG),
]
###################### Binaries ############################
class SinglefileBinary(Binary):
name: BinName = 'single-file'
providers_supported: List[BinProvider] = [EnvProvider(), NpmProvider()]
###################### Extractors ##########################
class SinglefileExtractor(Extractor):
name: ExtractorName = 'singlefile'
binary: Binary = SinglefileBinary()
def get_output_path(self, snapshot) -> Path:
return Path(snapshot.link_dir) / 'singlefile.html'
###################### Plugins #############################
class SinglefilePlugin(Plugin):
name: str = 'singlefile'
configs: List[SerializeAsAny[ConfigSet]] = [*PLUGIN_CONFIG]
binaries: List[SerializeAsAny[Binary]] = [SinglefileBinary()]
extractors: List[SerializeAsAny[Extractor]] = [SinglefileExtractor()]
PLUGINS = [SinglefilePlugin()]
###################### Django Apps #########################
class SinglefileConfig(AppConfig):
name = 'builtin_plugins.singlefile'
verbose_name = 'SingleFile'
def ready(self):
print('Loaded singlefile plugin')

View file

@ -0,0 +1,66 @@
name: singlefile
plugin_version: '0.0.1'
plugin_spec: '0.0.1'
binaries:
singlefile:
providers:
- env
- npm
commands:
- singlefile.exec
- singlefile.extract
- singlefile.should_extract
- singlefile.get_output_path
extractors:
singlefile:
binary: singlefile
test: singlefile.should_extract
extract: singlefile.extract
output_files:
- singlefile.html
configs:
ARCHIVE_METHOD_TOGGLES:
SAVE_SINGLEFILE:
type: bool
default: true
DEPENDENCY_CONFIG:
SINGLEFILE_BINARY:
type: str
default: wget
SINGLEFILE_ARGS:
type: Optional[List[str]]
default: null
SINGLEFILE_EXTRA_ARGS:
type: List[str]
default: []
SINGLEFILE_DEFAULT_ARGS:
type: List[str]
default:
- "--timeout={TIMEOUT-10}"
ARCHIVE_METHOD_OPTIONS:
SINGLEFILE_USER_AGENT:
type: str
default: ""
alias: USER_AGENT
SINGLEFILE_TIMEOUT:
type: int
default: 60
alias: TIMEOUT
SINGLEFILE_CHECK_SSL_VALIDITY:
type: bool
default: true
alias: CHECK_SSL_VALIDITY
SINGLEFILE_RESTRICT_FILE_NAMES:
type: str
default: windows
alias: RESTRICT_FILE_NAMES
SINGLEFILE_COOKIES_FILE:
type: Optional[Path]
default: null
alias: COOKIES_FILE

View file

@ -0,0 +1,3 @@
from django.test import TestCase
# Create your tests here.

View file

@ -34,22 +34,20 @@ APPEND_SLASH = True
DEBUG = CONFIG.DEBUG or ('--debug' in sys.argv)
# add plugins folders to system path, and load plugins in installed_apps
BUILTIN_PLUGINS_DIR = CONFIG.PACKAGE_DIR / 'plugins'
USER_PLUGINS_DIR = CONFIG.OUTPUT_DIR / 'plugins'
sys.path.insert(0, str(BUILTIN_PLUGINS_DIR))
sys.path.insert(0, str(USER_PLUGINS_DIR))
BUILTIN_PLUGINS_DIR = CONFIG.PACKAGE_DIR / 'builtin_plugins'
USER_PLUGINS_DIR = CONFIG.OUTPUT_DIR / 'user_plugins'
def find_plugins(plugins_dir):
return {
# plugin_entrypoint.parent.name: import_module(plugin_entrypoint.parent.name).METADATA
plugin_entrypoint.parent.name: plugin_entrypoint.parent
def find_plugins(plugins_dir, prefix: str) -> Dict[str, Any]:
plugins = {
f'{prefix}.{plugin_entrypoint.parent.name}': plugin_entrypoint.parent
for plugin_entrypoint in plugins_dir.glob('*/apps.py')
}
# print(f'Found {prefix} plugins:\n', '\n '.join(plugins.keys()))
return plugins
INSTALLED_PLUGINS = {
**find_plugins(BUILTIN_PLUGINS_DIR),
**find_plugins(USER_PLUGINS_DIR),
**find_plugins(BUILTIN_PLUGINS_DIR, prefix='builtin_plugins'),
**find_plugins(USER_PLUGINS_DIR, prefix='user_plugins'),
}
@ -67,11 +65,11 @@ INSTALLED_APPS = [
'plugantic',
'core',
'api',
'pkgs',
*INSTALLED_PLUGINS.keys(),
'admin_data_views',
'django_extensions',
]

View file

3
archivebox/pkgs/admin.py Normal file
View file

@ -0,0 +1,3 @@
from django.contrib import admin
# Register your models here.

14
archivebox/pkgs/apps.py Normal file
View file

@ -0,0 +1,14 @@
__package__ = 'archivebox.pkgs'
from django.apps import AppConfig
class PkgsConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'pkgs'
def ready(self):
from .settings import LOADED_DEPENDENCIES
# print(LOADED_DEPENDENCIES)

View file

View file

@ -0,0 +1,3 @@
from django.db import models
# Create your models here.

View file

@ -0,0 +1,76 @@
__package__ = 'archivebox.pkgs'
import sys
import shutil
import inspect
from pathlib import Path
import django
from django.conf import settings
from django.db.backends.sqlite3.base import Database as sqlite3
from pydantic_pkgr import Binary, BinProvider, BrewProvider, EnvProvider, SemVer
from pydantic_pkgr.binprovider import bin_abspath
env = EnvProvider()
LOADED_DEPENDENCIES = {}
for bin_key, dependency in settings.CONFIG.DEPENDENCIES.items():
# 'PYTHON_BINARY': {
# 'path': bin_path(config['PYTHON_BINARY']),
# 'version': config['PYTHON_VERSION'],
# 'hash': bin_hash(config['PYTHON_BINARY']),
# 'enabled': True,
# 'is_valid': bool(config['PYTHON_VERSION']),
# },
bin_name = settings.CONFIG[bin_key]
if bin_name.endswith('django/__init__.py'):
binary_spec = Binary(name='django', providers=[env], provider_overrides={
'env': {
'abspath': lambda: Path(inspect.getfile(django)),
'version': lambda: SemVer('{}.{}.{} {} ({})'.format(*django.VERSION)),
}
})
elif bin_name.endswith('sqlite3/dbapi2.py'):
binary_spec = Binary(name='sqlite3', providers=[env], provider_overrides={
'env': {
'abspath': lambda: Path(inspect.getfile(sqlite3)),
'version': lambda: SemVer(sqlite3.version),
}
})
elif bin_name.endswith('archivebox'):
binary_spec = Binary(name='archivebox', providers=[env], provider_overrides={
'env': {
'abspath': lambda: shutil.which(str(Path('archivebox').expanduser())),
'version': lambda: settings.CONFIG.VERSION,
}
})
else:
binary_spec = Binary(name=bin_name, providers=[env])
try:
binary = binary_spec.load()
except Exception as e:
print(f"- ❌ Binary {bin_name} failed to load with error: {e}")
continue
assert isinstance(binary.loaded_version, SemVer)
try:
assert str(binary.loaded_version) == dependency['version'], f"Expected {bin_name} version {dependency['version']}, got {binary.loaded_version}"
assert str(binary.loaded_respath) == str(bin_abspath(dependency['path']).resolve()), f"Expected {bin_name} abspath {bin_abspath(dependency['path']).resolve()}, got {binary.loaded_respath}"
assert binary.is_valid == dependency['is_valid'], f"Expected {bin_name} is_valid={dependency['is_valid']}, got {binary.is_valid}"
except Exception as e:
print(f"Assertion error for {bin_name}: {e}")
import ipdb; ipdb.set_trace()
print(f"- ✅ Binary {bin_name} loaded successfully")
LOADED_DEPENDENCIES[bin_key] = binary

3
archivebox/pkgs/tests.py Normal file
View file

@ -0,0 +1,3 @@
from django.test import TestCase
# Create your tests here.

3
archivebox/pkgs/views.py Normal file
View file

@ -0,0 +1,3 @@
from django.shortcuts import render
# Create your views here.

View file

@ -1,6 +1,5 @@
__package__ = 'archivebox.plugantic'
from .binproviders import BinProvider
from .binaries import Binary
from .extractors import Extractor
from .replayers import Replayer

View file

@ -1,6 +1,17 @@
import importlib
from django.apps import AppConfig
class PluganticConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'plugantic'
def ready(self) -> None:
from django.conf import settings
from .plugins import PLUGINS
for plugin_name in settings.INSTALLED_PLUGINS.keys():
lib = importlib.import_module(f'{plugin_name}.apps')
if hasattr(lib, 'PLUGINS'):
for plugin_instance in lib.PLUGINS:
PLUGINS.append(plugin_instance)

View file

@ -10,285 +10,17 @@ from typing import Any, Optional, Dict, List
from typing_extensions import Self
from subprocess import run, PIPE
from pydantic_pkgr import Binary, SemVer, BinName, BinProvider, EnvProvider, AptProvider, BrewProvider, PipProvider, BinProviderName, ProviderLookupDict
from pydantic_core import ValidationError
from pydantic import BaseModel, Field, model_validator, computed_field, field_validator, validate_call, field_serializer
from .binproviders import (
SemVer,
BinName,
BinProviderName,
HostBinPath,
BinProvider,
EnvProvider,
AptProvider,
BrewProvider,
PipProvider,
ProviderLookupDict,
bin_name,
bin_abspath,
path_is_script,
path_is_executable,
)
class Binary(BaseModel):
name: BinName
description: str = Field(default='')
providers_supported: List[BinProvider] = Field(default=[EnvProvider()], alias='providers')
provider_overrides: Dict[BinProviderName, ProviderLookupDict] = Field(default={}, alias='overrides')
loaded_provider: Optional[BinProviderName] = Field(default=None, alias='provider')
loaded_abspath: Optional[HostBinPath] = Field(default=None, alias='abspath')
loaded_version: Optional[SemVer] = Field(default=None, alias='version')
# bin_filename: see below
# is_executable: see below
# is_script
# is_valid: see below
@model_validator(mode='after')
def validate(self):
self.loaded_abspath = bin_abspath(self.name) or self.name
self.description = self.description or self.name
assert self.providers_supported, f'No providers were given for package {self.name}'
# pull in any overrides from the binproviders
for provider in self.providers_supported:
overrides_by_provider = provider.get_providers_for_bin(self.name)
if overrides_by_provider:
self.provider_overrides[provider.name] = {
**overrides_by_provider,
**self.provider_overrides.get(provider.name, {}),
}
return self
@field_validator('loaded_abspath', mode='before')
def parse_abspath(cls, value: Any):
return bin_abspath(value)
@field_validator('loaded_version', mode='before')
def parse_version(cls, value: Any):
return value and SemVer(value)
@field_serializer('provider_overrides', when_used='json')
def serialize_overrides(self, provider_overrides: Dict[BinProviderName, ProviderLookupDict]) -> Dict[BinProviderName, Dict[str, str]]:
return {
provider_name: {
key: str(val)
for key, val in overrides.items()
}
for provider_name, overrides in provider_overrides.items()
}
@computed_field # type: ignore[misc] # see mypy issue #1362
@property
def bin_filename(self) -> BinName:
if self.is_script:
# e.g. '.../Python.framework/Versions/3.11/lib/python3.11/sqlite3/__init__.py' -> sqlite
name = self.name
elif self.loaded_abspath:
# e.g. '/opt/homebrew/bin/wget' -> wget
name = bin_name(self.loaded_abspath)
else:
# e.g. 'ytdlp' -> 'yt-dlp'
name = bin_name(self.name)
return name
@computed_field # type: ignore[misc] # see mypy issue #1362
@property
def is_executable(self) -> bool:
try:
assert self.loaded_abspath and path_is_executable(self.loaded_abspath)
return True
except (ValidationError, AssertionError):
return False
@computed_field # type: ignore[misc] # see mypy issue #1362
@property
def is_script(self) -> bool:
try:
assert self.loaded_abspath and path_is_script(self.loaded_abspath)
return True
except (ValidationError, AssertionError):
return False
@computed_field # type: ignore[misc] # see mypy issue #1362
@property
def is_valid(self) -> bool:
return bool(
self.name
and self.loaded_abspath
and self.loaded_version
and (self.is_executable or self.is_script)
)
@validate_call
def install(self) -> Self:
if not self.providers_supported:
return self
exc = Exception('No providers were able to install binary', self.name, self.providers_supported)
for provider in self.providers_supported:
try:
installed_bin = provider.install(self.name, overrides=self.provider_overrides.get(provider.name))
if installed_bin:
# print('INSTALLED', self.name, installed_bin)
return self.model_copy(update={
'loaded_provider': provider.name,
'loaded_abspath': installed_bin.abspath,
'loaded_version': installed_bin.version,
})
except Exception as err:
print(err)
exc = err
raise exc
@validate_call
def load(self, cache=True) -> Self:
if self.is_valid:
return self
if not self.providers_supported:
return self
exc = Exception('No providers were able to install binary', self.name, self.providers_supported)
for provider in self.providers_supported:
try:
installed_bin = provider.load(self.name, cache=cache, overrides=self.provider_overrides.get(provider.name))
if installed_bin:
# print('LOADED', provider, self.name, installed_bin)
return self.model_copy(update={
'loaded_provider': provider.name,
'loaded_abspath': installed_bin.abspath,
'loaded_version': installed_bin.version,
})
except Exception as err:
print(err)
exc = err
raise exc
@validate_call
def load_or_install(self, cache=True) -> Self:
if self.is_valid:
return self
if not self.providers_supported:
return self
exc = Exception('No providers were able to install binary', self.name, self.providers_supported)
for provider in self.providers_supported:
try:
installed_bin = provider.load_or_install(self.name, overrides=self.provider_overrides.get(provider.name), cache=cache)
if installed_bin:
# print('LOADED_OR_INSTALLED', self.name, installed_bin)
return self.model_copy(update={
'loaded_provider': provider.name,
'loaded_abspath': installed_bin.abspath,
'loaded_version': installed_bin.version,
})
except Exception as err:
print(err)
exc = err
raise exc
@validate_call
def exec(self, args=(), pwd='.'):
assert self.loaded_abspath
assert self.loaded_version
return run([self.loaded_abspath, *args], stdout=PIPE, stderr=PIPE, pwd=pwd)
import django
from django.db.backends.sqlite3.base import Database as sqlite3
class SystemPythonHelpers:
@staticmethod
def get_subdeps() -> str:
return 'python3 python3-minimal python3-pip python3-virtualenv'
@staticmethod
def get_abspath() -> str:
return sys.executable
@staticmethod
def get_version() -> str:
return '{}.{}.{}'.format(*sys.version_info[:3])
class SqliteHelpers:
@staticmethod
def get_abspath() -> Path:
import sqlite3
importlib.reload(sqlite3)
return Path(inspect.getfile(sqlite3))
@staticmethod
def get_version() -> SemVer:
import sqlite3
importlib.reload(sqlite3)
version = sqlite3.version
assert version
return SemVer(version)
class DjangoHelpers:
@staticmethod
def get_django_abspath() -> str:
import django
return inspect.getfile(django)
@staticmethod
def get_django_version() -> str:
import django
return '{}.{}.{} {} ({})'.format(*django.VERSION)
class YtdlpHelpers:
@staticmethod
def get_ytdlp_subdeps() -> str:
return 'yt-dlp ffmpeg'
@staticmethod
def get_ytdlp_version() -> str:
import yt_dlp
importlib.reload(yt_dlp)
version = yt_dlp.version.__version__
assert version
return version
class PythonBinary(Binary):
name: BinName = 'python'
providers_supported: List[BinProvider] = [
EnvProvider(
subdeps_provider={'python': 'plugantic.binaries.SystemPythonHelpers.get_subdeps'},
abspath_provider={'python': 'plugantic.binaries.SystemPythonHelpers.get_abspath'},
version_provider={'python': 'plugantic.binaries.SystemPythonHelpers.get_version'},
),
]
class SqliteBinary(Binary):
name: BinName = 'sqlite'
providers_supported: List[BinProvider] = [
EnvProvider(
version_provider={'sqlite': 'plugantic.binaries.SqliteHelpers.get_version'},
abspath_provider={'sqlite': 'plugantic.binaries.SqliteHelpers.get_abspath'},
),
]
class DjangoBinary(Binary):
name: BinName = 'django'
providers_supported: List[BinProvider] = [
EnvProvider(
abspath_provider={'django': 'plugantic.binaries.DjangoHelpers.get_django_abspath'},
version_provider={'django': 'plugantic.binaries.DjangoHelpers.get_django_version'},
),
]
def get_ytdlp_version() -> str:
import yt_dlp
return yt_dlp.version.__version__
@ -296,16 +28,26 @@ class DjangoBinary(Binary):
class YtdlpBinary(Binary):
name: BinName = 'yt-dlp'
providers_supported: List[BinProvider] = [
# EnvProvider(),
PipProvider(version_provider={'yt-dlp': 'plugantic.binaries.YtdlpHelpers.get_ytdlp_version'}),
BrewProvider(subdeps_provider={'yt-dlp': 'plugantic.binaries.YtdlpHelpers.get_ytdlp_subdeps'}),
# AptProvider(subdeps_provider={'yt-dlp': lambda: 'yt-dlp ffmpeg'}),
EnvProvider(),
PipProvider(),
BrewProvider(),
AptProvider(),
]
provider_overrides: Dict[BinProviderName, ProviderLookupDict] = {
'pip': {
'version': get_ytdlp_version,
},
'brew': {
'subdeps': lambda: 'yt-dlp ffmpeg',
},
'apt': {
'subdeps': lambda: 'yt-dlp ffmpeg',
}
}
class WgetBinary(Binary):
name: BinName = 'wget'
providers_supported: List[BinProvider] = [EnvProvider(), AptProvider()]
providers_supported: List[BinProvider] = [EnvProvider(), AptProvider(), BrewProvider()]
# if __name__ == '__main__':

View file

@ -1,561 +0,0 @@
__package__ = 'archivebox.plugantic'
import os
import shutil
import operator
from typing import Callable, Any, Optional, Type, Dict, Annotated, ClassVar, Literal, cast, TYPE_CHECKING
from typing_extensions import Self
from abc import ABC, abstractmethod
from collections import namedtuple
from pathlib import Path
from subprocess import run, PIPE
from pydantic_core import core_schema, ValidationError
from pydantic import BaseModel, Field, TypeAdapter, AfterValidator, validate_call, GetCoreSchemaHandler
def func_takes_args_or_kwargs(lambda_func: Callable[..., Any]) -> bool:
"""returns True if a lambda func takes args/kwargs of any kind, otherwise false if it's pure/argless"""
code = lambda_func.__code__
has_args = code.co_argcount > 0
has_varargs = code.co_flags & 0x04 != 0
has_varkw = code.co_flags & 0x08 != 0
return has_args or has_varargs or has_varkw
def is_semver_str(semver: Any) -> bool:
if isinstance(semver, str):
return (semver.count('.') == 2 and semver.replace('.', '').isdigit())
return False
def semver_to_str(semver: tuple[int, int, int] | str) -> str:
if isinstance(semver, (list, tuple)):
return '.'.join(str(chunk) for chunk in semver)
if is_semver_str(semver):
return semver
raise ValidationError('Tried to convert invalid SemVer: {}'.format(semver))
SemVerTuple = namedtuple('SemVerTuple', ('major', 'minor', 'patch'), defaults=(0, 0, 0))
SemVerParsableTypes = str | tuple[str | int, ...] | list[str | int]
class SemVer(SemVerTuple):
major: int
minor: int = 0
patch: int = 0
if TYPE_CHECKING:
full_text: str | None = ''
def __new__(cls, *args, full_text=None, **kwargs):
# '1.1.1'
if len(args) == 1 and is_semver_str(args[0]):
result = SemVer.parse(args[0])
# ('1', '2', '3')
elif len(args) == 1 and isinstance(args[0], (tuple, list)):
result = SemVer.parse(args[0])
# (1, '2', None)
elif not all(isinstance(arg, (int, type(None))) for arg in args):
result = SemVer.parse(args)
# (None)
elif all(chunk in ('', 0, None) for chunk in (*args, *kwargs.values())):
result = None
# 1, 2, 3
else:
result = SemVerTuple.__new__(cls, *args, **kwargs)
if result is not None:
# add first line as extra hidden metadata so it can be logged without having to re-run version cmd
result.full_text = full_text or str(result)
return result
@classmethod
def parse(cls, version_stdout: SemVerParsableTypes) -> Self | None:
"""
parses a version tag string formatted like into (major, minor, patch) ints
'Google Chrome 124.0.6367.208' -> (124, 0, 6367)
'GNU Wget 1.24.5 built on darwin23.2.0.' -> (1, 24, 5)
'curl 8.4.0 (x86_64-apple-darwin23.0) ...' -> (8, 4, 0)
'2024.04.09' -> (2024, 4, 9)
"""
# print('INITIAL_VALUE', type(version_stdout).__name__, version_stdout)
if isinstance(version_stdout, (tuple, list)):
version_stdout = '.'.join(str(chunk) for chunk in version_stdout)
elif isinstance(version_stdout, bytes):
version_stdout = version_stdout.decode()
elif not isinstance(version_stdout, str):
version_stdout = str(version_stdout)
# no text to work with, return None immediately
if not version_stdout.strip():
# raise Exception('Tried to parse semver from empty version output (is binary installed and available?)')
return None
just_numbers = lambda col: col.lower().strip('v').split('+')[0].split('-')[0].split('_')[0]
contains_semver = lambda col: (
col.count('.') in (1, 2, 3)
and all(chunk.isdigit() for chunk in col.split('.')[:3]) # first 3 chunks can only be nums
)
full_text = version_stdout.split('\n')[0].strip()
first_line_columns = full_text.split()[:4]
version_columns = list(filter(contains_semver, map(just_numbers, first_line_columns)))
# could not find any column of first line that looks like a version number, despite there being some text
if not version_columns:
# raise Exception('Failed to parse semver from version command output: {}'.format(' '.join(first_line_columns)))
return None
# take first col containing a semver, and truncate it to 3 chunks (e.g. 2024.04.09.91) -> (2024, 04, 09)
first_version_tuple = version_columns[0].split('.', 3)[:3]
# print('FINAL_VALUE', first_version_tuple)
return cls(*(int(chunk) for chunk in first_version_tuple), full_text=full_text)
def __str__(self):
return '.'.join(str(chunk) for chunk in self)
# @classmethod
# def __get_pydantic_core_schema__(cls, source: Type[Any], handler: GetCoreSchemaHandler) -> core_schema.CoreSchema:
# default_schema = handler(source)
# return core_schema.no_info_after_validator_function(
# cls.parse,
# default_schema,
# serialization=core_schema.plain_serializer_function_ser_schema(
# lambda semver: str(semver),
# info_arg=False,
# return_schema=core_schema.str_schema(),
# ),
# )
assert SemVer(None) == None
assert SemVer('') == None
assert SemVer.parse('') == None
assert SemVer(1) == (1, 0, 0)
assert SemVer(1, 2) == (1, 2, 0)
assert SemVer('1.2+234234') == (1, 2, 0)
assert SemVer((1, 2, 3)) == (1, 2, 3)
assert getattr(SemVer((1, 2, 3)), 'full_text') == '1.2.3'
assert SemVer(('1', '2', '3')) == (1, 2, 3)
assert SemVer.parse('5.6.7') == (5, 6, 7)
assert SemVer.parse('124.0.6367.208') == (124, 0, 6367)
assert SemVer.parse('Google Chrome 124.1+234.234') == (124, 1, 0)
assert SemVer.parse('Google Ch1rome 124.0.6367.208') == (124, 0, 6367)
assert SemVer.parse('Google Chrome 124.0.6367.208+beta_234. 234.234.123\n123.456.324') == (124, 0, 6367)
assert getattr(SemVer.parse('Google Chrome 124.0.6367.208+beta_234. 234.234.123\n123.456.324'), 'full_text') == 'Google Chrome 124.0.6367.208+beta_234. 234.234.123'
assert SemVer.parse('Google Chrome') == None
@validate_call
def bin_name(bin_path_or_name: str | Path) -> str:
name = Path(bin_path_or_name).name
assert len(name) > 1
assert name.replace('-', '').replace('_', '').replace('.', '').isalnum(), (
f'Binary name can only contain a-Z0-9-_.: {name}')
return name
BinName = Annotated[str, AfterValidator(bin_name)]
@validate_call
def path_is_file(path: Path | str) -> Path:
path = Path(path) if isinstance(path, str) else path
assert path.is_file(), f'Path is not a file: {path}'
return path
HostExistsPath = Annotated[Path, AfterValidator(path_is_file)]
@validate_call
def path_is_executable(path: HostExistsPath) -> HostExistsPath:
assert os.access(path, os.X_OK), f'Path is not executable (fix by running chmod +x {path})'
return path
@validate_call
def path_is_script(path: HostExistsPath) -> HostExistsPath:
SCRIPT_EXTENSIONS = ('.py', '.js', '.sh')
assert path.suffix.lower() in SCRIPT_EXTENSIONS, 'Path is not a script (does not end in {})'.format(', '.join(SCRIPT_EXTENSIONS))
return path
HostExecutablePath = Annotated[HostExistsPath, AfterValidator(path_is_executable)]
@validate_call
def path_is_abspath(path: Path) -> Path:
return path.resolve()
HostAbsPath = Annotated[HostExistsPath, AfterValidator(path_is_abspath)]
HostBinPath = Annotated[Path, AfterValidator(path_is_abspath), AfterValidator(path_is_file)]
@validate_call
def bin_abspath(bin_path_or_name: BinName | Path) -> HostBinPath | None:
assert bin_path_or_name
if str(bin_path_or_name).startswith('/'):
# already a path, get its absolute form
abspath = Path(bin_path_or_name).resolve()
else:
# not a path yet, get path using os.which
binpath = shutil.which(bin_path_or_name)
if not binpath:
return None
abspath = Path(binpath).resolve()
try:
return TypeAdapter(HostBinPath).validate_python(abspath)
except ValidationError:
return None
@validate_call
def bin_version(bin_path: HostBinPath, args=('--version',)) -> SemVer | None:
return SemVer(run([bin_path, *args], stdout=PIPE).stdout.strip().decode())
class InstalledBin(BaseModel):
abspath: HostBinPath
version: SemVer
def is_valid_install_string(pkgs_str: str) -> str:
"""Make sure a string is a valid install string for a package manager, e.g. 'yt-dlp ffmpeg'"""
assert pkgs_str
assert all(len(pkg) > 1 for pkg in pkgs_str.split(' '))
return pkgs_str
def is_valid_python_dotted_import(import_str: str) -> str:
assert import_str and import_str.replace('.', '').replace('_', '').isalnum()
return import_str
InstallStr = Annotated[str, AfterValidator(is_valid_install_string)]
LazyImportStr = Annotated[str, AfterValidator(is_valid_python_dotted_import)]
ProviderHandler = Callable[..., Any] | Callable[[], Any] # must take no args [], or [bin_name: str, **kwargs]
#ProviderHandlerStr = Annotated[str, AfterValidator(lambda s: s.startswith('self.'))]
ProviderHandlerRef = LazyImportStr | ProviderHandler
ProviderLookupDict = Dict[str, LazyImportStr]
ProviderType = Literal['abspath', 'version', 'subdeps', 'install']
# class Host(BaseModel):
# machine: str
# system: str
# platform: str
# in_docker: bool
# in_qemu: bool
# python: str
BinProviderName = Literal['env', 'pip', 'apt', 'brew', 'npm', 'vendor']
class BinProvider(ABC, BaseModel):
name: BinProviderName
abspath_provider: ProviderLookupDict = Field(default={'*': 'self.on_get_abspath'}, exclude=True)
version_provider: ProviderLookupDict = Field(default={'*': 'self.on_get_version'}, exclude=True)
subdeps_provider: ProviderLookupDict = Field(default={'*': 'self.on_get_subdeps'}, exclude=True)
install_provider: ProviderLookupDict = Field(default={'*': 'self.on_install'}, exclude=True)
_abspath_cache: ClassVar = {}
_version_cache: ClassVar = {}
_install_cache: ClassVar = {}
# def provider_version(self) -> SemVer | None:
# """Version of the actual underlying package manager (e.g. pip v20.4.1)"""
# if self.name in ('env', 'vendor'):
# return SemVer('0.0.0')
# installer_binpath = Path(shutil.which(self.name)).resolve()
# return bin_version(installer_binpath)
# def provider_host(self) -> Host:
# """Information about the host env, archictecture, and OS needed to select & build packages"""
# p = platform.uname()
# return Host(
# machine=p.machine,
# system=p.system,
# platform=platform.platform(),
# python=sys.implementation.name,
# in_docker=os.environ.get('IN_DOCKER', '').lower() == 'true',
# in_qemu=os.environ.get('IN_QEMU', '').lower() == 'true',
# )
def get_default_providers(self):
return self.get_providers_for_bin('*')
def resolve_provider_func(self, provider_func: ProviderHandlerRef | None) -> ProviderHandler | None:
if provider_func is None:
return None
# if provider_func is a dotted path to a function on self, swap it for the actual function
if isinstance(provider_func, str) and provider_func.startswith('self.'):
provider_func = getattr(self, provider_func.split('self.', 1)[-1])
# if provider_func is a dot-formatted import string, import the function
if isinstance(provider_func, str):
from django.utils.module_loading import import_string
package_name, module_name, classname, path = provider_func.split('.', 3) # -> abc, def, ghi.jkl
# get .ghi.jkl nested attr present on module abc.def
imported_module = import_string(f'{package_name}.{module_name}.{classname}')
provider_func = operator.attrgetter(path)(imported_module)
# # abc.def.ghi.jkl -> 1, 2, 3
# for idx in range(1, len(path)):
# parent_path = '.'.join(path[:-idx]) # abc.def.ghi
# try:
# parent_module = import_string(parent_path)
# provider_func = getattr(parent_module, path[-idx])
# except AttributeError, ImportError:
# continue
assert TypeAdapter(ProviderHandler).validate_python(provider_func), (
f'{self.__class__.__name__} provider func for {bin_name} was not a function or dotted-import path: {provider_func}')
return provider_func
@validate_call
def get_providers_for_bin(self, bin_name: str) -> ProviderLookupDict:
providers_for_bin = {
'abspath': self.abspath_provider.get(bin_name),
'version': self.version_provider.get(bin_name),
'subdeps': self.subdeps_provider.get(bin_name),
'install': self.install_provider.get(bin_name),
}
only_set_providers_for_bin = {k: v for k, v in providers_for_bin.items() if v is not None}
return only_set_providers_for_bin
@validate_call
def get_provider_for_action(self, bin_name: BinName, provider_type: ProviderType, default_provider: Optional[ProviderHandlerRef]=None, overrides: Optional[ProviderLookupDict]=None) -> ProviderHandler:
"""
Get the provider func for a given key + Dict of provider callbacks + fallback default provider.
e.g. get_provider_for_action(bin_name='yt-dlp', 'install', default_provider=self.on_install, ...) -> Callable
"""
provider_func_ref = (
(overrides or {}).get(provider_type)
or self.get_providers_for_bin(bin_name).get(provider_type)
or self.get_default_providers().get(provider_type)
or default_provider
)
# print('getting provider for action', bin_name, provider_type, provider_func)
provider_func = self.resolve_provider_func(provider_func_ref)
assert provider_func, f'No {self.name} provider func was found for {bin_name} in: {self.__class__.__name__}.'
return provider_func
@validate_call
def call_provider_for_action(self, bin_name: BinName, provider_type: ProviderType, default_provider: Optional[ProviderHandlerRef]=None, overrides: Optional[ProviderLookupDict]=None, **kwargs) -> Any:
provider_func: ProviderHandler = self.get_provider_for_action(
bin_name=bin_name,
provider_type=provider_type,
default_provider=default_provider,
overrides=overrides,
)
if not func_takes_args_or_kwargs(provider_func):
# if it's a pure argless lambdas, dont pass bin_path and other **kwargs
provider_func_without_args = cast(Callable[[], Any], provider_func)
return provider_func_without_args()
provider_func = cast(Callable[..., Any], provider_func)
return provider_func(bin_name, **kwargs)
def on_get_abspath(self, bin_name: BinName, **_) -> HostBinPath | None:
print(f'[*] {self.__class__.__name__}: Getting abspath for {bin_name}...')
try:
return bin_abspath(bin_name)
except ValidationError:
return None
def on_get_version(self, bin_name: BinName, abspath: Optional[HostBinPath]=None, **_) -> SemVer | None:
abspath = abspath or self._abspath_cache.get(bin_name) or self.get_abspath(bin_name)
if not abspath: return None
print(f'[*] {self.__class__.__name__}: Getting version for {bin_name}...')
try:
return bin_version(abspath)
except ValidationError:
return None
def on_get_subdeps(self, bin_name: BinName, **_) -> InstallStr:
print(f'[*] {self.__class__.__name__}: Getting subdependencies for {bin_name}')
# ... subdependency calculation logic here
return TypeAdapter(InstallStr).validate_python(bin_name)
@abstractmethod
def on_install(self, bin_name: BinName, subdeps: Optional[InstallStr]=None, **_):
subdeps = subdeps or self.get_subdeps(bin_name)
print(f'[*] {self.__class__.__name__}: Installing subdependencies for {bin_name} ({subdeps})')
# ... install logic here
assert True
@validate_call
def get_abspath(self, bin_name: BinName, overrides: Optional[ProviderLookupDict]=None) -> HostBinPath | None:
abspath = self.call_provider_for_action(
bin_name=bin_name,
provider_type='abspath',
default_provider=self.on_get_abspath,
overrides=overrides,
)
if not abspath:
return None
result = TypeAdapter(HostBinPath).validate_python(abspath)
self._abspath_cache[bin_name] = result
return result
@validate_call
def get_version(self, bin_name: BinName, abspath: Optional[HostBinPath]=None, overrides: Optional[ProviderLookupDict]=None) -> SemVer | None:
version = self.call_provider_for_action(
bin_name=bin_name,
provider_type='version',
default_provider=self.on_get_version,
overrides=overrides,
abspath=abspath,
)
if not version:
return None
result = SemVer(version)
self._version_cache[bin_name] = result
return result
@validate_call
def get_subdeps(self, bin_name: BinName, overrides: Optional[ProviderLookupDict]=None) -> InstallStr:
subdeps = self.call_provider_for_action(
bin_name=bin_name,
provider_type='subdeps',
default_provider=self.on_get_subdeps,
overrides=overrides,
)
if not subdeps:
subdeps = bin_name
result = TypeAdapter(InstallStr).validate_python(subdeps)
return result
@validate_call
def install(self, bin_name: BinName, overrides: Optional[ProviderLookupDict]=None) -> InstalledBin | None:
subdeps = self.get_subdeps(bin_name, overrides=overrides)
self.call_provider_for_action(
bin_name=bin_name,
provider_type='install',
default_provider=self.on_install,
overrides=overrides,
subdeps=subdeps,
)
installed_abspath = self.get_abspath(bin_name)
assert installed_abspath, f'Unable to find {bin_name} abspath after installing with {self.name}'
installed_version = self.get_version(bin_name, abspath=installed_abspath)
assert installed_version, f'Unable to find {bin_name} version after installing with {self.name}'
result = InstalledBin(abspath=installed_abspath, version=installed_version)
self._install_cache[bin_name] = result
return result
@validate_call
def load(self, bin_name: BinName, overrides: Optional[ProviderLookupDict]=None, cache: bool=False) -> InstalledBin | None:
installed_abspath = None
installed_version = None
if cache:
installed_bin = self._install_cache.get(bin_name)
if installed_bin:
return installed_bin
installed_abspath = self._abspath_cache.get(bin_name)
installed_version = self._version_cache.get(bin_name)
installed_abspath = installed_abspath or self.get_abspath(bin_name, overrides=overrides)
if not installed_abspath:
return None
installed_version = installed_version or self.get_version(bin_name, abspath=installed_abspath, overrides=overrides)
if not installed_version:
return None
return InstalledBin(abspath=installed_abspath, version=installed_version)
@validate_call
def load_or_install(self, bin_name: BinName, overrides: Optional[ProviderLookupDict]=None, cache: bool=True) -> InstalledBin | None:
installed = self.load(bin_name, overrides=overrides, cache=cache)
if not installed:
installed = self.install(bin_name, overrides=overrides)
return installed
class PipProvider(BinProvider):
name: BinProviderName = 'pip'
def on_install(self, bin_name: str, subdeps: Optional[InstallStr]=None, **_):
subdeps = subdeps or self.on_get_subdeps(bin_name)
print(f'[*] {self.__class__.__name__}: Installing subdependencies for {bin_name} ({subdeps})')
proc = run(['pip', 'install', '--upgrade', *subdeps.split(' ')], stdout=PIPE, stderr=PIPE)
if proc.returncode != 0:
print(proc.stdout.strip().decode())
print(proc.stderr.strip().decode())
raise Exception(f'{self.__class__.__name__}: install got returncode {proc.returncode} while installing {subdeps}: {subdeps}')
class AptProvider(BinProvider):
name: BinProviderName = 'apt'
subdeps_provider: ProviderLookupDict = {
'yt-dlp': lambda: 'yt-dlp ffmpeg',
}
def on_install(self, bin_name: BinName, subdeps: Optional[InstallStr]=None, **_):
subdeps = subdeps or self.on_get_subdeps(bin_name)
print(f'[*] {self.__class__.__name__}: Installing subdependencies for {bin_name} ({subdeps})')
run(['apt-get', 'update', '-qq'])
proc = run(['apt-get', 'install', '-y', *subdeps.split(' ')], stdout=PIPE, stderr=PIPE)
if proc.returncode != 0:
print(proc.stdout.strip().decode())
print(proc.stderr.strip().decode())
raise Exception(f'{self.__class__.__name__} install got returncode {proc.returncode} while installing {subdeps}: {subdeps}')
class BrewProvider(BinProvider):
name: BinProviderName = 'brew'
def on_install(self, bin_name: str, subdeps: Optional[InstallStr]=None, **_):
subdeps = subdeps or self.on_get_subdeps(bin_name)
print(f'[*] {self.__class__.__name__}: Installing subdependencies for {bin_name} ({subdeps})')
proc = run(['brew', 'install', *subdeps.split(' ')], stdout=PIPE, stderr=PIPE)
if proc.returncode != 0:
print(proc.stdout.strip().decode())
print(proc.stderr.strip().decode())
raise Exception(f'{self.__class__.__name__} install got returncode {proc.returncode} while installing {subdeps}: {subdeps}')
class EnvProvider(BinProvider):
name: BinProviderName = 'env'
abspath_provider: ProviderLookupDict = {
# 'python': lambda: Path('/opt/homebrew/Cellar/python@3.10/3.10.14/Frameworks/Python.framework/Versions/3.10/bin/python3.10'),
}
version_provider: ProviderLookupDict = {
# 'python': lambda: '{}.{}.{}'.format(*sys.version_info[:3]),
}
def on_install(self, bin_name: BinName, subdeps: Optional[InstallStr]=None, **_):
"""The env provider is ready-only and does not install any packages, so this is a no-op"""
pass

View file

@ -14,9 +14,6 @@ from pydantic import (
from .binaries import (
Binary,
PythonBinary,
SqliteBinary,
DjangoBinary,
WgetBinary,
YtdlpBinary,
)
@ -28,7 +25,6 @@ from .extractors import (
)
from .replayers import (
Replayer,
GENERIC_REPLAYER,
MEDIA_REPLAYER,
)
from .configs import (
@ -80,12 +76,6 @@ class Plugin(BaseModel):
})
class CorePlugin(Plugin):
name: str = 'core'
configs: List[SerializeAsAny[ConfigSet]] = []
binaries: List[SerializeAsAny[Binary]] = [PythonBinary(), SqliteBinary(), DjangoBinary()]
extractors: List[SerializeAsAny[Extractor]] = []
replayers: List[SerializeAsAny[Replayer]] = [GENERIC_REPLAYER]
class YtdlpPlugin(Plugin):
name: str = 'ytdlp'
@ -101,11 +91,9 @@ class WgetPlugin(Plugin):
extractors: List[SerializeAsAny[Extractor]] = [WgetExtractor(), WarcExtractor()]
CORE_PLUGIN = CorePlugin()
YTDLP_PLUGIN = YtdlpPlugin()
WGET_PLUGIN = WgetPlugin()
PLUGINS = [
CORE_PLUGIN,
YTDLP_PLUGIN,
WGET_PLUGIN,
]

View file

@ -22,5 +22,4 @@ class Replayer(BaseModel):
# thumbnail_view: LazyImportStr = 'plugins.generic_replayer.views.get_icon'
GENERIC_REPLAYER = Replayer(name='generic')
MEDIA_REPLAYER = Replayer(name='media')

View file

@ -1,5 +1,8 @@
__package__ = 'archivebox.plugantic'
import inspect
from typing import Any
from django.http import HttpRequest
from django.utils.html import format_html, mark_safe
@ -10,6 +13,44 @@ from admin_data_views.utils import render_with_table_view, render_with_item_view
from plugantic.plugins import LOADED_PLUGINS
from django.conf import settings
def obj_to_yaml(obj: Any, indent: int=0) -> str:
indent_str = " " * indent
if isinstance(obj, dict):
if not obj:
return "{}"
result = "\n"
for key, value in obj.items():
result += f"{indent_str}{key}:{obj_to_yaml(value, indent + 1)}\n"
return result
elif isinstance(obj, list):
if not obj:
return "[]"
result = "\n"
for item in obj:
result += f"{indent_str}- {obj_to_yaml(item, indent + 1).lstrip()}\n"
return result.rstrip()
elif isinstance(obj, str):
if "\n" in obj:
return f" |\n{indent_str} " + obj.replace("\n", f"\n{indent_str} ")
else:
return f" {obj}"
elif isinstance(obj, (int, float, bool)):
return f" {str(obj)}"
elif callable(obj):
source = '\n'.join(
'' if 'def ' in line else line
for line in inspect.getsource(obj).split('\n')
if line.strip()
).split('lambda: ')[-1].rstrip(',')
return f" {indent_str} " + source.replace("\n", f"\n{indent_str} ")
else:
return f" {str(obj)}"
@render_with_table_view
def binaries_list_view(request: HttpRequest, **kwargs) -> TableContext:
@ -18,13 +59,13 @@ def binaries_list_view(request: HttpRequest, **kwargs) -> TableContext:
rows = {
"Binary": [],
"From Plugin": [],
"Found Version": [],
"From Plugin": [],
"Provided By": [],
"Found Abspath": [],
"Related Configuration": [],
"Overrides": [],
"Description": [],
# "Description": [],
}
relevant_configs = {
@ -38,8 +79,8 @@ def binaries_list_view(request: HttpRequest, **kwargs) -> TableContext:
binary = binary.load_or_install()
rows['Binary'].append(ItemLink(binary.name, key=binary.name))
rows['From Plugin'].append(plugin.name)
rows['Found Version'].append(binary.loaded_version)
rows['From Plugin'].append(plugin.name)
rows['Provided By'].append(binary.loaded_provider)
rows['Found Abspath'].append(binary.loaded_abspath)
rows['Related Configuration'].append(mark_safe(', '.join(
@ -48,8 +89,8 @@ def binaries_list_view(request: HttpRequest, **kwargs) -> TableContext:
if binary.name.lower().replace('-', '').replace('_', '').replace('ytdlp', 'youtubedl') in config_key.lower()
# or binary.name.lower().replace('-', '').replace('_', '') in str(config_value).lower()
)))
rows['Overrides'].append(str(binary.provider_overrides))
rows['Description'].append(binary.description)
rows['Overrides'].append(obj_to_yaml(binary.provider_overrides))
# rows['Description'].append(binary.description)
return TableContext(
title="Binaries",
@ -85,8 +126,8 @@ def binary_detail_view(request: HttpRequest, key: str, **kwargs) -> ItemContext:
'binprovider': binary.loaded_provider,
'abspath': binary.loaded_abspath,
'version': binary.loaded_version,
'overrides': str(binary.provider_overrides),
'providers': str(binary.providers_supported),
'overrides': obj_to_yaml(binary.provider_overrides),
'providers': obj_to_yaml(binary.providers_supported),
},
"help_texts": {
# TODO