__package__ = 'archivebox.index' import os from pathlib import Path from datetime import datetime, timedelta from typing import List, Dict, Any, Optional, Union from dataclasses import dataclass, asdict, field, fields from ..system import get_dir_size from ..config import OUTPUT_DIR, ARCHIVE_DIR_NAME class ArchiveError(Exception): def __init__(self, message, hints=None): super().__init__(message) self.hints = hints LinkDict = Dict[str, Any] ArchiveOutput = Union[str, Exception, None] @dataclass(frozen=True) class ArchiveResult: cmd: List[str] pwd: Optional[str] cmd_version: Optional[str] output: ArchiveOutput status: str start_ts: datetime end_ts: datetime schema: str = 'ArchiveResult' def __post_init__(self): self.typecheck() def _asdict(self): return asdict(self) def typecheck(self) -> None: assert self.schema == self.__class__.__name__ assert isinstance(self.status, str) and self.status assert isinstance(self.start_ts, datetime) assert isinstance(self.end_ts, datetime) assert isinstance(self.cmd, list) assert all(isinstance(arg, str) and arg for arg in self.cmd) assert self.pwd is None or isinstance(self.pwd, str) and self.pwd assert self.cmd_version is None or isinstance(self.cmd_version, str) and self.cmd_version assert self.output is None or isinstance(self.output, (str, Exception)) if isinstance(self.output, str): assert self.output @classmethod def guess_ts(_cls, dict_info): from ..util import parse_date parsed_timestamp = parse_date(dict_info["timestamp"]) start_ts = parsed_timestamp end_ts = parsed_timestamp + timedelta(seconds=int(dict_info["duration"])) return start_ts, end_ts @classmethod def from_json(cls, json_info, guess=False): from ..util import parse_date info = { key: val for key, val in json_info.items() if key in cls.field_names() } if guess: keys = info.keys() if "start_ts" not in keys: info["start_ts"], info["end_ts"] = cls.guess_ts(json_info) else: info['start_ts'] = parse_date(info['start_ts']) info['end_ts'] = parse_date(info['end_ts']) if "pwd" not in keys: info["pwd"] = str(Path(OUTPUT_DIR) / ARCHIVE_DIR_NAME / json_info["timestamp"]) if "cmd_version" not in keys: info["cmd_version"] = "Undefined" if "cmd" not in keys: info["cmd"] = [] else: info['start_ts'] = parse_date(info['start_ts']) info['end_ts'] = parse_date(info['end_ts']) info['cmd_version'] = info.get('cmd_version') return cls(**info) def to_dict(self, *keys) -> dict: if keys: return {k: v for k, v in asdict(self).items() if k in keys} return asdict(self) def to_json(self, indent=4, sort_keys=True) -> str: from .json import to_json return to_json(self, indent=indent, sort_keys=sort_keys) def to_csv(self, cols: Optional[List[str]]=None, separator: str=',', ljust: int=0) -> str: from .csv import to_csv return to_csv(self, csv_col=cols or self.field_names(), separator=separator, ljust=ljust) @classmethod def field_names(cls): return [f.name for f in fields(cls)] @property def duration(self) -> int: return (self.end_ts - self.start_ts).seconds @dataclass(frozen=True) class Link: timestamp: str url: str title: Optional[str] tags: Optional[str] sources: List[str] history: Dict[str, List[ArchiveResult]] = field(default_factory=lambda: {}) updated: Optional[datetime] = None schema: str = 'Link' def __str__(self) -> str: return f'[{self.timestamp}] {self.base_url} "{self.title}"' def __post_init__(self): self.typecheck() def overwrite(self, **kwargs): """pure functional version of dict.update that returns a new instance""" return Link(**{**self._asdict(), **kwargs}) def __eq__(self, other): if not isinstance(other, Link): return NotImplemented return self.url == other.url def __gt__(self, other): if not isinstance(other, Link): return NotImplemented if not self.timestamp or not other.timestamp: return return float(self.timestamp) > float(other.timestamp) def typecheck(self) -> None: from ..config import stderr, ANSI try: assert self.schema == self.__class__.__name__ assert isinstance(self.timestamp, str) and self.timestamp assert self.timestamp.replace('.', '').isdigit() assert isinstance(self.url, str) and '://' in self.url assert self.updated is None or isinstance(self.updated, datetime) assert self.title is None or (isinstance(self.title, str) and self.title) assert self.tags is None or isinstance(self.tags, str) assert isinstance(self.sources, list) assert all(isinstance(source, str) and source for source in self.sources) assert isinstance(self.history, dict) for method, results in self.history.items(): assert isinstance(method, str) and method assert isinstance(results, list) assert all(isinstance(result, ArchiveResult) for result in results) except Exception: stderr('{red}[X] Error while loading link! [{}] {} "{}"{reset}'.format(self.timestamp, self.url, self.title, **ANSI)) raise def _asdict(self, extended=False): info = { 'schema': 'Link', 'url': self.url, 'title': self.title or None, 'timestamp': self.timestamp, 'updated': self.updated or None, 'tags': self.tags or None, 'sources': self.sources or [], 'history': self.history or {}, } if extended: info.update({ 'link_dir': self.link_dir, 'archive_path': self.archive_path, 'hash': self.url_hash, 'base_url': self.base_url, 'scheme': self.scheme, 'domain': self.domain, 'path': self.path, 'basename': self.basename, 'extension': self.extension, 'is_static': self.is_static, 'bookmarked_date': self.bookmarked_date, 'updated_date': self.updated_date, 'oldest_archive_date': self.oldest_archive_date, 'newest_archive_date': self.newest_archive_date, 'is_archived': self.is_archived, 'num_outputs': self.num_outputs, 'num_failures': self.num_failures, 'latest': self.latest_outputs(), 'canonical': self.canonical_outputs(), }) return info @classmethod def from_json(cls, json_info, guess=False): from ..util import parse_date info = { key: val for key, val in json_info.items() if key in cls.field_names() } info['updated'] = parse_date(info.get('updated')) info['sources'] = info.get('sources') or [] json_history = info.get('history') or {} cast_history = {} for method, method_history in json_history.items(): cast_history[method] = [] for json_result in method_history: assert isinstance(json_result, dict), 'Items in Link["history"][method] must be dicts' cast_result = ArchiveResult.from_json(json_result, guess) cast_history[method].append(cast_result) info['history'] = cast_history return cls(**info) def to_json(self, indent=4, sort_keys=True) -> str: from .json import to_json return to_json(self, indent=indent, sort_keys=sort_keys) def to_csv(self, cols: Optional[List[str]]=None, separator: str=',', ljust: int=0) -> str: from .csv import to_csv return to_csv(self, cols=cols or self.field_names(), separator=separator, ljust=ljust) @classmethod def field_names(cls): return [f.name for f in fields(cls)] @property def link_dir(self) -> str: from ..config import CONFIG return os.path.join(CONFIG['ARCHIVE_DIR'], self.timestamp) @property def archive_path(self) -> str: from ..config import ARCHIVE_DIR_NAME return '{}/{}'.format(ARCHIVE_DIR_NAME, self.timestamp) @property def archive_size(self) -> float: try: return get_dir_size(self.archive_path)[0] except Exception: return 0 ### URL Helpers @property def url_hash(self): from ..util import hashurl return hashurl(self.url) @property def scheme(self) -> str: from ..util import scheme return scheme(self.url) @property def extension(self) -> str: from ..util import extension return extension(self.url) @property def domain(self) -> str: from ..util import domain return domain(self.url) @property def path(self) -> str: from ..util import path return path(self.url) @property def basename(self) -> str: from ..util import basename return basename(self.url) @property def base_url(self) -> str: from ..util import base_url return base_url(self.url) ### Pretty Printing Helpers @property def bookmarked_date(self) -> Optional[str]: from ..util import ts_to_date max_ts = (datetime.now() + timedelta(days=30)).timestamp() if self.timestamp and self.timestamp.replace('.', '').isdigit(): if 0 < float(self.timestamp) < max_ts: return ts_to_date(datetime.fromtimestamp(float(self.timestamp))) else: return str(self.timestamp) return None @property def updated_date(self) -> Optional[str]: from ..util import ts_to_date return ts_to_date(self.updated) if self.updated else None @property def archive_dates(self) -> List[datetime]: return [ result.start_ts for method in self.history.keys() for result in self.history[method] ] @property def oldest_archive_date(self) -> Optional[datetime]: return min(self.archive_dates, default=None) @property def newest_archive_date(self) -> Optional[datetime]: return max(self.archive_dates, default=None) ### Archive Status Helpers @property def num_outputs(self) -> int: return len(tuple(filter(None, self.latest_outputs().values()))) @property def num_failures(self) -> int: return sum(1 for method in self.history.keys() for result in self.history[method] if result.status == 'failed') @property def is_static(self) -> bool: from ..util import is_static_file return is_static_file(self.url) @property def is_archived(self) -> bool: from ..config import ARCHIVE_DIR from ..util import domain output_paths = ( domain(self.url), 'output.pdf', 'screenshot.png', 'output.html', 'media', ) return any( os.path.exists(os.path.join(ARCHIVE_DIR, self.timestamp, path)) for path in output_paths ) def latest_outputs(self, status: str=None) -> Dict[str, ArchiveOutput]: """get the latest output that each archive method produced for link""" ARCHIVE_METHODS = ( 'title', 'favicon', 'wget', 'warc', 'pdf', 'screenshot', 'dom', 'git', 'media', 'archive_org', ) latest: Dict[str, ArchiveOutput] = {} for archive_method in ARCHIVE_METHODS: # get most recent succesful result in history for each archive method history = self.history.get(archive_method) or [] history = list(filter(lambda result: result.output, reversed(history))) if status is not None: history = list(filter(lambda result: result.status == status, history)) history = list(history) if history: latest[archive_method] = history[0].output else: latest[archive_method] = None return latest def canonical_outputs(self) -> Dict[str, Optional[str]]: """predict the expected output paths that should be present after archiving""" from ..extractors.wget import wget_output_path canonical = { 'index_path': 'index.html', 'favicon_path': 'favicon.ico', 'google_favicon_path': 'https://www.google.com/s2/favicons?domain={}'.format(self.domain), 'wget_path': wget_output_path(self), 'warc_path': 'warc', 'pdf_path': 'output.pdf', 'screenshot_path': 'screenshot.png', 'dom_path': 'output.html', 'archive_org_path': 'https://web.archive.org/web/{}'.format(self.base_url), 'git_path': 'git', 'media_path': 'media', } if self.is_static: # static binary files like PDF and images are handled slightly differently. # they're just downloaded once and aren't archived separately multiple times, # so the wget, screenshot, & pdf urls should all point to the same file static_path = wget_output_path(self) canonical.update({ 'title': self.basename, 'wget_path': static_path, 'pdf_path': static_path, 'screenshot_path': static_path, 'dom_path': static_path, }) return canonical