ArchiveBox/archivebox/plugantic/binproviders.py
2024-05-17 20:13:54 -07:00

561 lines
22 KiB
Python

__package__ = 'archivebox.plugantic'
import os
import shutil
import operator
from typing import Callable, Any, Optional, Type, Dict, Annotated, ClassVar, Literal, cast, TYPE_CHECKING
from typing_extensions import Self
from abc import ABC, abstractmethod
from collections import namedtuple
from pathlib import Path
from subprocess import run, PIPE
from pydantic_core import core_schema, ValidationError
from pydantic import BaseModel, Field, TypeAdapter, AfterValidator, validate_call, GetCoreSchemaHandler
def func_takes_args_or_kwargs(lambda_func: Callable[..., Any]) -> bool:
"""returns True if a lambda func takes args/kwargs of any kind, otherwise false if it's pure/argless"""
code = lambda_func.__code__
has_args = code.co_argcount > 0
has_varargs = code.co_flags & 0x04 != 0
has_varkw = code.co_flags & 0x08 != 0
return has_args or has_varargs or has_varkw
def is_semver_str(semver: Any) -> bool:
if isinstance(semver, str):
return (semver.count('.') == 2 and semver.replace('.', '').isdigit())
return False
def semver_to_str(semver: tuple[int, int, int] | str) -> str:
if isinstance(semver, (list, tuple)):
return '.'.join(str(chunk) for chunk in semver)
if is_semver_str(semver):
return semver
raise ValidationError('Tried to convert invalid SemVer: {}'.format(semver))
SemVerTuple = namedtuple('SemVerTuple', ('major', 'minor', 'patch'), defaults=(0, 0, 0))
SemVerParsableTypes = str | tuple[str | int, ...] | list[str | int]
class SemVer(SemVerTuple):
major: int
minor: int = 0
patch: int = 0
if TYPE_CHECKING:
full_text: str | None = ''
def __new__(cls, *args, full_text=None, **kwargs):
# '1.1.1'
if len(args) == 1 and is_semver_str(args[0]):
result = SemVer.parse(args[0])
# ('1', '2', '3')
elif len(args) == 1 and isinstance(args[0], (tuple, list)):
result = SemVer.parse(args[0])
# (1, '2', None)
elif not all(isinstance(arg, (int, type(None))) for arg in args):
result = SemVer.parse(args)
# (None)
elif all(chunk in ('', 0, None) for chunk in (*args, *kwargs.values())):
result = None
# 1, 2, 3
else:
result = SemVerTuple.__new__(cls, *args, **kwargs)
if result is not None:
# add first line as extra hidden metadata so it can be logged without having to re-run version cmd
result.full_text = full_text or str(result)
return result
@classmethod
def parse(cls, version_stdout: SemVerParsableTypes) -> Self | None:
"""
parses a version tag string formatted like into (major, minor, patch) ints
'Google Chrome 124.0.6367.208' -> (124, 0, 6367)
'GNU Wget 1.24.5 built on darwin23.2.0.' -> (1, 24, 5)
'curl 8.4.0 (x86_64-apple-darwin23.0) ...' -> (8, 4, 0)
'2024.04.09' -> (2024, 4, 9)
"""
# print('INITIAL_VALUE', type(version_stdout).__name__, version_stdout)
if isinstance(version_stdout, (tuple, list)):
version_stdout = '.'.join(str(chunk) for chunk in version_stdout)
elif isinstance(version_stdout, bytes):
version_stdout = version_stdout.decode()
elif not isinstance(version_stdout, str):
version_stdout = str(version_stdout)
# no text to work with, return None immediately
if not version_stdout.strip():
# raise Exception('Tried to parse semver from empty version output (is binary installed and available?)')
return None
just_numbers = lambda col: col.lower().strip('v').split('+')[0].split('-')[0].split('_')[0]
contains_semver = lambda col: (
col.count('.') in (1, 2, 3)
and all(chunk.isdigit() for chunk in col.split('.')[:3]) # first 3 chunks can only be nums
)
full_text = version_stdout.split('\n')[0].strip()
first_line_columns = full_text.split()[:4]
version_columns = list(filter(contains_semver, map(just_numbers, first_line_columns)))
# could not find any column of first line that looks like a version number, despite there being some text
if not version_columns:
# raise Exception('Failed to parse semver from version command output: {}'.format(' '.join(first_line_columns)))
return None
# take first col containing a semver, and truncate it to 3 chunks (e.g. 2024.04.09.91) -> (2024, 04, 09)
first_version_tuple = version_columns[0].split('.', 3)[:3]
# print('FINAL_VALUE', first_version_tuple)
return cls(*(int(chunk) for chunk in first_version_tuple), full_text=full_text)
def __str__(self):
return '.'.join(str(chunk) for chunk in self)
# @classmethod
# def __get_pydantic_core_schema__(cls, source: Type[Any], handler: GetCoreSchemaHandler) -> core_schema.CoreSchema:
# default_schema = handler(source)
# return core_schema.no_info_after_validator_function(
# cls.parse,
# default_schema,
# serialization=core_schema.plain_serializer_function_ser_schema(
# lambda semver: str(semver),
# info_arg=False,
# return_schema=core_schema.str_schema(),
# ),
# )
assert SemVer(None) == None
assert SemVer('') == None
assert SemVer.parse('') == None
assert SemVer(1) == (1, 0, 0)
assert SemVer(1, 2) == (1, 2, 0)
assert SemVer('1.2+234234') == (1, 2, 0)
assert SemVer((1, 2, 3)) == (1, 2, 3)
assert getattr(SemVer((1, 2, 3)), 'full_text') == '1.2.3'
assert SemVer(('1', '2', '3')) == (1, 2, 3)
assert SemVer.parse('5.6.7') == (5, 6, 7)
assert SemVer.parse('124.0.6367.208') == (124, 0, 6367)
assert SemVer.parse('Google Chrome 124.1+234.234') == (124, 1, 0)
assert SemVer.parse('Google Ch1rome 124.0.6367.208') == (124, 0, 6367)
assert SemVer.parse('Google Chrome 124.0.6367.208+beta_234. 234.234.123\n123.456.324') == (124, 0, 6367)
assert getattr(SemVer.parse('Google Chrome 124.0.6367.208+beta_234. 234.234.123\n123.456.324'), 'full_text') == 'Google Chrome 124.0.6367.208+beta_234. 234.234.123'
assert SemVer.parse('Google Chrome') == None
@validate_call
def bin_name(bin_path_or_name: str | Path) -> str:
name = Path(bin_path_or_name).name
assert len(name) > 1
assert name.replace('-', '').replace('_', '').replace('.', '').isalnum(), (
f'Binary name can only contain a-Z0-9-_.: {name}')
return name
BinName = Annotated[str, AfterValidator(bin_name)]
@validate_call
def path_is_file(path: Path | str) -> Path:
path = Path(path) if isinstance(path, str) else path
assert path.is_file(), f'Path is not a file: {path}'
return path
HostExistsPath = Annotated[Path, AfterValidator(path_is_file)]
@validate_call
def path_is_executable(path: HostExistsPath) -> HostExistsPath:
assert os.access(path, os.X_OK), f'Path is not executable (fix by running chmod +x {path})'
return path
@validate_call
def path_is_script(path: HostExistsPath) -> HostExistsPath:
SCRIPT_EXTENSIONS = ('.py', '.js', '.sh')
assert path.suffix.lower() in SCRIPT_EXTENSIONS, 'Path is not a script (does not end in {})'.format(', '.join(SCRIPT_EXTENSIONS))
return path
HostExecutablePath = Annotated[HostExistsPath, AfterValidator(path_is_executable)]
@validate_call
def path_is_abspath(path: Path) -> Path:
return path.resolve()
HostAbsPath = Annotated[HostExistsPath, AfterValidator(path_is_abspath)]
HostBinPath = Annotated[Path, AfterValidator(path_is_abspath), AfterValidator(path_is_file)]
@validate_call
def bin_abspath(bin_path_or_name: BinName | Path) -> HostBinPath | None:
assert bin_path_or_name
if str(bin_path_or_name).startswith('/'):
# already a path, get its absolute form
abspath = Path(bin_path_or_name).resolve()
else:
# not a path yet, get path using os.which
binpath = shutil.which(bin_path_or_name)
if not binpath:
return None
abspath = Path(binpath).resolve()
try:
return TypeAdapter(HostBinPath).validate_python(abspath)
except ValidationError:
return None
@validate_call
def bin_version(bin_path: HostBinPath, args=('--version',)) -> SemVer | None:
return SemVer(run([bin_path, *args], stdout=PIPE).stdout.strip().decode())
class InstalledBin(BaseModel):
abspath: HostBinPath
version: SemVer
def is_valid_install_string(pkgs_str: str) -> str:
"""Make sure a string is a valid install string for a package manager, e.g. 'yt-dlp ffmpeg'"""
assert pkgs_str
assert all(len(pkg) > 1 for pkg in pkgs_str.split(' '))
return pkgs_str
def is_valid_python_dotted_import(import_str: str) -> str:
assert import_str and import_str.replace('.', '').replace('_', '').isalnum()
return import_str
InstallStr = Annotated[str, AfterValidator(is_valid_install_string)]
LazyImportStr = Annotated[str, AfterValidator(is_valid_python_dotted_import)]
ProviderHandler = Callable[..., Any] | Callable[[], Any] # must take no args [], or [bin_name: str, **kwargs]
#ProviderHandlerStr = Annotated[str, AfterValidator(lambda s: s.startswith('self.'))]
ProviderHandlerRef = LazyImportStr | ProviderHandler
ProviderLookupDict = Dict[str, LazyImportStr]
ProviderType = Literal['abspath', 'version', 'subdeps', 'install']
# class Host(BaseModel):
# machine: str
# system: str
# platform: str
# in_docker: bool
# in_qemu: bool
# python: str
BinProviderName = Literal['env', 'pip', 'apt', 'brew', 'npm', 'vendor']
class BinProvider(ABC, BaseModel):
name: BinProviderName
abspath_provider: ProviderLookupDict = Field(default={'*': 'self.on_get_abspath'}, exclude=True)
version_provider: ProviderLookupDict = Field(default={'*': 'self.on_get_version'}, exclude=True)
subdeps_provider: ProviderLookupDict = Field(default={'*': 'self.on_get_subdeps'}, exclude=True)
install_provider: ProviderLookupDict = Field(default={'*': 'self.on_install'}, exclude=True)
_abspath_cache: ClassVar = {}
_version_cache: ClassVar = {}
_install_cache: ClassVar = {}
# def provider_version(self) -> SemVer | None:
# """Version of the actual underlying package manager (e.g. pip v20.4.1)"""
# if self.name in ('env', 'vendor'):
# return SemVer('0.0.0')
# installer_binpath = Path(shutil.which(self.name)).resolve()
# return bin_version(installer_binpath)
# def provider_host(self) -> Host:
# """Information about the host env, archictecture, and OS needed to select & build packages"""
# p = platform.uname()
# return Host(
# machine=p.machine,
# system=p.system,
# platform=platform.platform(),
# python=sys.implementation.name,
# in_docker=os.environ.get('IN_DOCKER', '').lower() == 'true',
# in_qemu=os.environ.get('IN_QEMU', '').lower() == 'true',
# )
def get_default_providers(self):
return self.get_providers_for_bin('*')
def resolve_provider_func(self, provider_func: ProviderHandlerRef | None) -> ProviderHandler | None:
if provider_func is None:
return None
# if provider_func is a dotted path to a function on self, swap it for the actual function
if isinstance(provider_func, str) and provider_func.startswith('self.'):
provider_func = getattr(self, provider_func.split('self.', 1)[-1])
# if provider_func is a dot-formatted import string, import the function
if isinstance(provider_func, str):
from django.utils.module_loading import import_string
package_name, module_name, classname, path = provider_func.split('.', 3) # -> abc, def, ghi.jkl
# get .ghi.jkl nested attr present on module abc.def
imported_module = import_string(f'{package_name}.{module_name}.{classname}')
provider_func = operator.attrgetter(path)(imported_module)
# # abc.def.ghi.jkl -> 1, 2, 3
# for idx in range(1, len(path)):
# parent_path = '.'.join(path[:-idx]) # abc.def.ghi
# try:
# parent_module = import_string(parent_path)
# provider_func = getattr(parent_module, path[-idx])
# except AttributeError, ImportError:
# continue
assert TypeAdapter(ProviderHandler).validate_python(provider_func), (
f'{self.__class__.__name__} provider func for {bin_name} was not a function or dotted-import path: {provider_func}')
return provider_func
@validate_call
def get_providers_for_bin(self, bin_name: str) -> ProviderLookupDict:
providers_for_bin = {
'abspath': self.abspath_provider.get(bin_name),
'version': self.version_provider.get(bin_name),
'subdeps': self.subdeps_provider.get(bin_name),
'install': self.install_provider.get(bin_name),
}
only_set_providers_for_bin = {k: v for k, v in providers_for_bin.items() if v is not None}
return only_set_providers_for_bin
@validate_call
def get_provider_for_action(self, bin_name: BinName, provider_type: ProviderType, default_provider: Optional[ProviderHandlerRef]=None, overrides: Optional[ProviderLookupDict]=None) -> ProviderHandler:
"""
Get the provider func for a given key + Dict of provider callbacks + fallback default provider.
e.g. get_provider_for_action(bin_name='yt-dlp', 'install', default_provider=self.on_install, ...) -> Callable
"""
provider_func_ref = (
(overrides or {}).get(provider_type)
or self.get_providers_for_bin(bin_name).get(provider_type)
or self.get_default_providers().get(provider_type)
or default_provider
)
# print('getting provider for action', bin_name, provider_type, provider_func)
provider_func = self.resolve_provider_func(provider_func_ref)
assert provider_func, f'No {self.name} provider func was found for {bin_name} in: {self.__class__.__name__}.'
return provider_func
@validate_call
def call_provider_for_action(self, bin_name: BinName, provider_type: ProviderType, default_provider: Optional[ProviderHandlerRef]=None, overrides: Optional[ProviderLookupDict]=None, **kwargs) -> Any:
provider_func: ProviderHandler = self.get_provider_for_action(
bin_name=bin_name,
provider_type=provider_type,
default_provider=default_provider,
overrides=overrides,
)
if not func_takes_args_or_kwargs(provider_func):
# if it's a pure argless lambdas, dont pass bin_path and other **kwargs
provider_func_without_args = cast(Callable[[], Any], provider_func)
return provider_func_without_args()
provider_func = cast(Callable[..., Any], provider_func)
return provider_func(bin_name, **kwargs)
def on_get_abspath(self, bin_name: BinName, **_) -> HostBinPath | None:
print(f'[*] {self.__class__.__name__}: Getting abspath for {bin_name}...')
try:
return bin_abspath(bin_name)
except ValidationError:
return None
def on_get_version(self, bin_name: BinName, abspath: Optional[HostBinPath]=None, **_) -> SemVer | None:
abspath = abspath or self._abspath_cache.get(bin_name) or self.get_abspath(bin_name)
if not abspath: return None
print(f'[*] {self.__class__.__name__}: Getting version for {bin_name}...')
try:
return bin_version(abspath)
except ValidationError:
return None
def on_get_subdeps(self, bin_name: BinName, **_) -> InstallStr:
print(f'[*] {self.__class__.__name__}: Getting subdependencies for {bin_name}')
# ... subdependency calculation logic here
return TypeAdapter(InstallStr).validate_python(bin_name)
@abstractmethod
def on_install(self, bin_name: BinName, subdeps: Optional[InstallStr]=None, **_):
subdeps = subdeps or self.get_subdeps(bin_name)
print(f'[*] {self.__class__.__name__}: Installing subdependencies for {bin_name} ({subdeps})')
# ... install logic here
assert True
@validate_call
def get_abspath(self, bin_name: BinName, overrides: Optional[ProviderLookupDict]=None) -> HostBinPath | None:
abspath = self.call_provider_for_action(
bin_name=bin_name,
provider_type='abspath',
default_provider=self.on_get_abspath,
overrides=overrides,
)
if not abspath:
return None
result = TypeAdapter(HostBinPath).validate_python(abspath)
self._abspath_cache[bin_name] = result
return result
@validate_call
def get_version(self, bin_name: BinName, abspath: Optional[HostBinPath]=None, overrides: Optional[ProviderLookupDict]=None) -> SemVer | None:
version = self.call_provider_for_action(
bin_name=bin_name,
provider_type='version',
default_provider=self.on_get_version,
overrides=overrides,
abspath=abspath,
)
if not version:
return None
result = SemVer(version)
self._version_cache[bin_name] = result
return result
@validate_call
def get_subdeps(self, bin_name: BinName, overrides: Optional[ProviderLookupDict]=None) -> InstallStr:
subdeps = self.call_provider_for_action(
bin_name=bin_name,
provider_type='subdeps',
default_provider=self.on_get_subdeps,
overrides=overrides,
)
if not subdeps:
subdeps = bin_name
result = TypeAdapter(InstallStr).validate_python(subdeps)
return result
@validate_call
def install(self, bin_name: BinName, overrides: Optional[ProviderLookupDict]=None) -> InstalledBin | None:
subdeps = self.get_subdeps(bin_name, overrides=overrides)
self.call_provider_for_action(
bin_name=bin_name,
provider_type='install',
default_provider=self.on_install,
overrides=overrides,
subdeps=subdeps,
)
installed_abspath = self.get_abspath(bin_name)
assert installed_abspath, f'Unable to find {bin_name} abspath after installing with {self.name}'
installed_version = self.get_version(bin_name, abspath=installed_abspath)
assert installed_version, f'Unable to find {bin_name} version after installing with {self.name}'
result = InstalledBin(abspath=installed_abspath, version=installed_version)
self._install_cache[bin_name] = result
return result
@validate_call
def load(self, bin_name: BinName, overrides: Optional[ProviderLookupDict]=None, cache: bool=False) -> InstalledBin | None:
installed_abspath = None
installed_version = None
if cache:
installed_bin = self._install_cache.get(bin_name)
if installed_bin:
return installed_bin
installed_abspath = self._abspath_cache.get(bin_name)
installed_version = self._version_cache.get(bin_name)
installed_abspath = installed_abspath or self.get_abspath(bin_name, overrides=overrides)
if not installed_abspath:
return None
installed_version = installed_version or self.get_version(bin_name, abspath=installed_abspath, overrides=overrides)
if not installed_version:
return None
return InstalledBin(abspath=installed_abspath, version=installed_version)
@validate_call
def load_or_install(self, bin_name: BinName, overrides: Optional[ProviderLookupDict]=None, cache: bool=True) -> InstalledBin | None:
installed = self.load(bin_name, overrides=overrides, cache=cache)
if not installed:
installed = self.install(bin_name, overrides=overrides)
return installed
class PipProvider(BinProvider):
name: BinProviderName = 'pip'
def on_install(self, bin_name: str, subdeps: Optional[InstallStr]=None, **_):
subdeps = subdeps or self.on_get_subdeps(bin_name)
print(f'[*] {self.__class__.__name__}: Installing subdependencies for {bin_name} ({subdeps})')
proc = run(['pip', 'install', '--upgrade', *subdeps.split(' ')], stdout=PIPE, stderr=PIPE)
if proc.returncode != 0:
print(proc.stdout.strip().decode())
print(proc.stderr.strip().decode())
raise Exception(f'{self.__class__.__name__}: install got returncode {proc.returncode} while installing {subdeps}: {subdeps}')
class AptProvider(BinProvider):
name: BinProviderName = 'apt'
subdeps_provider: ProviderLookupDict = {
'yt-dlp': lambda: 'yt-dlp ffmpeg',
}
def on_install(self, bin_name: BinName, subdeps: Optional[InstallStr]=None, **_):
subdeps = subdeps or self.on_get_subdeps(bin_name)
print(f'[*] {self.__class__.__name__}: Installing subdependencies for {bin_name} ({subdeps})')
run(['apt-get', 'update', '-qq'])
proc = run(['apt-get', 'install', '-y', *subdeps.split(' ')], stdout=PIPE, stderr=PIPE)
if proc.returncode != 0:
print(proc.stdout.strip().decode())
print(proc.stderr.strip().decode())
raise Exception(f'{self.__class__.__name__} install got returncode {proc.returncode} while installing {subdeps}: {subdeps}')
class BrewProvider(BinProvider):
name: BinProviderName = 'brew'
def on_install(self, bin_name: str, subdeps: Optional[InstallStr]=None, **_):
subdeps = subdeps or self.on_get_subdeps(bin_name)
print(f'[*] {self.__class__.__name__}: Installing subdependencies for {bin_name} ({subdeps})')
proc = run(['brew', 'install', *subdeps.split(' ')], stdout=PIPE, stderr=PIPE)
if proc.returncode != 0:
print(proc.stdout.strip().decode())
print(proc.stderr.strip().decode())
raise Exception(f'{self.__class__.__name__} install got returncode {proc.returncode} while installing {subdeps}: {subdeps}')
class EnvProvider(BinProvider):
name: BinProviderName = 'env'
abspath_provider: ProviderLookupDict = {
# 'python': lambda: Path('/opt/homebrew/Cellar/python@3.10/3.10.14/Frameworks/Python.framework/Versions/3.10/bin/python3.10'),
}
version_provider: ProviderLookupDict = {
# 'python': lambda: '{}.{}.{}'.format(*sys.version_info[:3]),
}
def on_install(self, bin_name: BinName, subdeps: Optional[InstallStr]=None, **_):
"""The env provider is ready-only and does not install any packages, so this is a no-op"""
pass