switch to dataclasses, working Link type hints everywhere

Nick Sweeting 2019-03-26 19:21:34 -04:00
parent 346811fb78
commit 25a107df43
10 changed files with 504 additions and 363 deletions
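The heart of the change is dropping the loose Link = Dict[str, Any] alias in favor of a frozen dataclass, so every link['url']-style lookup becomes attribute access. A minimal before/after sketch (simplified; the real Link in schema.py also carries an updated field and many derived properties):

    from dataclasses import dataclass, field
    from typing import Dict, List, Optional

    # Before: links were plain dicts, accessed as link['url'], link['history'][...]
    # After: a frozen dataclass gives attribute access and static type checking.

    @dataclass(frozen=True)
    class Link:
        timestamp: str
        url: str
        title: Optional[str]
        tags: Optional[str]
        sources: List[str]
        history: Dict[str, list] = field(default_factory=dict)

    link = Link(timestamp='1553640000.0', url='https://example.com',
                title=None, tags='', sources=['bookmarks.html'])
    print(link.url)   # attribute access replaces the old link['url'] lookups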

View file

@ -12,14 +12,13 @@ Usage & Documentation:
import os
import sys
from typing import List
from typing import List, Optional
from schema import Link
from links import links_after_timestamp
from index import write_links_index, load_links_index
from archive_methods import archive_link
from config import (
ARCHIVE_DIR,
ONLY_NEW,
OUTPUT_DIR,
GIT_SHA,
@ -109,19 +108,19 @@ def update_archive_data(import_path: str=None, resume: float=None) -> List[Link]
all_links, new_links = load_links_index(out_dir=OUTPUT_DIR, import_path=import_path)
# Step 2: Write updated index with deduped old and new links back to disk
write_links_index(out_dir=OUTPUT_DIR, links=all_links)
write_links_index(out_dir=OUTPUT_DIR, links=list(all_links))
# Step 3: Run the archive methods for each link
links = new_links if ONLY_NEW else all_links
log_archiving_started(len(links), resume)
idx, link = 0, {'timestamp': 0}
idx: int = 0
link: Optional[Link] = None
try:
for idx, link in enumerate(links_after_timestamp(links, resume)):
link_dir = os.path.join(ARCHIVE_DIR, link['timestamp'])
archive_link(link_dir, link)
archive_link(link)
except KeyboardInterrupt:
log_archiving_paused(len(links), idx, link['timestamp'])
log_archiving_paused(len(links), idx, link.timestamp if link else '0')
raise SystemExit(0)
except:
@ -132,7 +131,7 @@ def update_archive_data(import_path: str=None, resume: float=None) -> List[Link]
# Step 4: Re-write links index with updated titles, icons, and resources
all_links, _ = load_links_index(out_dir=OUTPUT_DIR)
write_links_index(out_dir=OUTPUT_DIR, links=all_links, finished=True)
write_links_index(out_dir=OUTPUT_DIR, links=list(all_links), finished=True)
return all_links
if __name__ == '__main__':
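Because the loop variable can still be None if a KeyboardInterrupt lands before the first link, the paused-log call now guards the attribute access. A self-contained sketch of that pattern, with stub Link/archive_link names standing in for the real ones:

    from dataclasses import dataclass
    from typing import Iterable, Optional

    @dataclass(frozen=True)
    class Link:                              # stub with just the field this loop needs
        timestamp: str

    def archive_link(link: Link) -> None:    # stand-in for the real per-link archiving
        print('archiving', link.timestamp)

    def archive_all(links: Iterable[Link]) -> None:
        idx: int = 0
        link: Optional[Link] = None          # stays None if we stop before the first link
        try:
            for idx, link in enumerate(links):
                archive_link(link)
        except KeyboardInterrupt:
            print('Paused at index', idx, 'timestamp', link.timestamp if link else '0')
            raise SystemExit(0)

    archive_all([Link('1553640000.0')])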

View file

@ -52,7 +52,6 @@ from util import (
chmod_file,
wget_output_path,
chrome_args,
check_link_structure,
run, PIPE, DEVNULL,
Link,
)
@ -64,9 +63,7 @@ from logs import (
)
def archive_link(link_dir: str, link: Link, page=None) -> Link:
def archive_link(link: Link, page=None) -> Link:
"""download the DOM, PDF, and a screenshot into a folder named after the link's timestamp"""
ARCHIVE_METHODS = (
@ -82,24 +79,24 @@ def archive_link(link_dir: str, link: Link, page=None) -> Link:
)
try:
is_new = not os.path.exists(link_dir)
is_new = not os.path.exists(link.link_dir)
if is_new:
os.makedirs(link_dir)
os.makedirs(link.link_dir)
link = load_json_link_index(link_dir, link)
log_link_archiving_started(link_dir, link, is_new)
link = load_json_link_index(link.link_dir, link)
log_link_archiving_started(link.link_dir, link, is_new)
stats = {'skipped': 0, 'succeeded': 0, 'failed': 0}
for method_name, should_run, method_function in ARCHIVE_METHODS:
if method_name not in link['history']:
link['history'][method_name] = []
if method_name not in link.history:
link.history[method_name] = []
if should_run(link_dir, link):
if should_run(link.link_dir, link):
log_archive_method_started(method_name)
result = method_function(link_dir, link)
result = method_function(link.link_dir, link)
link['history'][method_name].append(result._asdict())
link.history[method_name].append(result)
stats[result.status] += 1
log_archive_method_finished(result)
@ -108,14 +105,22 @@ def archive_link(link_dir: str, link: Link, page=None) -> Link:
# print(' ', stats)
write_link_index(link_dir, link)
link = Link(**{
**link._asdict(),
'updated': datetime.now(),
})
write_link_index(link.link_dir, link)
patch_links_index(link)
log_link_archiving_finished(link_dir, link, is_new, stats)
log_link_archiving_finished(link.link_dir, link, is_new, stats)
except KeyboardInterrupt:
raise
except Exception as err:
print(' ! Failed to archive link: {}: {}'.format(err.__class__.__name__, err))
raise
return link
@ -123,10 +128,10 @@ def archive_link(link_dir: str, link: Link, page=None) -> Link:
def should_fetch_title(link_dir: str, link: Link) -> bool:
# if link already has valid title, skip it
if link['title'] and not link['title'].lower().startswith('http'):
if link.title and not link.title.lower().startswith('http'):
return False
if is_static_file(link['url']):
if is_static_file(link.url):
return False
return FETCH_TITLE
@ -137,7 +142,7 @@ def fetch_title(link_dir: str, link: Link, timeout: int=TIMEOUT) -> ArchiveResul
output = None
cmd = [
CURL_BINARY,
link['url'],
link.url,
'|',
'grep',
'<title>',
@ -145,7 +150,7 @@ def fetch_title(link_dir: str, link: Link, timeout: int=TIMEOUT) -> ArchiveResul
status = 'succeeded'
timer = TimedProgress(timeout, prefix=' ')
try:
output = fetch_page_title(link['url'], timeout=timeout, progress=False)
output = fetch_page_title(link.url, timeout=timeout, progress=False)
if not output:
raise ArchiveError('Unable to detect page title')
except Exception as err:
@ -180,7 +185,7 @@ def fetch_favicon(link_dir: str, link: Link, timeout: int=TIMEOUT) -> ArchiveRes
'--location',
'--output', output,
*(() if CHECK_SSL_VALIDITY else ('--insecure',)),
'https://www.google.com/s2/favicons?domain={}'.format(domain(link['url'])),
'https://www.google.com/s2/favicons?domain={}'.format(domain(link.url)),
]
status = 'succeeded'
timer = TimedProgress(timeout, prefix=' ')
@ -240,7 +245,7 @@ def fetch_wget(link_dir: str, link: Link, timeout: int=TIMEOUT) -> ArchiveResult
*(('--user-agent={}'.format(WGET_USER_AGENT),) if WGET_USER_AGENT else ()),
*(('--load-cookies', COOKIES_FILE) if COOKIES_FILE else ()),
*((() if CHECK_SSL_VALIDITY else ('--no-check-certificate', '--no-hsts'))),
link['url'],
link.url,
]
status = 'succeeded'
timer = TimedProgress(timeout, prefix=' ')
@ -290,7 +295,7 @@ def fetch_wget(link_dir: str, link: Link, timeout: int=TIMEOUT) -> ArchiveResult
)
def should_fetch_pdf(link_dir: str, link: Link) -> bool:
if is_static_file(link['url']):
if is_static_file(link.url):
return False
if os.path.exists(os.path.join(link_dir, 'output.pdf')):
@ -306,7 +311,7 @@ def fetch_pdf(link_dir: str, link: Link, timeout: int=TIMEOUT) -> ArchiveResult:
cmd = [
*chrome_args(TIMEOUT=timeout),
'--print-to-pdf',
link['url'],
link.url,
]
status = 'succeeded'
timer = TimedProgress(timeout, prefix=' ')
@ -334,7 +339,7 @@ def fetch_pdf(link_dir: str, link: Link, timeout: int=TIMEOUT) -> ArchiveResult:
)
def should_fetch_screenshot(link_dir: str, link: Link) -> bool:
if is_static_file(link['url']):
if is_static_file(link.url):
return False
if os.path.exists(os.path.join(link_dir, 'screenshot.png')):
@ -349,7 +354,7 @@ def fetch_screenshot(link_dir: str, link: Link, timeout: int=TIMEOUT) -> Archive
cmd = [
*chrome_args(TIMEOUT=timeout),
'--screenshot',
link['url'],
link.url,
]
status = 'succeeded'
timer = TimedProgress(timeout, prefix=' ')
@ -377,7 +382,7 @@ def fetch_screenshot(link_dir: str, link: Link, timeout: int=TIMEOUT) -> Archive
)
def should_fetch_dom(link_dir: str, link: Link) -> bool:
if is_static_file(link['url']):
if is_static_file(link.url):
return False
if os.path.exists(os.path.join(link_dir, 'output.html')):
@ -393,7 +398,7 @@ def fetch_dom(link_dir: str, link: Link, timeout: int=TIMEOUT) -> ArchiveResult:
cmd = [
*chrome_args(TIMEOUT=timeout),
'--dump-dom',
link['url']
link.url
]
status = 'succeeded'
timer = TimedProgress(timeout, prefix=' ')
@ -422,15 +427,15 @@ def fetch_dom(link_dir: str, link: Link, timeout: int=TIMEOUT) -> ArchiveResult:
)
def should_fetch_git(link_dir: str, link: Link) -> bool:
if is_static_file(link['url']):
if is_static_file(link.url):
return False
if os.path.exists(os.path.join(link_dir, 'git')):
return False
is_clonable_url = (
(domain(link['url']) in GIT_DOMAINS)
or (extension(link['url']) == 'git')
(domain(link.url) in GIT_DOMAINS)
or (extension(link.url) == 'git')
)
if not is_clonable_url:
return False
@ -450,7 +455,7 @@ def fetch_git(link_dir: str, link: Link, timeout: int=TIMEOUT) -> ArchiveResult:
'--mirror',
'--recursive',
*(() if CHECK_SSL_VALIDITY else ('-c', 'http.sslVerify=false')),
without_query(without_fragment(link['url'])),
without_query(without_fragment(link.url)),
]
status = 'succeeded'
timer = TimedProgress(timeout, prefix=' ')
@ -481,7 +486,7 @@ def fetch_git(link_dir: str, link: Link, timeout: int=TIMEOUT) -> ArchiveResult:
def should_fetch_media(link_dir: str, link: Link) -> bool:
if is_static_file(link['url']):
if is_static_file(link.url):
return False
if os.path.exists(os.path.join(link_dir, 'media')):
@ -515,7 +520,7 @@ def fetch_media(link_dir: str, link: Link, timeout: int=MEDIA_TIMEOUT) -> Archiv
'--embed-thumbnail',
'--add-metadata',
*(() if CHECK_SSL_VALIDITY else ('--no-check-certificate',)),
link['url'],
link.url,
]
status = 'succeeded'
timer = TimedProgress(timeout, prefix=' ')
@ -553,7 +558,7 @@ def fetch_media(link_dir: str, link: Link, timeout: int=MEDIA_TIMEOUT) -> Archiv
def should_fetch_archive_dot_org(link_dir: str, link: Link) -> bool:
if is_static_file(link['url']):
if is_static_file(link.url):
return False
if os.path.exists(os.path.join(link_dir, 'archive.org.txt')):
@ -567,7 +572,7 @@ def archive_dot_org(link_dir: str, link: Link, timeout: int=TIMEOUT) -> ArchiveR
output = 'archive.org.txt'
archive_org_url = None
submit_url = 'https://web.archive.org/save/{}'.format(link['url'])
submit_url = 'https://web.archive.org/save/{}'.format(link.url)
cmd = [
CURL_BINARY,
'--location',
@ -586,7 +591,7 @@ def archive_dot_org(link_dir: str, link: Link, timeout: int=TIMEOUT) -> ArchiveR
archive_org_url = 'https://web.archive.org{}'.format(content_location[0])
elif len(errors) == 1 and 'RobotAccessControlException' in errors[0]:
archive_org_url = None
# raise ArchiveError('Archive.org denied by {}/robots.txt'.format(domain(link['url'])))
# raise ArchiveError('Archive.org denied by {}/robots.txt'.format(domain(link.url)))
elif errors:
raise ArchiveError(', '.join(errors))
else:
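With Link frozen, archive_link can no longer assign link['updated'] in place; it rebuilds the instance from _asdict() with the changed field. A sketch of that idiom; dataclasses.replace (shown second) is the stdlib shorthand for the same rebuild, and the field types here are illustrative:

    from dataclasses import dataclass, asdict, replace
    from datetime import datetime
    from typing import Optional

    @dataclass(frozen=True)
    class Link:
        url: str
        timestamp: str
        updated: Optional[datetime] = None

    link = Link(url='https://example.com', timestamp='1553640000.0')

    # the idiom used in the diff: rebuild from the dict form with one field changed
    link = Link(**{**asdict(link), 'updated': datetime.now()})

    # equivalent stdlib shorthand for frozen dataclasses
    link = replace(link, updated=datetime.now())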

View file

@ -1,5 +1,4 @@
import os
import re
import sys
import shutil
@ -77,7 +76,7 @@ if COOKIES_FILE:
COOKIES_FILE = os.path.abspath(COOKIES_FILE)
# ******************************************************************************
# ************************ Environment & Dependencies **************************
# ***************************** Helper Functions *******************************
# ******************************************************************************
def check_version(binary: str) -> str:
@ -95,6 +94,7 @@ def check_version(binary: str) -> str:
print('{red}[X] Unable to find a working version of {cmd}, is it installed and in your $PATH?'.format(cmd=binary, **ANSI))
raise SystemExit(1)
def find_chrome_binary() -> Optional[str]:
"""find any installed chrome binaries in the default locations"""
# Precedence: Chromium, Chrome, Beta, Canary, Unstable, Dev
@ -119,6 +119,7 @@ def find_chrome_binary() -> Optional[str]:
print('{red}[X] Unable to find a working version of Chrome/Chromium, is it installed and in your $PATH?'.format(**ANSI))
raise SystemExit(1)
def find_chrome_data_dir() -> Optional[str]:
"""find any installed chrome user data directories in the default locations"""
# Precedence: Chromium, Chrome, Beta, Canary, Unstable, Dev
@ -142,6 +143,7 @@ def find_chrome_data_dir() -> Optional[str]:
return full_path
return None
def get_git_version() -> str:
"""get the git commit hash of the python code folder (aka code version)"""
try:
@ -151,6 +153,10 @@ def get_git_version() -> str:
return 'unknown'
# ******************************************************************************
# ************************ Environment & Dependencies **************************
# ******************************************************************************
try:
GIT_SHA = get_git_version()
@ -188,19 +194,33 @@ try:
print(' Alternatively, run this script with:')
print(' env PYTHONIOENCODING=UTF-8 ./archive.py export.html')
### Make sure curl is installed
USE_CURL = FETCH_FAVICON or SUBMIT_ARCHIVE_DOT_ORG
CURL_VERSION = USE_CURL and check_version(CURL_BINARY)
CURL_VERSION = None
if USE_CURL:
CURL_VERSION = check_version(CURL_BINARY)
### Make sure wget is installed and calculate version
USE_WGET = FETCH_WGET or FETCH_WARC
WGET_VERSION = USE_WGET and check_version(WGET_BINARY)
WGET_VERSION = None
if USE_WGET:
WGET_VERSION = check_version(WGET_BINARY)
WGET_USER_AGENT = WGET_USER_AGENT.format(
GIT_SHA=GIT_SHA[:9],
WGET_VERSION=WGET_VERSION or '',
)
### Make sure git is installed
GIT_VERSION = None
if FETCH_GIT:
GIT_VERSION = check_version(GIT_BINARY)
### Make sure youtube-dl is installed
YOUTUBEDL_VERSION = None
if FETCH_MEDIA:
check_version(YOUTUBEDL_BINARY)
### Make sure chrome is installed and calculate version
USE_CHROME = FETCH_PDF or FETCH_SCREENSHOT or FETCH_DOM
CHROME_VERSION = None
@ -214,13 +234,6 @@ try:
CHROME_USER_DATA_DIR = find_chrome_data_dir()
# print('[i] Using Chrome data dir: {}'.format(os.path.abspath(CHROME_USER_DATA_DIR)))
### Make sure git is installed
GIT_VERSION = FETCH_GIT and check_version(GIT_BINARY)
### Make sure youtube-dl is installed
YOUTUBEDL_VERSION = FETCH_MEDIA and check_version(YOUTUBEDL_BINARY)
### Chrome housekeeping options
CHROME_OPTIONS = {
'TIMEOUT': TIMEOUT,
'RESOLUTION': RESOLUTION,
@ -236,7 +249,6 @@ try:
# 'ignoreHTTPSErrors': not CHECK_SSL_VALIDITY,
# # 'executablePath': CHROME_BINARY,
# }
except KeyboardInterrupt:
raise SystemExit(1)
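The old X = FLAG and check_version(...) idiom could leave False in a variable annotated Optional[str]; the diff switches to an explicit None default plus a guarded assignment. A reduced sketch with a fake check_version standing in for the real shell call:

    from typing import Optional

    def check_version(binary: str) -> str:
        # stand-in: the real helper shells out to `<binary> --version`
        return '1.0.0'

    FETCH_GIT = False

    # Before: GIT_VERSION could be False, a str, or raise depending on the flag
    # GIT_VERSION = FETCH_GIT and check_version('git')

    # After: always Optional[str], which type checkers can reason about
    GIT_VERSION: Optional[str] = None
    if FETCH_GIT:
        GIT_VERSION = check_version('git')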

View file

@ -1,9 +1,10 @@
import os
import json
from itertools import chain
from datetime import datetime
from string import Template
from typing import List, Tuple
from typing import List, Tuple, Iterator, Optional
try:
from distutils.dir_util import copy_tree
@ -11,7 +12,7 @@ except ImportError:
print('[X] Missing "distutils" python package. To install it, run:')
print(' pip install distutils')
from schema import Link, ArchiveIndex
from schema import Link, ArchiveIndex, ArchiveResult
from config import (
OUTPUT_DIR,
TEMPLATES_DIR,
@ -22,11 +23,10 @@ from util import (
chmod_file,
urlencode,
derived_link_info,
wget_output_path,
ExtendedEncoder,
check_link_structure,
check_links_structure,
wget_output_path,
latest_output,
ExtendedEncoder,
)
from parse import parse_links
from links import validate_links
@ -47,7 +47,6 @@ def write_links_index(out_dir: str, links: List[Link], finished: bool=False) ->
"""create index.html file for a given list of links"""
log_indexing_process_started()
check_links_structure(links)
log_indexing_started(out_dir, 'index.json')
write_json_links_index(out_dir, links)
@ -63,20 +62,17 @@ def load_links_index(out_dir: str=OUTPUT_DIR, import_path: str=None) -> Tuple[Li
existing_links: List[Link] = []
if out_dir:
existing_links = parse_json_links_index(out_dir)
check_links_structure(existing_links)
existing_links = list(parse_json_links_index(out_dir))
new_links: List[Link] = []
if import_path:
# parse and validate the import file
log_parsing_started(import_path)
raw_links, parser_name = parse_links(import_path)
new_links = validate_links(raw_links)
check_links_structure(new_links)
new_links = list(validate_links(raw_links))
# merge existing links in out_dir and new links
all_links = validate_links(existing_links + new_links)
check_links_structure(all_links)
all_links = list(validate_links(existing_links + new_links))
num_new_links = len(all_links) - len(existing_links)
if import_path and parser_name:
@ -88,7 +84,15 @@ def load_links_index(out_dir: str=OUTPUT_DIR, import_path: str=None) -> Tuple[Li
def write_json_links_index(out_dir: str, links: List[Link]) -> None:
"""write the json link index to a given path"""
check_links_structure(links)
assert isinstance(links, List), 'Links must be a list, not a generator.'
assert isinstance(links[0].history, dict)
assert isinstance(links[0].sources, list)
if links[0].history.get('title'):
assert isinstance(links[0].history['title'][0], ArchiveResult)
if links[0].sources:
assert isinstance(links[0].sources[0], str)
path = os.path.join(out_dir, 'index.json')
@ -98,7 +102,7 @@ def write_json_links_index(out_dir: str, links: List[Link]) -> None:
docs='https://github.com/pirate/ArchiveBox/wiki',
version=GIT_SHA,
num_links=len(links),
updated=str(datetime.now().timestamp()),
updated=datetime.now(),
links=links,
)
@ -110,23 +114,23 @@ def write_json_links_index(out_dir: str, links: List[Link]) -> None:
chmod_file(path)
def parse_json_links_index(out_dir: str=OUTPUT_DIR) -> List[Link]:
def parse_json_links_index(out_dir: str=OUTPUT_DIR) -> Iterator[Link]:
"""parse a archive index json file and return the list of links"""
index_path = os.path.join(out_dir, 'index.json')
if os.path.exists(index_path):
with open(index_path, 'r', encoding='utf-8') as f:
links = json.load(f)['links']
check_links_structure(links)
return links
for link in links:
yield Link(**link)
return []
return ()
def write_html_links_index(out_dir: str, links: List[Link], finished: bool=False) -> None:
"""write the html link index to a given path"""
check_links_structure(links)
path = os.path.join(out_dir, 'index.html')
copy_tree(os.path.join(TEMPLATES_DIR, 'static'), os.path.join(out_dir, 'static'))
@ -140,24 +144,22 @@ def write_html_links_index(out_dir: str, links: List[Link], finished: bool=False
with open(os.path.join(TEMPLATES_DIR, 'index_row.html'), 'r', encoding='utf-8') as f:
link_row_html = f.read()
full_links_info = (derived_link_info(link) for link in links)
link_rows = '\n'.join(
Template(link_row_html).substitute(**{
**link,
**derived_link_info(link),
'title': (
link['title']
or (link['base_url'] if link['is_archived'] else TITLE_LOADING_MSG)
link.title
or (link.base_url if link.is_archived else TITLE_LOADING_MSG)
),
'favicon_url': (
os.path.join('archive', link['timestamp'], 'favicon.ico')
os.path.join('archive', link.timestamp, 'favicon.ico')
# if link['is_archived'] else 'data:image/gif;base64,R0lGODlhAQABAAD/ACwAAAAAAQABAAACADs='
),
'archive_url': urlencode(
wget_output_path(link) or 'index.html'
),
})
for link in full_links_info
for link in links
)
template_vars = {
@ -180,28 +182,33 @@ def write_html_links_index(out_dir: str, links: List[Link], finished: bool=False
def patch_links_index(link: Link, out_dir: str=OUTPUT_DIR) -> None:
"""hack to in-place update one row's info in the generated index html"""
title = link['title'] or latest_output(link)['title']
successful = len(tuple(filter(None, latest_output(link).values())))
title = link.title or link.latest_outputs()['title']
successful = link.num_outputs
# Patch JSON index
changed = False
json_file_links = parse_json_links_index(out_dir)
patched_links = []
for saved_link in json_file_links:
if saved_link['url'] == link['url']:
saved_link['title'] = title
saved_link['history'] = link['history']
changed = True
break
if changed:
write_json_links_index(out_dir, json_file_links)
if saved_link.url == link.url:
patched_links.append(Link(**{
**saved_link._asdict(),
'title': title,
'history': link.history,
'updated': link.updated,
}))
else:
patched_links.append(saved_link)
write_json_links_index(out_dir, patched_links)
# Patch HTML index
html_path = os.path.join(out_dir, 'index.html')
html = open(html_path, 'r').read().split('\n')
for idx, line in enumerate(html):
if title and ('<span data-title-for="{}"'.format(link['url']) in line):
if title and ('<span data-title-for="{}"'.format(link.url) in line):
html[idx] = '<span>{}</span>'.format(title)
elif successful and ('<span data-number-for="{}"'.format(link['url']) in line):
elif successful and ('<span data-number-for="{}"'.format(link.url) in line):
html[idx] = '<span>{}</span>'.format(successful)
break
@ -212,7 +219,6 @@ def patch_links_index(link: Link, out_dir: str=OUTPUT_DIR) -> None:
### Individual link index
def write_link_index(out_dir: str, link: Link) -> None:
link['updated'] = str(datetime.now().timestamp())
write_json_link_index(out_dir, link)
write_html_link_index(out_dir, link)
@ -220,66 +226,58 @@ def write_link_index(out_dir: str, link: Link) -> None:
def write_json_link_index(out_dir: str, link: Link) -> None:
"""write a json file with some info about the link"""
check_link_structure(link)
path = os.path.join(out_dir, 'index.json')
with open(path, 'w', encoding='utf-8') as f:
json.dump(link, f, indent=4, cls=ExtendedEncoder)
json.dump(link._asdict(), f, indent=4, cls=ExtendedEncoder)
chmod_file(path)
def parse_json_link_index(out_dir: str) -> dict:
def parse_json_link_index(out_dir: str) -> Optional[Link]:
"""load the json link index from a given directory"""
existing_index = os.path.join(out_dir, 'index.json')
if os.path.exists(existing_index):
with open(existing_index, 'r', encoding='utf-8') as f:
link_json = json.load(f)
check_link_structure(link_json)
return link_json
return {}
return Link(**link_json)
return None
def load_json_link_index(out_dir: str, link: Link) -> Link:
"""check for an existing link archive in the given directory,
and load+merge it into the given link dict
"""
link = {
**parse_json_link_index(out_dir),
**link,
}
link.update({
'history': link.get('history') or {},
})
check_link_structure(link)
return link
existing_link = parse_json_link_index(out_dir)
existing_link = existing_link._asdict() if existing_link else {}
new_link = link._asdict()
return Link(**{**existing_link, **new_link})
def write_html_link_index(out_dir: str, link: Link) -> None:
check_link_structure(link)
with open(os.path.join(TEMPLATES_DIR, 'link_index.html'), 'r', encoding='utf-8') as f:
link_html = f.read()
path = os.path.join(out_dir, 'index.html')
link = derived_link_info(link)
with open(path, 'w', encoding='utf-8') as f:
f.write(Template(link_html).substitute({
**link,
**derived_link_info(link),
'title': (
link['title']
or (link['base_url'] if link['is_archived'] else TITLE_LOADING_MSG)
link.title
or (link.base_url if link.is_archived else TITLE_LOADING_MSG)
),
'archive_url': urlencode(
wget_output_path(link)
or (link['domain'] if link['is_archived'] else 'about:blank')
or (link.domain if link.is_archived else 'about:blank')
),
'extension': link['extension'] or 'html',
'tags': link['tags'].strip() or 'untagged',
'status': 'Archived' if link['is_archived'] else 'Not yet archived',
'status_color': 'success' if link['is_archived'] else 'danger',
'extension': link.extension or 'html',
'tags': link.tags or 'untagged',
'status': 'Archived' if link.is_archived else 'Not yet archived',
'status_color': 'success' if link.is_archived else 'danger',
}))
chmod_file(path)
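Index rows are now written from link._asdict() and read back by unpacking each JSON row into Link(**row), with the parser turned into a generator so callers opt into list(...). A self-contained sketch of that round trip (field names mirror the schema; default=str stands in for the project's ExtendedEncoder):

    import json
    from dataclasses import dataclass, asdict, field
    from typing import Dict, Iterator, List, Optional

    @dataclass(frozen=True)
    class Link:
        timestamp: str
        url: str
        title: Optional[str] = None
        tags: Optional[str] = None
        sources: List[str] = field(default_factory=list)
        history: Dict[str, list] = field(default_factory=dict)

    def write_json_links_index(path: str, links: List[Link]) -> None:
        assert isinstance(links, list), 'Links must be a list, not a generator.'
        with open(path, 'w', encoding='utf-8') as f:
            json.dump({'links': [asdict(link) for link in links]}, f, indent=4, default=str)

    def parse_json_links_index(path: str) -> Iterator[Link]:
        with open(path, 'r', encoding='utf-8') as f:
            for row in json.load(f)['links']:
                yield Link(**row)

    write_json_links_index('index.json', [Link('1553640000.0', 'https://example.com')])
    links = list(parse_json_links_index('index.json'))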

View file

@ -11,7 +11,7 @@ Link {
sources: [str],
history: {
pdf: [
{start_ts, end_ts, duration, cmd, pwd, status, output},
{start_ts, end_ts, cmd, pwd, cmd_version, status, output},
...
],
...
@ -19,41 +19,36 @@ Link {
}
"""
from typing import List, Iterable
from typing import Iterable
from collections import OrderedDict
from schema import Link
from util import (
scheme,
fuzzy_url,
merge_links,
check_link_structure,
check_links_structure,
htmldecode,
hashurl,
)
def validate_links(links: Iterable[Link]) -> List[Link]:
check_links_structure(links)
def validate_links(links: Iterable[Link]) -> Iterable[Link]:
links = archivable_links(links) # remove chrome://, about:, mailto: etc.
links = uniquefied_links(links) # merge/dedupe duplicate timestamps & urls
links = sorted_links(links) # deterministically sort the links based on timstamp, url
links = uniquefied_links(links) # merge/dedupe duplicate timestamps & urls
if not links:
print('[X] No links found :(')
raise SystemExit(1)
for link in links:
link['title'] = htmldecode(link['title'].strip()) if link['title'] else None
check_link_structure(link)
return list(links)
return links
def archivable_links(links: Iterable[Link]) -> Iterable[Link]:
"""remove chrome://, about:// or other schemed links that cant be archived"""
return (
link
for link in links
if any(link['url'].lower().startswith(s) for s in ('http://', 'https://', 'ftp://'))
if scheme(link.url) in ('http', 'https', 'ftp')
)
@ -64,38 +59,37 @@ def uniquefied_links(sorted_links: Iterable[Link]) -> Iterable[Link]:
unique_urls: OrderedDict[str, Link] = OrderedDict()
lower = lambda url: url.lower().strip()
without_www = lambda url: url.replace('://www.', '://', 1)
without_trailing_slash = lambda url: url[:-1] if url[-1] == '/' else url.replace('/?', '?')
for link in sorted_links:
fuzzy_url = without_www(without_trailing_slash(lower(link['url'])))
if fuzzy_url in unique_urls:
fuzzy = fuzzy_url(link.url)
if fuzzy in unique_urls:
# merge with any other links that share the same url
link = merge_links(unique_urls[fuzzy_url], link)
unique_urls[fuzzy_url] = link
link = merge_links(unique_urls[fuzzy], link)
unique_urls[fuzzy] = link
unique_timestamps: OrderedDict[str, Link] = OrderedDict()
for link in unique_urls.values():
link['timestamp'] = lowest_uniq_timestamp(unique_timestamps, link['timestamp'])
unique_timestamps[link['timestamp']] = link
new_link = Link(**{
**link._asdict(),
'timestamp': lowest_uniq_timestamp(unique_timestamps, link.timestamp),
})
unique_timestamps[new_link.timestamp] = new_link
return unique_timestamps.values()
def sorted_links(links: Iterable[Link]) -> Iterable[Link]:
sort_func = lambda link: (link['timestamp'].split('.', 1)[0], link['url'])
sort_func = lambda link: (link.timestamp.split('.', 1)[0], link.url)
return sorted(links, key=sort_func, reverse=True)
def links_after_timestamp(links: Iterable[Link], timestamp: str=None) -> Iterable[Link]:
if not timestamp:
def links_after_timestamp(links: Iterable[Link], resume: float=None) -> Iterable[Link]:
if not resume:
yield from links
return
for link in links:
try:
if float(link['timestamp']) <= float(timestamp):
if float(link.timestamp) <= resume:
yield link
except (ValueError, TypeError):
print('Resume value and all timestamp values must be valid numbers.')
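validate_links now returns a lazy chain of generators and dedupes on a normalized fuzzy URL instead of mutating dicts in place. A compact sketch of the same generator-chaining style over plain URL strings (the normalization is simplified; the real fuzzy_url also strips the scheme, and the real dedupe merges whole Link objects):

    from collections import OrderedDict
    from typing import Iterable

    without_www = lambda url: url.replace('://www.', '://', 1)
    without_trailing_slash = lambda url: url[:-1] if url.endswith('/') else url

    def archivable(urls: Iterable[str]) -> Iterable[str]:
        # drop chrome://, about:, mailto: and other schemes that can't be archived
        return (u for u in urls if u.split('://', 1)[0].lower() in ('http', 'https', 'ftp'))

    def uniquefied(urls: Iterable[str]) -> Iterable[str]:
        seen: 'OrderedDict[str, str]' = OrderedDict()
        for url in urls:
            seen.setdefault(without_www(without_trailing_slash(url.lower())), url)
        return seen.values()

    urls = ['https://www.Example.com/', 'https://example.com', 'chrome://settings']
    print(list(uniquefied(archivable(urls))))   # ['https://www.Example.com/']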

View file

@ -1,6 +1,7 @@
import sys
from datetime import datetime
from typing import Optional
from schema import Link, ArchiveResult, RuntimeStats
from config import ANSI, REPO_DIR, OUTPUT_DIR
@ -66,7 +67,7 @@ def log_indexing_finished(out_dir: str, out_file: str):
### Archiving Stage
def log_archiving_started(num_links: int, resume: float):
def log_archiving_started(num_links: int, resume: Optional[float]):
start_ts = datetime.now()
_LAST_RUN_STATS.archiving_start_ts = start_ts
if resume:
@ -132,10 +133,10 @@ def log_link_archiving_started(link_dir: str, link: Link, is_new: bool):
symbol_color=ANSI['green' if is_new else 'black'],
symbol='+' if is_new else '*',
now=datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
title=link['title'] or link['url'],
title=link.title or link.base_url,
**ANSI,
))
print(' {blue}{url}{reset}'.format(url=link['url'], **ANSI))
print(' {blue}{url}{reset}'.format(url=link.url, **ANSI))
print(' {} {}'.format(
'>' if is_new else '',
pretty_path(link_dir),

View file

@ -26,6 +26,7 @@ import xml.etree.ElementTree as etree
from config import TIMEOUT
from util import (
htmldecode,
str_between,
URL_REGEX,
check_url_parsing_invariants,
@ -91,13 +92,13 @@ def parse_pocket_html_export(html_file: IO[str]) -> Iterable[Link]:
tags = match.group(3)
title = match.group(4).replace(' — Readability', '').replace('http://www.readability.com/read?url=', '')
yield {
'url': url,
'timestamp': str(time.timestamp()),
'title': title or None,
'tags': tags or '',
'sources': [html_file.name],
}
yield Link(
url=url,
timestamp=str(time.timestamp()),
title=title or None,
tags=tags or '',
sources=[html_file.name],
)
def parse_json_export(json_file: IO[str]) -> Iterable[Link]:
@ -137,19 +138,19 @@ def parse_json_export(json_file: IO[str]) -> Iterable[Link]:
# Parse the title
title = None
if link.get('title'):
title = link['title'].strip() or None
title = link['title'].strip()
elif link.get('description'):
title = link['description'].replace(' — Readability', '').strip() or None
title = link['description'].replace(' — Readability', '').strip()
elif link.get('name'):
title = link['name'].strip() or None
title = link['name'].strip()
yield {
'url': url,
'timestamp': ts_str,
'title': title,
'tags': link.get('tags') or '',
'sources': [json_file.name],
}
yield Link(
url=url,
timestamp=ts_str,
title=htmldecode(title) or None,
tags=link.get('tags') or '',
sources=[json_file.name],
)
def parse_rss_export(rss_file: IO[str]) -> Iterable[Link]:
@ -178,15 +179,15 @@ def parse_rss_export(rss_file: IO[str]) -> Iterable[Link]:
url = str_between(get_row('link'), '<link>', '</link>')
ts_str = str_between(get_row('pubDate'), '<pubDate>', '</pubDate>')
time = datetime.strptime(ts_str, "%a, %d %b %Y %H:%M:%S %z")
title = str_between(get_row('title'), '<![CDATA[', ']]').strip() or None
title = str_between(get_row('title'), '<![CDATA[', ']]').strip()
yield {
'url': url,
'timestamp': str(time.timestamp()),
'title': title,
'tags': '',
'sources': [rss_file.name],
}
yield Link(
url=url,
timestamp=str(time.timestamp()),
title=htmldecode(title) or None,
tags='',
sources=[rss_file.name],
)
def parse_shaarli_rss_export(rss_file: IO[str]) -> Iterable[Link]:
@ -217,13 +218,13 @@ def parse_shaarli_rss_export(rss_file: IO[str]) -> Iterable[Link]:
ts_str = str_between(get_row('published'), '<published>', '</published>')
time = datetime.strptime(ts_str, "%Y-%m-%dT%H:%M:%S%z")
yield {
'url': url,
'timestamp': str(time.timestamp()),
'title': title or None,
'tags': '',
'sources': [rss_file.name],
}
yield Link(
url=url,
timestamp=str(time.timestamp()),
title=htmldecode(title) or None,
tags='',
sources=[rss_file.name],
)
def parse_netscape_html_export(html_file: IO[str]) -> Iterable[Link]:
@ -239,14 +240,15 @@ def parse_netscape_html_export(html_file: IO[str]) -> Iterable[Link]:
if match:
url = match.group(1)
time = datetime.fromtimestamp(float(match.group(2)))
title = match.group(3).strip()
yield {
'url': url,
'timestamp': str(time.timestamp()),
'title': match.group(3).strip() or None,
'tags': '',
'sources': [html_file.name],
}
yield Link(
url=url,
timestamp=str(time.timestamp()),
title=htmldecode(title) or None,
tags='',
sources=[html_file.name],
)
def parse_pinboard_rss_export(rss_file: IO[str]) -> Iterable[Link]:
@ -271,13 +273,13 @@ def parse_pinboard_rss_export(rss_file: IO[str]) -> Iterable[Link]:
else:
time = datetime.now()
yield {
'url': url,
'timestamp': str(time.timestamp()),
'title': title or None,
'tags': tags or '',
'sources': [rss_file.name],
}
yield Link(
url=url,
timestamp=str(time.timestamp()),
title=htmldecode(title) or None,
tags=tags or '',
sources=[rss_file.name],
)
def parse_medium_rss_export(rss_file: IO[str]) -> Iterable[Link]:
@ -292,13 +294,13 @@ def parse_medium_rss_export(rss_file: IO[str]) -> Iterable[Link]:
ts_str = item.find("pubDate").text
time = datetime.strptime(ts_str, "%a, %d %b %Y %H:%M:%S %Z")
yield {
'url': url,
'timestamp': str(time.timestamp()),
'title': title or None,
'tags': '',
'sources': [rss_file.name],
}
yield Link(
url=url,
timestamp=str(time.timestamp()),
title=htmldecode(title) or None,
tags='',
sources=[rss_file.name],
)
def parse_plain_text_export(text_file: IO[str]) -> Iterable[Link]:
@ -308,10 +310,10 @@ def parse_plain_text_export(text_file: IO[str]) -> Iterable[Link]:
for line in text_file.readlines():
urls = re.findall(URL_REGEX, line) if line.strip() else ()
for url in urls:
yield {
'url': url,
'timestamp': str(datetime.now().timestamp()),
'title': None,
'tags': '',
'sources': [text_file.name],
}
yield Link(
url=url,
timestamp=str(datetime.now().timestamp()),
title=None,
tags='',
sources=[text_file.name],
)
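Every parser now yields Link objects directly and pushes titles through htmldecode so HTML entities don't leak into the index. A runnable sketch of the plain-text parser's shape, using a reduced Link stub and a simplified URL regex:

    import io
    import re
    from dataclasses import dataclass, field
    from datetime import datetime
    from typing import IO, Iterable, List, Optional

    URL_REGEX = re.compile(r'https?://[^\s<>"\']+')   # simplified vs. the real URL_REGEX

    @dataclass(frozen=True)
    class Link:                                        # reduced stub of schema.Link
        url: str
        timestamp: str
        title: Optional[str]
        tags: str
        sources: List[str] = field(default_factory=list)

    def parse_plain_text_export(text_file: IO[str]) -> Iterable[Link]:
        """yield one Link per URL found in a plain-text bookmark dump"""
        for line in text_file:
            for url in URL_REGEX.findall(line):
                yield Link(
                    url=url,
                    timestamp=str(datetime.now().timestamp()),
                    title=None,   # HTML-based parsers run titles through html.unescape first
                    tags='',
                    sources=[getattr(text_file, 'name', 'stdin')],
                )

    print(list(parse_plain_text_export(io.StringIO('see https://example.com today'))))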

View file

@ -1,11 +1,223 @@
import os
from datetime import datetime
from typing import List, Dict, Any, Optional, Union, NamedTuple
from recordclass import RecordClass
from typing import List, Dict, Any, Optional, Union
Link = Dict[str, Any]
from dataclasses import dataclass, asdict, field
class ArchiveIndex(NamedTuple):
class ArchiveError(Exception):
def __init__(self, message, hints=None):
super().__init__(message)
self.hints = hints
LinkDict = Dict[str, Any]
@dataclass(frozen=True)
class ArchiveResult:
cmd: List[str]
pwd: Optional[str]
cmd_version: Optional[str]
output: Union[str, Exception, None]
status: str
start_ts: datetime
end_ts: datetime
def _asdict(self):
return asdict(self)
@property
def duration(self) -> int:
return (self.end_ts - self.start_ts).seconds
@dataclass(frozen=True)
class Link:
timestamp: str
url: str
title: Optional[str]
tags: Optional[str]
sources: List[str]
history: Dict[str, List[ArchiveResult]] = field(default_factory=lambda: {})
updated: Optional[str] = None
def __hash__(self):
return self.urlhash
def __eq__(self, other):
if not isinstance(other, Link):
return NotImplemented
return self.urlhash == other.urlhash
def __gt__(self, other):
if not isinstance(other, Link):
return NotImplemented
if not self.timestamp or not other.timestamp:
return
return float(self.timestamp) > float(other.timestamp)
def _asdict(self, extended=False):
info = {
'url': self.url,
'title': self.title or None,
'timestamp': self.timestamp,
'updated': self.updated or None,
'tags': self.tags or None,
'sources': self.sources or [],
'history': self.history or {},
}
if extended:
info.update({
'link_dir': self.link_dir,
'archive_path': self.archive_path,
'bookmarked_date': self.bookmarked_date,
'updated_date': self.updated_date,
'domain': self.domain,
'path': self.path,
'basename': self.basename,
'extension': self.extension,
'base_url': self.base_url,
'is_static': self.is_static,
'is_archived': self.is_archived,
'num_outputs': self.num_outputs,
})
return info
@property
def link_dir(self) -> str:
from config import ARCHIVE_DIR
return os.path.join(ARCHIVE_DIR, self.timestamp)
@property
def archive_path(self) -> str:
from config import ARCHIVE_DIR_NAME
return '{}/{}'.format(ARCHIVE_DIR_NAME, self.timestamp)
### URL Helpers
@property
def urlhash(self):
from util import hashurl
return hashurl(self.url)
@property
def extension(self) -> str:
from util import extension
return extension(self.url)
@property
def domain(self) -> str:
from util import domain
return domain(self.url)
@property
def path(self) -> str:
from util import path
return path(self.url)
@property
def basename(self) -> str:
from util import basename
return basename(self.url)
@property
def base_url(self) -> str:
from util import base_url
return base_url(self.url)
### Pretty Printing Helpers
@property
def bookmarked_date(self) -> Optional[str]:
from util import ts_to_date
return ts_to_date(self.timestamp) if self.timestamp else None
@property
def updated_date(self) -> Optional[str]:
from util import ts_to_date
return ts_to_date(self.updated) if self.updated else None
### Archive Status Helpers
@property
def num_outputs(self) -> int:
return len(tuple(filter(None, self.latest_outputs().values())))
@property
def is_static(self) -> bool:
from util import is_static_file
return is_static_file(self.url)
@property
def is_archived(self) -> bool:
from config import ARCHIVE_DIR
from util import domain
return os.path.exists(os.path.join(
ARCHIVE_DIR,
self.timestamp,
domain(self.url),
))
def latest_outputs(self, status: str=None) -> Dict[str, Optional[str]]:
"""get the latest output that each archive method produced for link"""
latest = {
'title': None,
'favicon': None,
'wget': None,
'warc': None,
'pdf': None,
'screenshot': None,
'dom': None,
'git': None,
'media': None,
'archive_org': None,
}
for archive_method in latest.keys():
# get most recent succesful result in history for each archive method
history = self.history.get(archive_method) or []
history = filter(lambda result: result.output, reversed(history))
if status is not None:
history = filter(lambda result: result.status == status, history)
history = list(history)
if history:
latest[archive_method] = history[0].output
return latest
def canonical_outputs(self) -> Dict[str, Optional[str]]:
from util import wget_output_path
canonical = {
'index_url': 'index.html',
'favicon_url': 'favicon.ico',
'google_favicon_url': 'https://www.google.com/s2/favicons?domain={}'.format(self.domain),
'archive_url': wget_output_path(self),
'warc_url': 'warc',
'pdf_url': 'output.pdf',
'screenshot_url': 'screenshot.png',
'dom_url': 'output.html',
'archive_org_url': 'https://web.archive.org/web/{}'.format(self.base_url),
'git_url': 'git',
'media_url': 'media',
}
if self.is_static:
# static binary files like PDF and images are handled slightly differently.
# they're just downloaded once and aren't archived separately multiple times,
# so the wget, screenshot, & pdf urls should all point to the same file
static_url = wget_output_path(self)
canonical.update({
'title': self.basename,
'archive_url': static_url,
'pdf_url': static_url,
'screenshot_url': static_url,
'dom_url': static_url,
})
return canonical
@dataclass(frozen=True)
class ArchiveIndex:
info: str
version: str
source: str
@ -14,33 +226,11 @@ class ArchiveIndex(NamedTuple):
updated: str
links: List[Link]
class ArchiveResult(NamedTuple):
cmd: List[str]
pwd: Optional[str]
cmd_version: Optional[str]
output: Union[str, Exception, None]
status: str
start_ts: datetime
end_ts: datetime
duration: int
def _asdict(self):
return asdict(self)
class ArchiveError(Exception):
def __init__(self, message, hints=None):
super().__init__(message)
self.hints = hints
class LinkDict(NamedTuple):
timestamp: str
url: str
title: Optional[str]
tags: str
sources: List[str]
history: Dict[str, ArchiveResult]
class RuntimeStats(RecordClass):
@dataclass
class RuntimeStats:
skipped: int
succeeded: int
failed: int
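Defining __hash__/__eq__ in terms of a URL hash makes two Link rows for the same URL compare equal even when their timestamps or titles differ, which is what the dedupe and set/dict logic relies on. A reduced sketch of that behavior (hashing the raw URL here instead of the project's base32-encoded sha256 hashurl):

    from dataclasses import dataclass

    @dataclass(frozen=True)
    class Link:
        timestamp: str
        url: str

        @property
        def urlhash(self) -> int:
            return hash(self.url)        # real code: base32(sha256(base_url(url)))

        def __hash__(self):
            return self.urlhash

        def __eq__(self, other):
            if not isinstance(other, Link):
                return NotImplemented
            return self.urlhash == other.urlhash

    a = Link('1553640000.0', 'https://example.com')
    b = Link('1553640001.0', 'https://example.com')
    assert a == b and len({a, b}) == 1   # same URL -> same identity, despite timestamps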

View file

@ -1,14 +1,14 @@
<tr>
<td title="$timestamp">$bookmarked_date</td>
<td style="text-align:left">
<a href="$link_dir/$index_url"><img src="$favicon_url" class="link-favicon" decoding="async"></a>
<a href="$link_dir/$archive_url" title="$title">
<a href="$archive_path/$index_url"><img src="$favicon_url" class="link-favicon" decoding="async"></a>
<a href="$archive_path/$archive_url" title="$title">
<span data-title-for="$url" data-archived="$is_archived">$title</span>
<small>$tags</small>
</a>
</td>
<td>
<a href="$link_dir/$index_url">📄
<a href="$archive_path/$index_url">📄
<span data-number-for="$url" title="Fetching any missing files...">$num_outputs <img src="static/spinner.gif" class="files-spinner" decoding="async"/></span>
</a>
</td>
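The row template is rendered with string.Template, so $archive_path, $index_url, and friends are plain placeholders filled from derived_link_info() / canonical_outputs(). A tiny sketch of how one row gets substituted, with made-up stand-in values:

    from string import Template

    row_html = '<a href="$archive_path/$index_url" title="$title">$title <small>$tags</small></a>'

    print(Template(row_html).substitute(
        archive_path='archive/1553640000.0',   # Link.archive_path: '<ARCHIVE_DIR_NAME>/<timestamp>'
        index_url='index.html',
        title='Example Domain',
        tags='untagged',
    ))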

View file

@ -4,9 +4,8 @@ import sys
import time
from json import JSONEncoder
from typing import List, Dict, Optional, Iterable
from typing import List, Optional, Iterable
from hashlib import sha256
from urllib.request import Request, urlopen
from urllib.parse import urlparse, quote, unquote
from html import escape, unescape
@ -21,17 +20,17 @@ from subprocess import (
CalledProcessError,
)
from schema import Link
from base32_crockford import encode as base32_encode
from schema import Link, LinkDict, ArchiveResult
from config import (
ANSI,
TERM_WIDTH,
SOURCES_DIR,
ARCHIVE_DIR,
OUTPUT_PERMISSIONS,
TIMEOUT,
SHOW_PROGRESS,
FETCH_TITLE,
ARCHIVE_DIR_NAME,
CHECK_SSL_VALIDITY,
WGET_USER_AGENT,
CHROME_OPTIONS,
@ -43,7 +42,7 @@ from logs import pretty_path
# All of these are (str) -> str
# shortcuts to: https://docs.python.org/3/library/urllib.parse.html#url-parsing
scheme = lambda url: urlparse(url).scheme
scheme = lambda url: urlparse(url).scheme.lower()
without_scheme = lambda url: urlparse(url)._replace(scheme='').geturl().strip('//')
without_query = lambda url: urlparse(url)._replace(query='').geturl().strip('//')
without_fragment = lambda url: urlparse(url)._replace(fragment='').geturl().strip('//')
@ -56,11 +55,33 @@ fragment = lambda url: urlparse(url).fragment
extension = lambda url: basename(url).rsplit('.', 1)[-1].lower() if '.' in basename(url) else ''
base_url = lambda url: without_scheme(url) # uniq base url used to dedupe links
short_ts = lambda ts: ts.split('.')[0]
urlencode = lambda s: quote(s, encoding='utf-8', errors='replace')
urldecode = lambda s: unquote(s)
htmlencode = lambda s: escape(s, quote=True)
htmldecode = lambda s: unescape(s)
without_www = lambda url: url.replace('://www.', '://', 1)
without_trailing_slash = lambda url: url[:-1] if url[-1] == '/' else url.replace('/?', '?')
fuzzy_url = lambda url: without_trailing_slash(without_www(without_scheme(url.lower())))
short_ts = lambda ts: (
str(ts.timestamp()).split('.')[0]
if isinstance(ts, datetime) else
str(ts).split('.')[0]
)
ts_to_date = lambda ts: (
ts.strftime('%Y-%m-%d %H:%M')
if isinstance(ts, datetime) else
datetime.fromtimestamp(float(ts)).strftime('%Y-%m-%d %H:%M')
)
ts_to_iso = lambda ts: (
ts.isoformat()
if isinstance(ts, datetime) else
datetime.fromtimestamp(float(ts)).isoformat()
)
urlencode = lambda s: s and quote(s, encoding='utf-8', errors='replace')
urldecode = lambda s: s and unquote(s)
htmlencode = lambda s: s and escape(s, quote=True)
htmldecode = lambda s: s and unescape(s)
hashurl = lambda url: base32_encode(int(sha256(base_url(url).encode('utf-8')).hexdigest(), 16))[:20]
URL_REGEX = re.compile(
r'http[s]?://' # start matching from allowed schemes
@ -80,7 +101,8 @@ STATICFILE_EXTENSIONS = {
# that can be downloaded as-is, not html pages that need to be rendered
'gif', 'jpeg', 'jpg', 'png', 'tif', 'tiff', 'wbmp', 'ico', 'jng', 'bmp',
'svg', 'svgz', 'webp', 'ps', 'eps', 'ai',
'mp3', 'mp4', 'm4a', 'mpeg', 'mpg', 'mkv', 'mov', 'webm', 'm4v', 'flv', 'wmv', 'avi', 'ogg', 'ts', 'm3u8'
'mp3', 'mp4', 'm4a', 'mpeg', 'mpg', 'mkv', 'mov', 'webm', 'm4v',
'flv', 'wmv', 'avi', 'ogg', 'ts', 'm3u8'
'pdf', 'txt', 'rtf', 'rtfd', 'doc', 'docx', 'ppt', 'pptx', 'xls', 'xlsx',
'atom', 'rss', 'css', 'js', 'json',
'dmg', 'iso', 'img',
@ -100,7 +122,7 @@ STATICFILE_EXTENSIONS = {
### Checks & Tests
def check_link_structure(link: Link) -> None:
def check_link_structure(link: LinkDict) -> None:
"""basic sanity check invariants to make sure the data is valid"""
assert isinstance(link, dict)
assert isinstance(link.get('url'), str)
@ -112,7 +134,7 @@ def check_link_structure(link: Link) -> None:
assert isinstance(key, str)
assert isinstance(val, list), 'history must be a Dict[str, List], got: {}'.format(link['history'])
def check_links_structure(links: Iterable[Link]) -> None:
def check_links_structure(links: Iterable[LinkDict]) -> None:
"""basic sanity check invariants to make sure the data is valid"""
assert isinstance(links, list)
if links:
@ -213,7 +235,7 @@ def fetch_page_title(url: str, timeout: int=10, progress: bool=SHOW_PROGRESS) ->
html = download_url(url, timeout=timeout)
match = re.search(HTML_TITLE_REGEX, html)
return match.group(1).strip() if match else None
return htmldecode(match.group(1).strip()) if match else None
except Exception as err: # noqa
# print('[!] Failed to fetch title because of {}: {}'.format(
# err.__class__.__name__,
@ -228,8 +250,8 @@ def wget_output_path(link: Link) -> Optional[str]:
See docs on wget --adjust-extension (-E)
"""
if is_static_file(link['url']):
return without_scheme(without_fragment(link['url']))
if is_static_file(link.url):
return without_scheme(without_fragment(link.url))
# Wget downloads can save in a number of different ways depending on the url:
# https://example.com
@ -262,11 +284,10 @@ def wget_output_path(link: Link) -> Optional[str]:
# and there's no way to get the computed output path from wget
# in order to avoid having to reverse-engineer how they calculate it,
# we just look in the output folder read the filename wget used from the filesystem
link_dir = os.path.join(ARCHIVE_DIR, link['timestamp'])
full_path = without_fragment(without_query(path(link['url']))).strip('/')
full_path = without_fragment(without_query(path(link.url))).strip('/')
search_dir = os.path.join(
link_dir,
domain(link['url']),
link.link_dir,
domain(link.url),
full_path,
)
@ -278,13 +299,13 @@ def wget_output_path(link: Link) -> Optional[str]:
if re.search(".+\\.[Hh][Tt][Mm][Ll]?$", f, re.I | re.M)
]
if html_files:
path_from_link_dir = search_dir.split(link_dir)[-1].strip('/')
path_from_link_dir = search_dir.split(link.link_dir)[-1].strip('/')
return os.path.join(path_from_link_dir, html_files[0])
# Move up one directory level
search_dir = search_dir.rsplit('/', 1)[0]
if search_dir == link_dir:
if search_dir == link.link_dir:
break
return None
@ -314,19 +335,20 @@ def merge_links(a: Link, b: Link) -> Link:
"""deterministially merge two links, favoring longer field values over shorter,
and "cleaner" values over worse ones.
"""
a, b = a._asdict(), b._asdict()
longer = lambda key: (a[key] if len(a[key]) > len(b[key]) else b[key]) if (a[key] and b[key]) else (a[key] or b[key])
earlier = lambda key: a[key] if a[key] < b[key] else b[key]
url = longer('url')
longest_title = longer('title')
cleanest_title = a['title'] if '://' not in (a['title'] or '') else b['title']
return {
'url': url,
'timestamp': earlier('timestamp'),
'title': longest_title if '://' not in (longest_title or '') else cleanest_title,
'tags': longer('tags'),
'sources': list(set(a.get('sources', []) + b.get('sources', []))),
}
return Link(
url=url,
timestamp=earlier('timestamp'),
title=longest_title if '://' not in (longest_title or '') else cleanest_title,
tags=longer('tags'),
sources=list(set(a.get('sources', []) + b.get('sources', []))),
)
def is_static_file(url: str) -> bool:
"""Certain URLs just point to a single static file, and
@ -339,85 +361,11 @@ def is_static_file(url: str) -> bool:
def derived_link_info(link: Link) -> dict:
"""extend link info with the archive urls and other derived data"""
url = link['url']
info = link._asdict(extended=True)
info.update(link.canonical_outputs())
to_date_str = lambda ts: datetime.fromtimestamp(float(ts)).strftime('%Y-%m-%d %H:%M')
return info
extended_info = {
**link,
'link_dir': '{}/{}'.format(ARCHIVE_DIR_NAME, link['timestamp']),
'bookmarked_date': to_date_str(link['timestamp']),
'updated_date': to_date_str(link['updated']) if 'updated' in link else None,
'domain': domain(url),
'path': path(url),
'basename': basename(url),
'extension': extension(url),
'base_url': base_url(url),
'is_static': is_static_file(url),
'is_archived': os.path.exists(os.path.join(
ARCHIVE_DIR,
link['timestamp'],
domain(url),
)),
'num_outputs': len([entry for entry in latest_output(link).values() if entry]),
}
# Archive Method Output URLs
extended_info.update({
'index_url': 'index.html',
'favicon_url': 'favicon.ico',
'google_favicon_url': 'https://www.google.com/s2/favicons?domain={domain}'.format(**extended_info),
'archive_url': wget_output_path(link),
'warc_url': 'warc',
'pdf_url': 'output.pdf',
'screenshot_url': 'screenshot.png',
'dom_url': 'output.html',
'archive_org_url': 'https://web.archive.org/web/{base_url}'.format(**extended_info),
'git_url': 'git',
'media_url': 'media',
})
# static binary files like PDF and images are handled slightly differently.
# they're just downloaded once and aren't archived separately multiple times,
# so the wget, screenshot, & pdf urls should all point to the same file
if is_static_file(url):
extended_info.update({
'title': basename(url),
'archive_url': base_url(url),
'pdf_url': base_url(url),
'screenshot_url': base_url(url),
'dom_url': base_url(url),
})
return extended_info
def latest_output(link: Link, status: str=None) -> Dict[str, Optional[str]]:
"""get the latest output that each archive method produced for link"""
latest = {
'title': None,
'favicon': None,
'wget': None,
'warc': None,
'pdf': None,
'screenshot': None,
'dom': None,
'git': None,
'media': None,
'archive_org': None,
}
for archive_method in latest.keys():
# get most recent succesful result in history for each archive method
history = link.get('history', {}).get(archive_method) or []
history = filter(lambda result: result['output'], reversed(history))
if status is not None:
history = filter(lambda result: result['status'] == status, history)
history = list(history)
if history:
latest[archive_method] = history[0]['output']
return latest
### Python / System Helpers
@ -466,21 +414,13 @@ class TimedProgress:
self.p = Process(target=progress_bar, args=(seconds, prefix))
self.p.start()
self.stats = {
'start_ts': datetime.now(),
'end_ts': None,
'duration': None,
}
self.stats = {'start_ts': datetime.now(), 'end_ts': None}
def end(self):
"""immediately end progress, clear the progressbar line, and save end_ts"""
end_ts = datetime.now()
self.stats.update({
'end_ts': end_ts,
'duration': (end_ts - self.stats['start_ts']).seconds,
})
self.stats['end_ts'] = end_ts
if SHOW_PROGRESS:
# protect from double termination
#if p is None or not hasattr(p, 'kill'):
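The new URL lambdas centralize normalization: fuzzy_url lowercases, strips the scheme and trailing slashes, and passes through a www. rewrite so near-duplicate bookmarks collapse to one dedupe key, and hashurl then base32-encodes a sha256 of the base URL. A sketch of the normalization chain with a quick usage check (the hashing step is omitted):

    from urllib.parse import urlparse

    without_scheme = lambda url: urlparse(url)._replace(scheme='').geturl().strip('//')
    without_www = lambda url: url.replace('://www.', '://', 1)
    without_trailing_slash = lambda url: url[:-1] if url[-1] == '/' else url.replace('/?', '?')
    fuzzy_url = lambda url: without_trailing_slash(without_www(without_scheme(url.lower())))

    # scheme and case differences collapse to the same key
    assert fuzzy_url('https://Example.com/path/') == fuzzy_url('HTTP://example.com/path') == 'example.com/path'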