ArchiveBox/archivebox/util.py

320 lines
10 KiB
Python
Raw Normal View History

# Pin the package name explicitly: the relative imports used in this module
# (e.g. `from .base32_crockford import ...`) are resolved against __package__.
__package__ = 'archivebox'
2017-10-23 09:58:41 +00:00
import re
2020-09-15 19:05:48 +00:00
from pathlib import Path
2020-06-26 01:30:29 +00:00
import json as pyjson
2019-05-01 03:13:04 +00:00
from typing import List, Optional, Any
2019-03-31 01:29:16 +00:00
from inspect import signature
from functools import wraps
from hashlib import sha256
from urllib.parse import urlparse, quote, unquote
from html import escape, unescape
from datetime import datetime
from dateparser import parse as dateparser
import requests
2020-10-31 11:56:51 +00:00
from requests.exceptions import RequestException, ReadTimeout
from .base32_crockford import encode as base32_encode # type: ignore
from w3lib.encoding import html_body_declared_encoding, http_content_type_encoding
try:
    import chardet
except ImportError:
    def detect_encoding(rawdata):
        """Fallback used when chardet is not installed: always report utf-8."""
        return "utf-8"
else:
    def detect_encoding(rawdata):
        """
        Guess the text encoding of the raw bytes using chardet.

        chardet reports an encoding of None when it cannot make a guess
        (for example on empty input); in that case fall back to "utf-8" so
        that callers always receive an encoding name, exactly like the
        chardet-less fallback does.
        """
        return chardet.detect(rawdata)["encoding"] or "utf-8"
### Parsing Helpers
# All of these are (str) -> str
# shortcuts to: https://docs.python.org/3/library/urllib.parse.html#url-parsing
def _url_without(url, **blanked):
    """Rebuild `url` with the given urlparse fields replaced, then trim slashes."""
    # str.strip treats '//' as a character set, so this removes every leading
    # AND trailing '/' from the rebuilt url.
    return urlparse(url)._replace(**blanked).geturl().strip('//')


def scheme(url):
    """Lower-cased scheme of the url, e.g. 'https'."""
    return urlparse(url).scheme.lower()


def without_scheme(url):
    """The url with its scheme removed."""
    return _url_without(url, scheme='')


def without_query(url):
    """The url with its query string removed."""
    return _url_without(url, query='')


def without_fragment(url):
    """The url with its #fragment removed."""
    return _url_without(url, fragment='')


def without_path(url):
    """The url with its path, query string and fragment removed."""
    return _url_without(url, path='', fragment='', query='')


def path(url):
    """Path component of the url."""
    return urlparse(url).path


def basename(url):
    """Last '/'-separated segment of the url path."""
    return urlparse(url).path.rsplit('/', 1)[-1]


def domain(url):
    """Network location (netloc) component of the url."""
    return urlparse(url).netloc


def query(url):
    """Query string of the url (without the leading '?')."""
    return urlparse(url).query


def fragment(url):
    """Fragment of the url (without the leading '#')."""
    return urlparse(url).fragment


def extension(url):
    """Lower-cased text after the last '.' of the basename, or '' if there is none."""
    name = basename(url)
    if '.' not in name:
        return ''
    return name.rsplit('.', 1)[-1].lower()


def base_url(url):
    """Unique base url used to dedupe links (the url without its scheme)."""
    return without_scheme(url)
2017-10-23 09:58:41 +00:00
def without_www(url):
    """Return the url with the first '://www.' occurrence shortened to '://'."""
    return url.replace('://www.', '://', 1)


def without_trailing_slash(url):
    """
    Drop a single trailing '/' from the url.

    When the url does not end in '/', every '/?' in it is collapsed to '?'
    instead (so 'http://a.com/?q=1' becomes 'http://a.com?q=1').
    An empty string is returned unchanged.
    """
    # endswith() is safe on the empty string, unlike indexing url[-1].
    if url.endswith('/'):
        return url[:-1]
    return url.replace('/?', '?')
2019-04-02 20:36:41 +00:00
def hashurl(url):
    """
    Short hash of a url: the sha256 of its base_url (utf-8 encoded), read as
    an integer, base32-encoded and truncated to the first 20 characters.
    """
    hex_digest = sha256(base_url(url).encode('utf-8')).hexdigest()
    return base32_encode(int(hex_digest, 16))[:20]
# Each helper below passes falsy input (None, '') straight through unchanged.

def urlencode(s):
    """Percent-encode `s` as utf-8, replacing characters that cannot be encoded."""
    if not s:
        return s
    return quote(s, encoding='utf-8', errors='replace')


def urldecode(s):
    """Reverse percent-encoding in `s`."""
    if not s:
        return s
    return unquote(s)


def htmlencode(s):
    """Escape HTML special characters in `s`, including both quote characters."""
    if not s:
        return s
    return escape(s, quote=True)


def htmldecode(s):
    """Convert HTML character references in `s` back to plain characters."""
    if not s:
        return s
    return unescape(s)
2019-04-02 20:36:41 +00:00
def short_ts(ts):
    """Parse `ts` and return the part of its unix timestamp before the '.', as a string."""
    unix_time = parse_date(ts).timestamp()
    return str(unix_time).partition('.')[0]


def ts_to_date(ts):
    """Format `ts` as 'YYYY-MM-DD HH:MM'; falsy input is returned unchanged."""
    if not ts:
        return ts
    return parse_date(ts).strftime('%Y-%m-%d %H:%M')


def ts_to_iso(ts):
    """Format `ts` as an ISO 8601 string; falsy input is returned unchanged."""
    if not ts:
        return ts
    return parse_date(ts).isoformat()
2017-10-23 09:58:41 +00:00
# Matches http(s) URLs embedded in arbitrary text (case-insensitively).
URL_REGEX = re.compile(
    (
        r'http[s]?://'                    # scheme: http:// or https://
        r'(?:[a-zA-Z]|[0-9]'              # first character: a letter or digit,
        r'|[$-_@.&+]|[!*\(\),]'           # ... one of the permitted symbols,
        r'|(?:%[0-9a-fA-F][0-9a-fA-F]))'  # ... or a %XX percent-escaped byte
        r'[^\]\[\(\)<>"\'\s]+'            # then everything up to a bracket, paren, angle, quote or whitespace
    ),
    flags=re.IGNORECASE,
)

# Matches color codes of the form "[<n>m", "[<n>;<n>m" or "[<n>;<n>;<n>m",
# capturing the numbers as the named groups arg_1, arg_2 and arg_3.
COLOR_REGEX = re.compile(
    r'\[(?P<arg_1>\d+)'
    r'(;(?P<arg_2>\d+)'
    r'(;(?P<arg_3>\d+))?'
    r')?m'
)
2020-07-02 07:22:37 +00:00
2020-07-02 07:12:30 +00:00
def is_static_file(url: str):
    """Return True when the url's file extension is listed in STATICFILE_EXTENSIONS."""
    # TODO: the proper way is with MIME type detection + ext, not only extension
    from .config import STATICFILE_EXTENSIONS

    file_ext = extension(url).lower()
    return file_ext in STATICFILE_EXTENSIONS
2019-03-26 07:20:41 +00:00
def enforce_types(func):
    """
    Enforce function arg and kwarg types at runtime using its python3 type hints

    Only annotations that are plain classes (e.g. `str`, `int`, `datetime`)
    are enforced; typing constructs such as `Optional[...]`, `List[...]` or
    `Any` are not checked. Parameters without an annotation are never checked,
    and `None` is accepted for any parameter whose declared default is `None`
    (the `arg: str=None` idiom).

    Raises:
        TypeError: from the wrapped call, when an argument is not an instance
            of its annotated class.
    """
    # TODO: check return type as well

    # The signature never changes, so compute it once at decoration time
    # instead of on every call.
    sig = signature(func)

    def check_argument_type(arg_key, arg_val):
        param = sig.parameters.get(arg_key)
        if param is None:
            # keyword swallowed by a **kwargs parameter: nothing to check against
            return

        annotation = param.annotation
        # `param.empty` marks a missing annotation. It is itself a class, so it
        # must be excluded explicitly or every unannotated argument would fail
        # the isinstance() check below.
        if annotation is param.empty or annotation.__class__ is not type:
            return

        # `arg: T = None` implicitly means Optional[T]: passing the default
        # value explicitly must not be rejected.
        if arg_val is None and param.default is None:
            return

        if not isinstance(arg_val, annotation):
            raise TypeError(
                '{}(..., {}: {}) got unexpected {} argument {}={}'.format(
                    func.__name__,
                    arg_key,
                    annotation.__name__,
                    type(arg_val).__name__,
                    arg_key,
                    str(arg_val)[:64],
                )
            )

    @wraps(func)
    def typechecked_function(*args, **kwargs):
        # check args (paired positionally with the declared parameter names)
        for arg_val, arg_key in zip(args, sig.parameters):
            check_argument_type(arg_key, arg_val)

        # check kwargs
        for arg_key, arg_val in kwargs.items():
            check_argument_type(arg_key, arg_val)

        return func(*args, **kwargs)

    return typechecked_function
2017-10-23 09:58:41 +00:00
2019-05-01 03:13:04 +00:00
def docstring(text: Optional[str]):
    """
    Decorator factory: set the decorated function's __doc__ to `text`.

    When `text` is falsy (None or ''), the function's existing docstring is
    left untouched. The function itself is returned, not a wrapper.
    """
    def attach(func):
        if not text:
            return func
        func.__doc__ = text
        return func

    return attach
2017-10-23 09:58:41 +00:00
2019-03-27 03:25:07 +00:00
@enforce_types
def str_between(string: str, start: str, end: str=None) -> str:
    """
    (<abc>12345</def>, <abc>, </def>) -> 12345

    Returns the text after the first occurrence of `start` and, when `end` is
    given, before the last occurrence of `end` in that remainder. A marker
    that does not occur leaves that side of the string uncut.
    """
    _, start_hit, after_start = string.partition(start)
    content = after_start if start_hit else string
    if end is None:
        return content

    before_end, end_hit, _ = content.rpartition(end)
    return before_end if end_hit else content
2019-03-27 03:25:07 +00:00
@enforce_types
def parse_date(date: Any) -> Optional[datetime]:
    """
    Parse unix timestamps, iso format, and human-readable strings

    None and datetime instances are returned as-is. Numbers are converted to
    their string form and, like strings, handed to dateparser. Any other type
    raises ValueError.
    """
    if date is None or isinstance(date, datetime):
        return date

    if isinstance(date, (float, int)):
        date = str(date)

    if not isinstance(date, str):
        raise ValueError('Tried to parse invalid date! {}'.format(date))

    return dateparser(date)
2019-03-27 03:25:07 +00:00
@enforce_types
def download_url(url: str, timeout: int=None) -> str:
    """
    Download the contents of a remote url and return the text

    The request is a GET sent with the configured WGET_USER_AGENT and
    CHECK_SSL_VALIDITY settings; `timeout` falls back to the configured TIMEOUT
    when it is not given. The body is decoded with the encoding declared in the
    Content-Type header or, failing that, the one declared inside the HTML body.
    """
    from .config import TIMEOUT, CHECK_SSL_VALIDITY, WGET_USER_AGENT

    if not timeout:
        timeout = TIMEOUT

    request_headers = {'User-Agent': WGET_USER_AGENT}
    response = requests.get(
        url,
        headers=request_headers,
        verify=CHECK_SSL_VALIDITY,
        timeout=timeout,
    )

    # Prefer the encoding announced by the server; only inspect the body for a
    # declared encoding when the header does not provide one.
    declared_encoding = http_content_type_encoding(response.headers.get('Content-Type', ''))
    if not declared_encoding:
        declared_encoding = html_body_declared_encoding(response.text)

    if declared_encoding is not None:
        response.encoding = declared_encoding

    return response.text
2020-09-11 14:06:52 +00:00
@enforce_types
def get_headers(url: str, timeout: int=None) -> str:
    """
    Fetch the response headers of a remote url and return them as a JSON string

    A HEAD request (following redirects) is tried first. If it fails with a
    RequestException other than ReadTimeout, or answers with a status code
    >= 400, a streaming GET request is made instead and its headers are used.
    `timeout` falls back to the configured TIMEOUT when it is not given.

    Returns:
        The response headers serialized as JSON with a 4-space indent.

    Raises:
        ReadTimeout: when the HEAD request times out while reading.
        RequestException: when the fallback GET request fails as well.
    """
    from .config import TIMEOUT, CHECK_SSL_VALIDITY, WGET_USER_AGENT
    timeout = timeout or TIMEOUT

    try:
        response = requests.head(
            url,
            headers={'User-Agent': WGET_USER_AGENT},
            verify=CHECK_SSL_VALIDITY,
            timeout=timeout,
            allow_redirects=True,
        )
        if response.status_code >= 400:
            # treat an error status like a failed request so that the GET
            # fallback below is attempted
            raise RequestException
    except ReadTimeout:
        raise
    except RequestException:
        # stream=True: only the headers are needed, so do not download the body
        response = requests.get(
            url,
            headers={'User-Agent': WGET_USER_AGENT},
            verify=CHECK_SSL_VALIDITY,
            timeout=timeout,
            stream=True
        )

    try:
        return pyjson.dumps(dict(response.headers), indent=4)
    finally:
        # A streamed response keeps its connection checked out until the body
        # is consumed or the response is closed; the body is never read here,
        # so release the connection explicitly.
        response.close()
2020-09-11 14:06:52 +00:00
2019-03-27 03:25:07 +00:00
@enforce_types
def chrome_args(**options) -> List[str]:
    """
    helper to build up a chrome shell command with arguments

    Keyword arguments override the matching entries of the configured
    CHROME_OPTIONS. The returned list starts with the chrome binary, followed
    by the command line flags derived from the merged options.
    """
    from .config import CHROME_OPTIONS

    options = {**CHROME_OPTIONS, **options}
    cmd_args = [options['CHROME_BINARY']]

    if options['CHROME_HEADLESS']:
        cmd_args.append('--headless')

    if not options['CHROME_SANDBOX']:
        # assume this means we are running inside a docker container
        # in docker, GPU support is limited, sandboxing is unnecessary,
        # and SHM is limited to 64MB by default (which is too low to be usable).
        cmd_args.extend([
            '--no-sandbox',
            '--disable-gpu',
            '--disable-dev-shm-usage',
            '--disable-software-rasterizer',
        ])

    if not options['CHECK_SSL_VALIDITY']:
        cmd_args.extend(['--disable-web-security', '--ignore-certificate-errors'])

    if options['CHROME_USER_AGENT']:
        cmd_args.append('--user-agent={}'.format(options['CHROME_USER_AGENT']))

    if options['RESOLUTION']:
        cmd_args.append('--window-size={}'.format(options['RESOLUTION']))

    if options['TIMEOUT']:
        # the TIMEOUT option is multiplied by 1000 before being passed to chrome
        cmd_args.append('--timeout={}'.format((options['TIMEOUT']) * 1000))

    if options['CHROME_USER_DATA_DIR']:
        cmd_args.append('--user-data-dir={}'.format(options['CHROME_USER_DATA_DIR']))

    return cmd_args
2020-11-28 07:12:27 +00:00
def ansi_to_html(text):
    """
    Replace the color codes in `text` with HTML <span> tags.

    Every '[m' becomes a closing '</span>'; every code matched by COLOR_REGEX
    becomes an opening span whose color is looked up in COLOR_DICT.

    Based on: https://stackoverflow.com/questions/19212665/python-converting-ansi-color-codes-to-html
    """
    from .config import COLOR_DICT

    span_template = '<span style="color: rgb{}"><br>'

    def to_span(match):
        codes = match.groupdict()
        # A lone number is the color itself; when more numbers are present the
        # second one is used as the color.
        if codes['arg_2'] is None and codes['arg_3'] is None:
            color = codes['arg_1']
        else:
            color = codes['arg_2']
        return span_template.format(COLOR_DICT[color][0])

    return COLOR_REGEX.sub(to_span, text.replace('[m', '</span>'))
2020-07-13 15:24:49 +00:00
class AttributeDict(dict):
    """
    Helper to allow accessing dict values via Example.key or Example['key']

    Attribute assignment stores the value as a dict item. Looking up an
    attribute that is not a key raises AttributeError, so hasattr(),
    getattr(obj, name, default) and copy.deepcopy() behave normally.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Recursively convert nested dicts to AttributeDicts (optional):
        # for key, val in self.items():
        #     if isinstance(val, dict) and type(val) is not AttributeDict:
        #         self[key] = AttributeDict(val)

    def __getattr__(self, attr: str) -> Any:
        # __getattr__ is only consulted after normal attribute lookup failed,
        # and the attribute protocol requires AttributeError (not KeyError) for
        # missing names: hasattr() and getattr() with a default only swallow
        # AttributeError.
        try:
            return dict.__getitem__(self, attr)
        except KeyError:
            raise AttributeError(
                '{!r} object has no attribute {!r}'.format(type(self).__name__, attr)
            ) from None

    def __setattr__(self, attr: str, value: Any) -> None:
        return dict.__setitem__(self, attr, value)
2019-05-01 03:13:04 +00:00
class ExtendedEncoder(pyjson.JSONEncoder):
    """
    JSON encoder that additionally knows how to serialize:

      - objects exposing `_asdict()`  -> the dict it returns
      - bytes                         -> decoded str
      - datetime                      -> ISO 8601 string
      - exceptions                    -> 'ClassName: message'
      - pathlib.Path                  -> str
      - dict views (keys/values/items) -> tuple

    Anything else is deferred to the base JSONEncoder.
    """

    def default(self, obj):
        cls_name = obj.__class__.__name__

        if hasattr(obj, '_asdict'):
            return obj._asdict()

        if isinstance(obj, bytes):
            return obj.decode()

        if isinstance(obj, datetime):
            return obj.isoformat()

        if isinstance(obj, Exception):
            return '{}: {}'.format(obj.__class__.__name__, obj)

        if isinstance(obj, Path):
            return str(obj)

        # dict views are recognized by their class name
        if cls_name in ('dict_items', 'dict_keys', 'dict_values'):
            return tuple(obj)

        return super().default(obj)
2019-04-27 21:26:24 +00:00