mirror of
https://github.com/ArchiveBox/ArchiveBox
synced 2024-11-14 16:37:17 +00:00
121 lines
4.1 KiB
Python
121 lines
4.1 KiB
Python
__package__ = 'archivebox.extractors'
|
|
|
|
|
|
from pathlib import Path
|
|
from typing import Optional, List, Dict, Tuple
|
|
from collections import defaultdict
|
|
|
|
from ..index.schema import Link, ArchiveResult, ArchiveOutput, ArchiveError
|
|
from ..system import run, chmod_file
|
|
from ..util import (
|
|
enforce_types,
|
|
is_static_file,
|
|
dedupe,
|
|
)
|
|
from ..config import (
|
|
TIMEOUT,
|
|
CURL_ARGS,
|
|
CURL_EXTRA_ARGS,
|
|
CHECK_SSL_VALIDITY,
|
|
SAVE_ARCHIVE_DOT_ORG,
|
|
CURL_BINARY,
|
|
CURL_VERSION,
|
|
CURL_USER_AGENT,
|
|
)
|
|
from ..logging_util import TimedProgress
|
|
|
|
def get_output_path():
|
|
return 'archive.org.txt'
|
|
|
|
|
|
@enforce_types
|
|
def should_save_archive_dot_org(link: Link, out_dir: Optional[Path]=None, overwrite: Optional[bool]=False) -> bool:
|
|
if is_static_file(link.url):
|
|
return False
|
|
|
|
out_dir = out_dir or Path(link.link_dir)
|
|
if not overwrite and (out_dir / get_output_path()).exists():
|
|
# if open(path, 'r', encoding='utf-8').read().strip() != 'None':
|
|
return False
|
|
|
|
return SAVE_ARCHIVE_DOT_ORG
|
|
|
|
@enforce_types
|
|
def save_archive_dot_org(link: Link, out_dir: Optional[Path]=None, timeout: int=TIMEOUT) -> ArchiveResult:
|
|
"""submit site to archive.org for archiving via their service, save returned archive url"""
|
|
|
|
out_dir = out_dir or Path(link.link_dir)
|
|
output: ArchiveOutput = get_output_path()
|
|
archive_org_url = None
|
|
submit_url = 'https://web.archive.org/save/{}'.format(link.url)
|
|
# later options take precedence
|
|
options = [
|
|
*CURL_ARGS,
|
|
*CURL_EXTRA_ARGS,
|
|
'--head',
|
|
'--max-time', str(timeout),
|
|
*(['--user-agent', '{}'.format(CURL_USER_AGENT)] if CURL_USER_AGENT else []),
|
|
*([] if CHECK_SSL_VALIDITY else ['--insecure']),
|
|
]
|
|
cmd = [
|
|
CURL_BINARY,
|
|
*dedupe(options),
|
|
submit_url,
|
|
]
|
|
status = 'succeeded'
|
|
timer = TimedProgress(timeout, prefix=' ')
|
|
try:
|
|
result = run(cmd, cwd=str(out_dir), timeout=timeout)
|
|
content_location, errors = parse_archive_dot_org_response(result.stdout)
|
|
if content_location:
|
|
archive_org_url = content_location[0]
|
|
elif len(errors) == 1 and 'RobotAccessControlException' in errors[0]:
|
|
archive_org_url = None
|
|
# raise ArchiveError('Archive.org denied by {}/robots.txt'.format(domain(link.url)))
|
|
elif errors:
|
|
raise ArchiveError(', '.join(errors))
|
|
else:
|
|
raise ArchiveError('Failed to find "content-location" URL header in Archive.org response.')
|
|
except Exception as err:
|
|
status = 'failed'
|
|
output = err
|
|
finally:
|
|
timer.end()
|
|
|
|
if output and not isinstance(output, Exception):
|
|
# instead of writing None when archive.org rejects the url write the
|
|
# url to resubmit it to archive.org. This is so when the user visits
|
|
# the URL in person, it will attempt to re-archive it, and it'll show the
|
|
# nicer error message explaining why the url was rejected if it fails.
|
|
archive_org_url = archive_org_url or submit_url
|
|
with open(str(out_dir / output), 'w', encoding='utf-8') as f:
|
|
f.write(archive_org_url)
|
|
chmod_file(str(out_dir / output), cwd=str(out_dir))
|
|
output = archive_org_url
|
|
|
|
return ArchiveResult(
|
|
cmd=cmd,
|
|
pwd=str(out_dir),
|
|
cmd_version=CURL_VERSION,
|
|
output=output,
|
|
status=status,
|
|
**timer.stats,
|
|
)
|
|
|
|
@enforce_types
|
|
def parse_archive_dot_org_response(response: bytes) -> Tuple[List[str], List[str]]:
|
|
# Parse archive.org response headers
|
|
headers: Dict[str, List[str]] = defaultdict(list)
|
|
|
|
# lowercase all the header names and store in dict
|
|
for header in response.splitlines():
|
|
if b':' not in header or not header.strip():
|
|
continue
|
|
name, val = header.decode().split(':', 1)
|
|
headers[name.lower().strip()].append(val.strip())
|
|
|
|
# Get successful archive url in "content-location" header or any errors
|
|
content_location = headers.get('content-location', headers['location'])
|
|
errors = headers['x-archive-wayback-runtime-error']
|
|
return content_location, errors
|
|
|