""" WARNING: THIS FILE IS ALL LEGACY CODE TO BE REMOVED. DO NOT ADD ANY NEW FEATURES TO THIS FILE, NEW CODE GOES HERE: core/models.py These are the old types we used to use before ArchiveBox v0.4 (before we switched to Django). """ __package__ = 'archivebox.index' from datetime import datetime, timezone, timedelta from typing import List, Dict, Any, Optional, Union from dataclasses import dataclass, asdict, field, fields from django.utils.functional import cached_property import abx from archivebox.config import ARCHIVE_DIR, CONSTANTS from archivebox.misc.system import get_dir_size from archivebox.misc.util import ts_to_date_str, parse_date from archivebox.misc.logging import stderr, ANSI class ArchiveError(Exception): def __init__(self, message, hints=None): super().__init__(message) self.hints = hints LinkDict = Dict[str, Any] ArchiveOutput = Union[str, Exception, None] @dataclass(frozen=True) class ArchiveResult: cmd: List[str] pwd: Optional[str] cmd_version: Optional[str] output: ArchiveOutput status: str start_ts: datetime end_ts: datetime index_texts: Union[List[str], None] = None schema: str = 'ArchiveResult' def __post_init__(self): self.typecheck() def _asdict(self): return asdict(self) def typecheck(self) -> None: assert self.schema == self.__class__.__name__ assert isinstance(self.status, str) and self.status assert isinstance(self.start_ts, datetime) assert isinstance(self.end_ts, datetime) assert isinstance(self.cmd, list) assert all(isinstance(arg, str) and arg for arg in self.cmd) # TODO: replace emptystrings in these three with None / remove them from the DB assert self.pwd is None or isinstance(self.pwd, str) assert self.cmd_version is None or isinstance(self.cmd_version, str) assert self.output is None or isinstance(self.output, (str, Exception)) @classmethod def guess_ts(_cls, dict_info): parsed_timestamp = parse_date(dict_info["timestamp"]) start_ts = parsed_timestamp end_ts = parsed_timestamp + timedelta(seconds=int(dict_info["duration"])) return start_ts, end_ts @classmethod def from_json(cls, json_info, guess=False): info = { key: val for key, val in json_info.items() if key in cls.field_names() } if guess: keys = info.keys() if "start_ts" not in keys: info["start_ts"], info["end_ts"] = cls.guess_ts(json_info) else: info['start_ts'] = parse_date(info['start_ts']) info['end_ts'] = parse_date(info['end_ts']) if "pwd" not in keys: info["pwd"] = str(ARCHIVE_DIR / json_info["timestamp"]) if "cmd_version" not in keys: info["cmd_version"] = "Undefined" if "cmd" not in keys: info["cmd"] = [] else: info['start_ts'] = parse_date(info['start_ts']) info['end_ts'] = parse_date(info['end_ts']) info['cmd_version'] = info.get('cmd_version') if type(info["cmd"]) is str: info["cmd"] = [info["cmd"]] return cls(**info) def to_dict(self, *keys) -> dict: if keys: return {k: v for k, v in asdict(self).items() if k in keys} return asdict(self) def to_json(self, indent=4, sort_keys=True) -> str: from .json import to_json return to_json(self, indent=indent, sort_keys=sort_keys) def to_csv(self, cols: Optional[List[str]]=None, separator: str=',', ljust: int=0) -> str: from .csv import to_csv return to_csv(self, csv_col=cols or self.field_names(), separator=separator, ljust=ljust) @classmethod def field_names(cls): return [f.name for f in fields(cls)] @property def duration(self) -> int: return (self.end_ts - self.start_ts).seconds @dataclass(frozen=True) class Link: timestamp: str url: str title: Optional[str] tags: Optional[str] sources: List[str] history: Dict[str, List[ArchiveResult]] = field(default_factory=lambda: {}) downloaded_at: Optional[datetime] = None schema: str = 'Link' def __str__(self) -> str: return f'[{self.timestamp}] {self.url} "{self.title}"' def __post_init__(self): self.typecheck() def overwrite(self, **kwargs): """pure functional version of dict.update that returns a new instance""" return Link(**{**self._asdict(), **kwargs}) def __eq__(self, other): if not isinstance(other, Link): return NotImplemented return self.url == other.url def __gt__(self, other): if not isinstance(other, Link): return NotImplemented if not self.timestamp or not other.timestamp: return return float(self.timestamp) > float(other.timestamp) def typecheck(self) -> None: try: assert self.schema == self.__class__.__name__ assert isinstance(self.timestamp, str) and self.timestamp, f'timestamp must be a non-empty string, got: "{self.timestamp}"' assert self.timestamp.replace('.', '').isdigit(), f'timestamp must be a float str, got: "{self.timestamp}"' assert isinstance(self.url, str) and '://' in self.url, f'url must be a non-empty string, got: "{self.url}"' assert self.downloaded_at is None or isinstance(self.downloaded_at, datetime), f'downloaded_at must be a datetime or None, got: {self.downloaded_at}' assert self.title is None or (isinstance(self.title, str) and self.title), f'title must be a non-empty string or None, got: "{self.title}"' assert self.tags is None or isinstance(self.tags, str), f'tags must be a string or None, got: "{self.tags}"' assert isinstance(self.sources, list), f'sources must be a list, got: {self.sources}' assert all(isinstance(source, str) and source for source in self.sources) assert isinstance(self.history, dict) for method, results in self.history.items(): assert isinstance(method, str) and method assert isinstance(results, list) assert all(isinstance(result, ArchiveResult) for result in results) except Exception: stderr('{red}[X] Error while loading link! [{}] {} "{}"{reset}'.format(self.timestamp, self.url, self.title, **ANSI)) raise def _asdict(self, extended=False): info = { 'schema': 'Link', 'url': self.url, 'title': self.title or None, 'timestamp': self.timestamp, 'downloaded_at': self.downloaded_at or None, 'tags': self.tags or None, 'sources': self.sources or [], 'history': self.history or {}, } if extended: info.update({ 'snapshot_id': self.snapshot_id, 'snapshot_abid': self.snapshot_abid, 'link_dir': self.link_dir, 'archive_path': self.archive_path, 'hash': self.url_hash, 'base_url': self.base_url, 'scheme': self.scheme, 'domain': self.domain, 'path': self.path, 'basename': self.basename, 'extension': self.extension, 'is_static': self.is_static, 'tags_str': (self.tags or '').strip(','), # only used to render static index in index/html.py, remove if no longer needed there 'icons': None, # only used to render static index in index/html.py, remove if no longer needed there 'bookmarked_date': self.bookmarked_date, 'downloaded_datestr': self.downloaded_datestr, 'oldest_archive_date': self.oldest_archive_date, 'newest_archive_date': self.newest_archive_date, 'is_archived': self.is_archived, 'num_outputs': self.num_outputs, 'num_failures': self.num_failures, 'latest': self.latest_outputs(), 'canonical': self.canonical_outputs(), }) return info def as_snapshot(self): from core.models import Snapshot return Snapshot.objects.get(url=self.url) @classmethod def from_json(cls, json_info, guess=False): info = { key: val for key, val in json_info.items() if key in cls.field_names() } info['downloaded_at'] = parse_date(info.get('updated') or info.get('downloaded_at')) info['sources'] = info.get('sources') or [] json_history = info.get('history') or {} cast_history = {} for method, method_history in json_history.items(): cast_history[method] = [] for json_result in method_history: assert isinstance(json_result, dict), 'Items in Link["history"][method] must be dicts' cast_result = ArchiveResult.from_json(json_result, guess) cast_history[method].append(cast_result) info['history'] = cast_history return cls(**info) def to_json(self, indent=4, sort_keys=True) -> str: from .json import to_json return to_json(self, indent=indent, sort_keys=sort_keys) def to_csv(self, cols: Optional[List[str]]=None, separator: str=',', ljust: int=0) -> str: from .csv import to_csv return to_csv(self, cols=cols or self.field_names(), separator=separator, ljust=ljust) @cached_property def snapshot(self): from core.models import Snapshot return Snapshot.objects.only('id', 'abid').get(url=self.url) @cached_property def snapshot_id(self): return str(self.snapshot.pk) @cached_property def snapshot_abid(self): return str(self.snapshot.ABID) @classmethod def field_names(cls): return [f.name for f in fields(cls)] @property def link_dir(self) -> str: return str(ARCHIVE_DIR / self.timestamp) @property def archive_path(self) -> str: return '{}/{}'.format(CONSTANTS.ARCHIVE_DIR_NAME, self.timestamp) @property def archive_size(self) -> float: try: return get_dir_size(self.archive_path)[0] except Exception: return 0 ### URL Helpers @property def url_hash(self): from archivebox.misc.util import hashurl return hashurl(self.url) @property def scheme(self) -> str: from archivebox.misc.util import scheme return scheme(self.url) @property def extension(self) -> str: from archivebox.misc.util import extension return extension(self.url) @property def domain(self) -> str: from archivebox.misc.util import domain return domain(self.url) @property def path(self) -> str: from archivebox.misc.util import path return path(self.url) @property def basename(self) -> str: from archivebox.misc.util import basename return basename(self.url) @property def base_url(self) -> str: from archivebox.misc.util import base_url return base_url(self.url) ### Pretty Printing Helpers @property def bookmarked_date(self) -> Optional[str]: max_ts = (datetime.now(timezone.utc) + timedelta(days=30)).timestamp() if self.timestamp and self.timestamp.replace('.', '').isdigit(): if 0 < float(self.timestamp) < max_ts: return ts_to_date_str(datetime.fromtimestamp(float(self.timestamp))) else: return str(self.timestamp) return None @property def downloaded_datestr(self) -> Optional[str]: return ts_to_date_str(self.downloaded_at) if self.downloaded_at else None @property def archive_dates(self) -> List[datetime]: return [ parse_date(result.start_ts) for method in self.history.keys() for result in self.history[method] ] @property def oldest_archive_date(self) -> Optional[datetime]: return min(self.archive_dates, default=None) @property def newest_archive_date(self) -> Optional[datetime]: return max(self.archive_dates, default=None) ### Archive Status Helpers @property def num_outputs(self) -> int: return self.as_snapshot().num_outputs @property def num_failures(self) -> int: return sum(1 for method in self.history.keys() for result in self.history[method] if result.status == 'failed') @property def is_static(self) -> bool: from archivebox.misc.util import is_static_file return is_static_file(self.url) @property def is_archived(self) -> bool: from archivebox.misc.util import domain output_paths = ( domain(self.url), 'output.html', 'output.pdf', 'screenshot.png', 'singlefile.html', 'readability/content.html', 'mercury/content.html', 'htmltotext.txt', 'media', 'git', ) return any( (ARCHIVE_DIR / self.timestamp / path).exists() for path in output_paths ) def latest_outputs(self, status: str=None) -> Dict[str, ArchiveOutput]: """get the latest output that each archive method produced for link""" ARCHIVE_METHODS = ( 'title', 'favicon', 'wget', 'warc', 'singlefile', 'pdf', 'screenshot', 'dom', 'git', 'media', 'archive_org', ) latest: Dict[str, ArchiveOutput] = {} for archive_method in ARCHIVE_METHODS: # get most recent succesful result in history for each archive method history = self.history.get(archive_method) or [] history = list(filter(lambda result: result.output, reversed(history))) if status is not None: history = list(filter(lambda result: result.status == status, history)) history = list(history) if history: latest[archive_method] = history[0].output else: latest[archive_method] = None return latest def canonical_outputs(self) -> Dict[str, Optional[str]]: """predict the expected output paths that should be present after archiving""" from abx_plugin_wget.wget import wget_output_path from abx_plugin_favicon.config import FAVICON_CONFIG # TODO: banish this awful duplication from the codebase and import these # from their respective extractor files canonical = { 'index_path': 'index.html', 'favicon_path': 'favicon.ico', 'google_favicon_path': FAVICON_CONFIG.FAVICON_PROVIDER.format(self.domain), 'wget_path': wget_output_path(self), 'warc_path': 'warc/', 'singlefile_path': 'singlefile.html', 'readability_path': 'readability/content.html', 'mercury_path': 'mercury/content.html', 'htmltotext_path': 'htmltotext.txt', 'pdf_path': 'output.pdf', 'screenshot_path': 'screenshot.png', 'dom_path': 'output.html', 'archive_org_path': 'https://web.archive.org/web/{}'.format(self.base_url), 'git_path': 'git/', 'media_path': 'media/', 'headers_path': 'headers.json', } if self.is_static: # static binary files like PDF and images are handled slightly differently. # they're just downloaded once and aren't archived separately multiple times, # so the wget, screenshot, & pdf urls should all point to the same file static_path = wget_output_path(self) canonical.update({ 'title': self.basename, 'wget_path': static_path, 'pdf_path': static_path, 'screenshot_path': static_path, 'dom_path': static_path, 'singlefile_path': static_path, 'readability_path': static_path, 'mercury_path': static_path, 'htmltotext_path': static_path, }) return canonical