mirror of https://github.com/ArchiveBox/ArchiveBox
synced 2024-11-10 14:44:18 +00:00
first attempt to migrate to Pathlib
parent 2767155e59
commit 594d9e49ce
7 changed files with 89 additions and 85 deletions
@@ -222,15 +222,15 @@ DERIVED_CONFIG_DEFAULTS: ConfigDefaultDict = {
     'USER': {'default': lambda c: getpass.getuser() or os.getlogin()},
     'ANSI': {'default': lambda c: DEFAULT_CLI_COLORS if c['USE_COLOR'] else {k: '' for k in DEFAULT_CLI_COLORS.keys()}},
 
-    'REPO_DIR': {'default': lambda c: os.path.abspath(os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', '..'))},
-    'PYTHON_DIR': {'default': lambda c: os.path.join(c['REPO_DIR'], PYTHON_DIR_NAME)},
-    'TEMPLATES_DIR': {'default': lambda c: os.path.join(c['PYTHON_DIR'], TEMPLATES_DIR_NAME, 'legacy')},
+    'REPO_DIR': {'default': lambda c: Path(os.path.abspath(os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', '..')))},
+    'PYTHON_DIR': {'default': lambda c: Path.joinpath(Path(c['REPO_DIR']), PYTHON_DIR_NAME)},
+    'TEMPLATES_DIR': {'default': lambda c: Path.joinpath(c['PYTHON_DIR'], TEMPLATES_DIR_NAME, 'legacy')},
 
-    'OUTPUT_DIR': {'default': lambda c: os.path.abspath(os.path.expanduser(c['OUTPUT_DIR'])) if c['OUTPUT_DIR'] else os.path.abspath(os.curdir)},
-    'ARCHIVE_DIR': {'default': lambda c: os.path.join(c['OUTPUT_DIR'], ARCHIVE_DIR_NAME)},
-    'SOURCES_DIR': {'default': lambda c: os.path.join(c['OUTPUT_DIR'], SOURCES_DIR_NAME)},
-    'LOGS_DIR': {'default': lambda c: os.path.join(c['OUTPUT_DIR'], LOGS_DIR_NAME)},
-    'CONFIG_FILE': {'default': lambda c: os.path.abspath(os.path.expanduser(c['CONFIG_FILE'])) if c['CONFIG_FILE'] else os.path.join(c['OUTPUT_DIR'], CONFIG_FILENAME)},
+    'OUTPUT_DIR': {'default': lambda c: Path(os.path.abspath(os.path.expanduser(c['OUTPUT_DIR'])) if c['OUTPUT_DIR'] else os.path.abspath(os.curdir))},
+    'ARCHIVE_DIR': {'default': lambda c: Path.joinpath(c['OUTPUT_DIR'], ARCHIVE_DIR_NAME)},
+    'SOURCES_DIR': {'default': lambda c: Path.joinpath(c['OUTPUT_DIR'], SOURCES_DIR_NAME)},
+    'LOGS_DIR': {'default': lambda c: Path.joinpath(c['OUTPUT_DIR'], LOGS_DIR_NAME)},
+    'CONFIG_FILE': {'default': lambda c: os.path.abspath(os.path.expanduser(c['CONFIG_FILE'])) if c['CONFIG_FILE'] else Path.joinpath(c['OUTPUT_DIR'], CONFIG_FILENAME)},
     'COOKIES_FILE': {'default': lambda c: c['COOKIES_FILE'] and os.path.abspath(os.path.expanduser(c['COOKIES_FILE']))},
     'CHROME_USER_DATA_DIR': {'default': lambda c: find_chrome_data_dir() if c['CHROME_USER_DATA_DIR'] is None else (os.path.abspath(os.path.expanduser(c['CHROME_USER_DATA_DIR'])) or None)},
     'URL_BLACKLIST_PTN': {'default': lambda c: c['URL_BLACKLIST'] and re.compile(c['URL_BLACKLIST'] or '', re.IGNORECASE | re.UNICODE | re.MULTILINE)},
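
A quick illustration of the pathlib pattern used in the defaults above (not part of the commit; the example paths are invented): Path.joinpath(a, b) builds the same path as a / b, but only when a is already a Path object.

```python
from pathlib import Path

output_dir = Path('/data/archivebox')               # hypothetical OUTPUT_DIR value
archive_dir = Path.joinpath(output_dir, 'archive')  # what the defaults above do
assert archive_dir == output_dir / 'archive'        # '/' is the more common spelling
```
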
@@ -348,7 +348,7 @@ def load_config_file(out_dir: str=None) -> Optional[Dict[str, str]]:
     """load the ini-formatted config file from OUTPUT_DIR/Archivebox.conf"""
 
     out_dir = out_dir or os.path.abspath(os.getenv('OUTPUT_DIR', '.'))
-    config_path = os.path.join(out_dir, CONFIG_FILENAME)
+    config_path = Path.joinpath(Path(out_dir), CONFIG_FILENAME)
     if os.path.exists(config_path):
         config_file = ConfigParser()
         config_file.optionxform = str
@@ -371,7 +371,7 @@ def write_config_file(config: Dict[str, str], out_dir: str=None) -> ConfigDict:
     from ..system import atomic_write
 
     out_dir = out_dir or os.path.abspath(os.getenv('OUTPUT_DIR', '.'))
-    config_path = os.path.join(out_dir, CONFIG_FILENAME)
+    config_path = Path.joinpath(out_dir, CONFIG_FILENAME)
 
     if not os.path.exists(config_path):
         atomic_write(config_path, CONFIG_HEADER)
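
One thing to keep in mind about the hunk above (illustrative note, not part of the commit): out_dir is still a plain string here, and Path.joinpath called as an unbound function only works when its first argument is already a Path.

```python
from pathlib import Path

print(Path.joinpath(Path('/tmp/out'), 'Archivebox.conf'))  # /tmp/out/Archivebox.conf
try:
    Path.joinpath('/tmp/out', 'Archivebox.conf')           # str receiver instead of a Path
except AttributeError as err:                              # exact error message varies by Python version
    print('first argument must be a Path:', err)
```
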
@@ -611,17 +611,17 @@ def get_code_locations(config: ConfigDict) -> SimpleConfigValueDict:
         'REPO_DIR': {
             'path': os.path.abspath(config['REPO_DIR']),
             'enabled': True,
-            'is_valid': os.path.exists(os.path.join(config['REPO_DIR'], 'archivebox')),
+            'is_valid': os.path.exists(Path.joinpath(config['REPO_DIR'], 'archivebox')),
         },
         'PYTHON_DIR': {
             'path': os.path.abspath(config['PYTHON_DIR']),
             'enabled': True,
-            'is_valid': os.path.exists(os.path.join(config['PYTHON_DIR'], '__main__.py')),
+            'is_valid': os.path.exists(Path.joinpath(config['PYTHON_DIR'], '__main__.py')),
         },
         'TEMPLATES_DIR': {
             'path': os.path.abspath(config['TEMPLATES_DIR']),
             'enabled': True,
-            'is_valid': os.path.exists(os.path.join(config['TEMPLATES_DIR'], 'static')),
+            'is_valid': os.path.exists(Path.joinpath(config['TEMPLATES_DIR'], 'static')),
         },
     }
 
@@ -645,7 +645,7 @@ def get_data_locations(config: ConfigDict) -> ConfigValue:
         'OUTPUT_DIR': {
             'path': os.path.abspath(config['OUTPUT_DIR']),
             'enabled': True,
-            'is_valid': os.path.exists(os.path.join(config['OUTPUT_DIR'], JSON_INDEX_FILENAME)),
+            'is_valid': os.path.exists(Path.joinpath(config['OUTPUT_DIR'], JSON_INDEX_FILENAME)),
         },
         'SOURCES_DIR': {
             'path': os.path.abspath(config['SOURCES_DIR']),
@@ -668,19 +668,19 @@ def get_data_locations(config: ConfigDict) -> ConfigValue:
             'is_valid': os.path.exists(config['CONFIG_FILE']),
         },
         'SQL_INDEX': {
-            'path': os.path.abspath(os.path.join(config['OUTPUT_DIR'], SQL_INDEX_FILENAME)),
+            'path': os.path.abspath(Path.joinpath(config['OUTPUT_DIR'], SQL_INDEX_FILENAME)),
             'enabled': True,
-            'is_valid': os.path.exists(os.path.join(config['OUTPUT_DIR'], SQL_INDEX_FILENAME)),
+            'is_valid': os.path.exists(Path.joinpath(config['OUTPUT_DIR'], SQL_INDEX_FILENAME)),
         },
         'JSON_INDEX': {
-            'path': os.path.abspath(os.path.join(config['OUTPUT_DIR'], JSON_INDEX_FILENAME)),
+            'path': os.path.abspath(Path.joinpath(config['OUTPUT_DIR'], JSON_INDEX_FILENAME)),
             'enabled': True,
-            'is_valid': os.path.exists(os.path.join(config['OUTPUT_DIR'], JSON_INDEX_FILENAME)),
+            'is_valid': os.path.exists(Path.joinpath(config['OUTPUT_DIR'], JSON_INDEX_FILENAME)),
         },
         'HTML_INDEX': {
-            'path': os.path.abspath(os.path.join(config['OUTPUT_DIR'], HTML_INDEX_FILENAME)),
+            'path': os.path.abspath(Path.joinpath(config['OUTPUT_DIR'], HTML_INDEX_FILENAME)),
             'enabled': True,
-            'is_valid': os.path.exists(os.path.join(config['OUTPUT_DIR'], HTML_INDEX_FILENAME)),
+            'is_valid': os.path.exists(Path.joinpath(config['OUTPUT_DIR'], HTML_INDEX_FILENAME)),
         },
     }
 
@@ -877,9 +877,9 @@ def check_dependencies(config: ConfigDict=CONFIG, show_help: bool=True) -> None:
 
 def check_data_folder(out_dir: Optional[str]=None, config: ConfigDict=CONFIG) -> None:
     output_dir = out_dir or config['OUTPUT_DIR']
-    assert isinstance(output_dir, str)
+    assert isinstance(output_dir, (str, Path))
 
-    sql_index_exists = os.path.exists(os.path.join(output_dir, SQL_INDEX_FILENAME))
+    sql_index_exists = (Path(output_dir) / SQL_INDEX_FILENAME).exists()
     if not sql_index_exists:
         stderr('[X] No archivebox index found in the current directory.', color='red')
         stderr(f' {output_dir}', color='lightyellow')
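
For reference, the two existence checks swapped above are equivalent, and wrapping the incoming value in Path() is what lets the function accept either a str or a Path (sketch only, not part of the commit; the filename literal is an assumption):

```python
import os
from pathlib import Path

output_dir = '/data/archivebox'     # could just as well be Path('/data/archivebox')
name = 'index.sqlite3'              # assumed value of SQL_INDEX_FILENAME, for illustration

assert os.path.exists(os.path.join(output_dir, name)) == (Path(output_dir) / name).exists()
```
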
@@ -909,7 +909,7 @@ def check_data_folder(out_dir: Optional[str]=None, config: ConfigDict=CONFIG) ->
         stderr(' archivebox init')
         raise SystemExit(3)
 
-    sources_dir = os.path.join(output_dir, SOURCES_DIR_NAME)
+    sources_dir = Path.joinpath(output_dir, SOURCES_DIR_NAME)
     if not os.path.exists(sources_dir):
         os.makedirs(sources_dir)
 
@@ -920,17 +920,17 @@ def setup_django(out_dir: str=None, check_db=False, config: ConfigDict=CONFIG) -
 
     output_dir = out_dir or config['OUTPUT_DIR']
 
-    assert isinstance(output_dir, str) and isinstance(config['PYTHON_DIR'], str)
+    assert isinstance(output_dir, (Path, str)) and isinstance(config['PYTHON_DIR'], Path)
 
     try:
         import django
-        sys.path.append(config['PYTHON_DIR'])
-        os.environ.setdefault('OUTPUT_DIR', output_dir)
+        sys.path.append(str(config['PYTHON_DIR']))
+        os.environ.setdefault('OUTPUT_DIR', str(output_dir))
         os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'core.settings')
         django.setup()
 
         if check_db:
-            sql_index_path = os.path.join(output_dir, SQL_INDEX_FILENAME)
+            sql_index_path = Path.joinpath(output_dir, SQL_INDEX_FILENAME)
             assert os.path.exists(sql_index_path), (
                 f'No database file {SQL_INDEX_FILENAME} found in OUTPUT_DIR: {config["OUTPUT_DIR"]}')
     except KeyboardInterrupt:

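The str() wrappers added in setup_django above are needed because os.environ only stores strings and sys.path entries are conventionally strings too; a minimal sketch (not part of the commit, values invented):

```python
import os
import sys
from pathlib import Path

python_dir = Path('/repo/archivebox/archivebox')   # hypothetical PYTHON_DIR value
sys.path.append(str(python_dir))                   # append a str, not a Path

try:
    os.environ['OUTPUT_DIR'] = python_dir          # Path value is rejected
except TypeError as err:
    print(err)                                     # str expected, not PosixPath
os.environ['OUTPUT_DIR'] = str(python_dir)         # works
```
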
@@ -6,6 +6,7 @@ import json as pyjson
 from pathlib import Path
 
 from itertools import chain
+from pathlib import Path
 from typing import List, Tuple, Dict, Optional, Iterable
 from collections import OrderedDict
 from contextlib import contextmanager
@@ -224,7 +225,7 @@ def timed_index_update(out_path: str):
 
 
 @enforce_types
-def write_main_index(links: List[Link], out_dir: str=OUTPUT_DIR, finished: bool=False) -> None:
+def write_main_index(links: List[Link], out_dir: Path=OUTPUT_DIR, finished: bool=False) -> None:
     """create index.html file for a given list of links"""
 
     log_indexing_process_started(len(links))
@@ -260,7 +261,7 @@ def get_empty_snapshot_queryset(out_dir: str=OUTPUT_DIR):
     return Snapshot.objects.none()
 
 @enforce_types
-def load_main_index(out_dir: str=OUTPUT_DIR, warn: bool=True) -> List[Link]:
+def load_main_index(out_dir: Path=OUTPUT_DIR, warn: bool=True) -> List[Link]:
     """parse and load existing index with any new links from import_path merged in"""
     setup_django(out_dir, check_db=True)
     from core.models import Snapshot
@@ -271,7 +272,7 @@ def load_main_index(out_dir: str=OUTPUT_DIR, warn: bool=True) -> List[Link]:
     raise SystemExit(0)
 
 @enforce_types
-def load_main_index_meta(out_dir: str=OUTPUT_DIR) -> Optional[dict]:
+def load_main_index_meta(out_dir: Path=OUTPUT_DIR) -> Optional[dict]:
     index_path = os.path.join(out_dir, JSON_INDEX_FILENAME)
     if os.path.exists(index_path):
         with open(index_path, 'r', encoding='utf-8') as f:
@@ -392,7 +393,7 @@ def snapshot_filter(snapshots: QuerySet, filter_patterns: List[str], filter_type
     return snapshots.filter(q_filter)
 
 
-def get_indexed_folders(snapshots, out_dir: str=OUTPUT_DIR) -> Dict[str, Optional[Link]]:
+def get_indexed_folders(links, out_dir: Path=OUTPUT_DIR) -> Dict[str, Optional[Link]]:
     """indexed links without checking archive status or data directory validity"""
     links = [snapshot.as_link() for snapshot in snapshots.iterator()]
     return {
@@ -400,7 +401,7 @@ def get_indexed_folders(snapshots, out_dir: str=OUTPUT_DIR) -> Dict[str, Optiona
         for link in links
     }
 
-def get_archived_folders(snapshots, out_dir: str=OUTPUT_DIR) -> Dict[str, Optional[Link]]:
+def get_archived_folders(links, out_dir: Path=OUTPUT_DIR) -> Dict[str, Optional[Link]]:
     """indexed links that are archived with a valid data directory"""
     links = [snapshot.as_link() for snapshot in snapshots.iterator()]
     return {
@@ -408,7 +409,7 @@ def get_archived_folders(snapshots, out_dir: str=OUTPUT_DIR) -> Dict[str, Option
         for link in filter(is_archived, links)
     }
 
-def get_unarchived_folders(snapshots, out_dir: str=OUTPUT_DIR) -> Dict[str, Optional[Link]]:
+def get_unarchived_folders(links, out_dir: Path=OUTPUT_DIR) -> Dict[str, Optional[Link]]:
     """indexed links that are unarchived with no data directory or an empty data directory"""
     links = [snapshot.as_link() for snapshot in snapshots.iterator()]
     return {
@@ -416,7 +417,7 @@ def get_unarchived_folders(snapshots, out_dir: str=OUTPUT_DIR) -> Dict[str, Opti
         for link in filter(is_unarchived, links)
     }
 
-def get_present_folders(_snapshots, out_dir: str=OUTPUT_DIR) -> Dict[str, Optional[Link]]:
+def get_present_folders(links, out_dir: Path=OUTPUT_DIR) -> Dict[str, Optional[Link]]:
     """dirs that actually exist in the archive/ folder"""
 
     all_folders = {}
@@ -433,7 +434,7 @@ def get_present_folders(_snapshots, out_dir: str=OUTPUT_DIR) -> Dict[str, Option
 
     return all_folders
 
-def get_valid_folders(snapshots, out_dir: str=OUTPUT_DIR) -> Dict[str, Optional[Link]]:
+def get_valid_folders(links, out_dir: Path=OUTPUT_DIR) -> Dict[str, Optional[Link]]:
     """dirs with a valid index matched to the main index and archived content"""
     links = [snapshot.as_link() for snapshot in snapshots.iterator()]
     return {
@@ -441,7 +442,7 @@ def get_valid_folders(snapshots, out_dir: str=OUTPUT_DIR) -> Dict[str, Optional[
         for link in filter(is_valid, links)
     }
 
-def get_invalid_folders(snapshots, out_dir: str=OUTPUT_DIR) -> Dict[str, Optional[Link]]:
+def get_invalid_folders(links, out_dir: Path=OUTPUT_DIR) -> Dict[str, Optional[Link]]:
     """dirs that are invalid for any reason: corrupted/duplicate/orphaned/unrecognized"""
     duplicate = get_duplicate_folders(snapshots, out_dir=OUTPUT_DIR)
     orphaned = get_orphaned_folders(snapshots, out_dir=OUTPUT_DIR)
@@ -450,7 +451,7 @@ def get_invalid_folders(snapshots, out_dir: str=OUTPUT_DIR) -> Dict[str, Optiona
     return {**duplicate, **orphaned, **corrupted, **unrecognized}
 
 
-def get_duplicate_folders(snapshots, out_dir: str=OUTPUT_DIR) -> Dict[str, Optional[Link]]:
+def get_duplicate_folders(links, out_dir: Path=OUTPUT_DIR) -> Dict[str, Optional[Link]]:
     """dirs that conflict with other directories that have the same link URL or timestamp"""
     by_url = {}
     by_timestamp = {}
@@ -484,7 +485,7 @@ def get_duplicate_folders(snapshots, out_dir: str=OUTPUT_DIR) -> Dict[str, Optio
             duplicate_folders[path] = link
     return duplicate_folders
 
-def get_orphaned_folders(snapshots, out_dir: str=OUTPUT_DIR) -> Dict[str, Optional[Link]]:
+def get_orphaned_folders(links, out_dir: Path=OUTPUT_DIR) -> Dict[str, Optional[Link]]:
     """dirs that contain a valid index but aren't listed in the main index"""
     orphaned_folders = {}
 
@@ -502,7 +503,7 @@ def get_orphaned_folders(snapshots, out_dir: str=OUTPUT_DIR) -> Dict[str, Option
 
     return orphaned_folders
 
-def get_corrupted_folders(snapshots, out_dir: str=OUTPUT_DIR) -> Dict[str, Optional[Link]]:
+def get_corrupted_folders(snapshots, out_dir: Path=OUTPUT_DIR) -> Dict[str, Optional[Link]]:
     """dirs that don't contain a valid index and aren't listed in the main index"""
     corrupted = {}
     for snapshot in snapshots.iterator():
@@ -511,7 +512,7 @@ def get_corrupted_folders(snapshots, out_dir: str=OUTPUT_DIR) -> Dict[str, Optio
             corrupted[link.link_dir] = link
     return corrupted
 
-def get_unrecognized_folders(snapshots, out_dir: str=OUTPUT_DIR) -> Dict[str, Optional[Link]]:
+def get_unrecognized_folders(snapshots, out_dir: Path=OUTPUT_DIR) -> Dict[str, Optional[Link]]:
     """dirs that don't contain recognizable archive data and aren't listed in the main index"""
     unrecognized_folders: Dict[str, Optional[Link]] = {}
 
@@ -580,7 +581,7 @@ def is_unarchived(link: Link) -> bool:
     return not link.is_archived
 
 
-def fix_invalid_folder_locations(out_dir: str=OUTPUT_DIR) -> Tuple[List[str], List[str]]:
+def fix_invalid_folder_locations(out_dir: Path=OUTPUT_DIR) -> Tuple[List[str], List[str]]:
     fixed = []
     cant_fix = []
     for entry in os.scandir(os.path.join(out_dir, ARCHIVE_DIR_NAME)):

@@ -5,6 +5,7 @@ import os
 from string import Template
 from datetime import datetime
 from typing import List, Optional, Iterator, Mapping
+from pathlib import Path
 
 from .schema import Link
 from ..system import atomic_write, copy_and_overwrite
@@ -40,7 +41,7 @@ TITLE_LOADING_MSG = 'Not yet archived...'
 ### Main Links Index
 
 @enforce_types
-def parse_html_main_index(out_dir: str=OUTPUT_DIR) -> Iterator[str]:
+def parse_html_main_index(out_dir: Path=OUTPUT_DIR) -> Iterator[str]:
     """parse an archive index html file and return the list of urls"""
 
     index_path = join(out_dir, HTML_INDEX_FILENAME)
@@ -52,7 +53,7 @@ def parse_html_main_index(out_dir: str=OUTPUT_DIR) -> Iterator[str]:
     return ()
 
 @enforce_types
-def write_html_main_index(links: List[Link], out_dir: str=OUTPUT_DIR, finished: bool=False) -> None:
+def write_html_main_index(links: List[Link], out_dir: Path=OUTPUT_DIR, finished: bool=False) -> None:
     """write the html link index to a given path"""
 
     copy_and_overwrite(join(TEMPLATES_DIR, FAVICON_FILENAME), join(out_dir, FAVICON_FILENAME))

@@ -6,7 +6,7 @@ import json as pyjson
 from pathlib import Path
 
 from datetime import datetime
-from typing import List, Optional, Iterator, Any
+from typing import List, Optional, Iterator, Any, Union
 
 from .schema import Link, ArchiveResult
 from ..system import atomic_write
@@ -42,7 +42,7 @@ MAIN_INDEX_HEADER = {
 ### Main Links Index
 
 @enforce_types
-def parse_json_main_index(out_dir: str=OUTPUT_DIR) -> Iterator[Link]:
+def parse_json_main_index(out_dir: Path=OUTPUT_DIR) -> Iterator[Link]:
     """parse an archive index json file and return the list of links"""
 
     index_path = os.path.join(out_dir, JSON_INDEX_FILENAME)
@@ -66,7 +66,7 @@ def parse_json_main_index(out_dir: str=OUTPUT_DIR) -> Iterator[Link]:
     return ()
 
 @enforce_types
-def write_json_main_index(links: List[Link], out_dir: str=OUTPUT_DIR) -> None:
+def write_json_main_index(links: List[Link], out_dir: Path=OUTPUT_DIR) -> None:
     """write the json link index to a given path"""
 
     assert isinstance(links, List), 'Links must be a list, not a generator.'
@@ -101,7 +101,7 @@ def write_json_link_details(link: Link, out_dir: Optional[str]=None) -> None:
 
 
 @enforce_types
-def parse_json_link_details(out_dir: str, guess: Optional[bool]=False) -> Optional[Link]:
+def parse_json_link_details(out_dir: Union[Path, str], guess: Optional[bool]=False) -> Optional[Link]:
     """load the json link index from a given directory"""
     existing_index = os.path.join(out_dir, JSON_INDEX_FILENAME)
     if os.path.exists(existing_index):
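
Widening the annotation to Union[Path, str] above works without touching the body because the os.path functions accept anything path-like; a small sketch (not part of the commit; directory and filename are made up):

```python
import os
from pathlib import Path

for out_dir in ('/data/archive/1600000000', Path('/data/archive/1600000000')):
    index_file = os.path.join(out_dir, 'index.json')     # returns a str either way
    print(type(out_dir).__name__, index_file, os.path.exists(index_file))
```
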
@@ -115,7 +115,7 @@ def parse_json_link_details(out_dir: str, guess: Optional[bool]=False) -> Option
 
 
 @enforce_types
-def parse_json_links_details(out_dir: str) -> Iterator[Link]:
+def parse_json_links_details(out_dir: Union[Path, str]) -> Iterator[Link]:
     """read through all the archive data folders and return the parsed links"""
 
     for entry in os.scandir(os.path.join(out_dir, ARCHIVE_DIR_NAME)):

@@ -1,6 +1,7 @@
 __package__ = 'archivebox.index'
 
 from io import StringIO
+from pathlib import Path
 from typing import List, Tuple, Iterator
 from django.db.models import QuerySet
 
@@ -12,7 +13,7 @@ from ..config import setup_django, OUTPUT_DIR
 ### Main Links Index
 
 @enforce_types
-def parse_sql_main_index(out_dir: str=OUTPUT_DIR) -> Iterator[Link]:
+def parse_sql_main_index(out_dir: Path=OUTPUT_DIR) -> Iterator[Link]:
     setup_django(out_dir, check_db=True)
     from core.models import Snapshot
 
@@ -22,7 +23,7 @@ def parse_sql_main_index(out_dir: str=OUTPUT_DIR) -> Iterator[Link]:
     )
 
 @enforce_types
-def remove_from_sql_main_index(snapshots: QuerySet, out_dir: str=OUTPUT_DIR) -> None:
+def remove_from_sql_main_index(snapshots: QuerySet, out_dir: Path=OUTPUT_DIR) -> None:
     setup_django(out_dir, check_db=True)
     from django.db import transaction
 
@@ -43,7 +44,7 @@ def write_link_to_sql_index(link: Link):
 
 
 @enforce_types
-def write_sql_main_index(links: List[Link], out_dir: str=OUTPUT_DIR) -> None:
+def write_sql_main_index(links: List[Link], out_dir: Path=OUTPUT_DIR) -> None:
     setup_django(out_dir, check_db=True)
     from django.db import transaction
 
@@ -53,7 +54,7 @@ def write_sql_main_index(links: List[Link], out_dir: str=OUTPUT_DIR) -> None:
 
 
 @enforce_types
-def write_sql_link_details(link: Link, out_dir: str=OUTPUT_DIR) -> None:
+def write_sql_link_details(link: Link, out_dir: Path=OUTPUT_DIR) -> None:
     setup_django(out_dir, check_db=True)
     from core.models import Snapshot
     from django.db import transaction
@@ -70,7 +71,7 @@ def write_sql_link_details(link: Link, out_dir: str=OUTPUT_DIR) -> None:
 
 
 @enforce_types
-def list_migrations(out_dir: str=OUTPUT_DIR) -> List[Tuple[bool, str]]:
+def list_migrations(out_dir: Path=OUTPUT_DIR) -> List[Tuple[bool, str]]:
     setup_django(out_dir, check_db=False)
     from django.core.management import call_command
     out = StringIO()
@@ -87,7 +88,7 @@ def list_migrations(out_dir: str=OUTPUT_DIR) -> List[Tuple[bool, str]]:
     return migrations
 
 @enforce_types
-def apply_migrations(out_dir: str=OUTPUT_DIR) -> List[str]:
+def apply_migrations(out_dir: Path=OUTPUT_DIR) -> List[str]:
     setup_django(out_dir, check_db=False)
     from django.core.management import call_command
     null, out = StringIO(), StringIO()
@@ -98,7 +99,7 @@ def apply_migrations(out_dir: str=OUTPUT_DIR) -> List[str]:
     return [line.strip() for line in out.readlines() if line.strip()]
 
 @enforce_types
-def get_admins(out_dir: str=OUTPUT_DIR) -> List[str]:
+def get_admins(out_dir: Path=OUTPUT_DIR) -> List[str]:
     setup_django(out_dir, check_db=False)
     from django.contrib.auth.models import User
     return User.objects.filter(is_superuser=True)

@@ -5,6 +5,7 @@ import sys
 import shutil
 from pathlib import Path
 
+from pathlib import Path
 from typing import Dict, List, Optional, Iterable, IO, Union
 from crontab import CronTab, CronSlices
 from django.db.models import QuerySet
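
After this hunk the file imports pathlib.Path twice (both lines are visible above). That is harmless at runtime, since a repeated import simply rebinds the same name (sketch, not part of the commit):

```python
from pathlib import Path
from pathlib import Path  # duplicate import: a no-op at runtime, only linters flag it
assert Path is Path
```
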
@@ -130,7 +131,7 @@ ALLOWED_IN_OUTPUT_DIR = {
 }
 
 @enforce_types
-def help(out_dir: str=OUTPUT_DIR) -> None:
+def help(out_dir: Path=OUTPUT_DIR) -> None:
     """Print the ArchiveBox help message and usage"""
 
     all_subcommands = list_subcommands()
@@ -153,7 +154,7 @@ def help(out_dir: str=OUTPUT_DIR) -> None:
     )
 
 
-    if os.path.exists(os.path.join(out_dir, SQL_INDEX_FILENAME)):
+    if (Path(out_dir) / SQL_INDEX_FILENAME).exists():
         print('''{green}ArchiveBox v{}: The self-hosted internet archive.{reset}
 
 {lightred}Active data directory:{reset}
@@ -202,7 +203,7 @@ def help(out_dir: str=OUTPUT_DIR) -> None:
 
 @enforce_types
 def version(quiet: bool=False,
-            out_dir: str=OUTPUT_DIR) -> None:
+            out_dir: Path=OUTPUT_DIR) -> None:
     """Print the ArchiveBox version and dependency information"""
 
     if quiet:
@@ -239,7 +240,7 @@ def version(quiet: bool=False,
 def run(subcommand: str,
         subcommand_args: Optional[List[str]],
         stdin: Optional[IO]=None,
-        out_dir: str=OUTPUT_DIR) -> None:
+        out_dir: Path=OUTPUT_DIR) -> None:
     """Run a given ArchiveBox subcommand with the given list of args"""
     run_subcommand(
         subcommand=subcommand,
@@ -250,9 +251,9 @@ def run(subcommand: str,
 
 
 @enforce_types
-def init(force: bool=False, out_dir: str=OUTPUT_DIR) -> None:
+def init(force: bool=False, out_dir: Path=OUTPUT_DIR) -> None:
     """Initialize a new ArchiveBox collection in the current directory"""
-    os.makedirs(out_dir, exist_ok=True)
+    Path(out_dir).mkdir(exist_ok=True)
     is_empty = not len(set(os.listdir(out_dir)) - ALLOWED_IN_OUTPUT_DIR)
 
     if (Path(out_dir) / JSON_INDEX_FILENAME).exists():
@@ -289,32 +290,31 @@ def init(force: bool=False, out_dir: str=OUTPUT_DIR) -> None:
     else:
         print('\n{green}[+] Building archive folder structure...{reset}'.format(**ANSI))
 
-    os.makedirs(SOURCES_DIR, exist_ok=True)
+    Path(SOURCES_DIR).mkdir(exist_ok=True)
     print(f' √ {SOURCES_DIR}')
 
-    os.makedirs(ARCHIVE_DIR, exist_ok=True)
+    Path(ARCHIVE_DIR).mkdir(exist_ok=True)
     print(f' √ {ARCHIVE_DIR}')
 
-    os.makedirs(LOGS_DIR, exist_ok=True)
+    Path(LOGS_DIR).mkdir(exist_ok=True)
     print(f' √ {LOGS_DIR}')
 
     write_config_file({}, out_dir=out_dir)
     print(f' √ {CONFIG_FILE}')
 
-    if os.path.exists(os.path.join(out_dir, SQL_INDEX_FILENAME)):
+    if (Path(out_dir) / SQL_INDEX_FILENAME).exists():
         print('\n{green}[*] Verifying main SQL index and running migrations...{reset}'.format(**ANSI))
     else:
         print('\n{green}[+] Building main SQL index and running migrations...{reset}'.format(**ANSI))
 
     setup_django(out_dir, check_db=False)
-    DATABASE_FILE = os.path.join(out_dir, SQL_INDEX_FILENAME)
+    DATABASE_FILE = Path(out_dir) / SQL_INDEX_FILENAME
     print(f' √ {DATABASE_FILE}')
     print()
     for migration_line in apply_migrations(out_dir):
         print(f' {migration_line}')
 
-
-    assert os.path.exists(DATABASE_FILE)
+    assert DATABASE_FILE.exists()
 
     # from django.contrib.auth.models import User
     # if IS_TTY and not User.objects.filter(is_superuser=True).exists():
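
One behavioral difference worth flagging in the init() changes above: os.makedirs() creates missing parent directories, while Path.mkdir(exist_ok=True) does not unless parents=True is also passed. A stdlib-only sketch (not part of the commit; scratch paths invented):

```python
import os
import tempfile
from pathlib import Path

base = Path(tempfile.mkdtemp())                        # throwaway directory for the demo
os.makedirs(base / 'a' / 'b', exist_ok=True)           # creates a/ and a/b/ in one call

try:
    (base / 'x' / 'y').mkdir(exist_ok=True)            # parent x/ does not exist yet
except FileNotFoundError as err:
    print(err)
(base / 'x' / 'y').mkdir(parents=True, exist_ok=True)  # closest equivalent to makedirs
```
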
@@ -391,7 +391,7 @@ def init(force: bool=False, out_dir: str=OUTPUT_DIR) -> None:
 
 
 @enforce_types
-def status(out_dir: str=OUTPUT_DIR) -> None:
+def status(out_dir: Path=OUTPUT_DIR) -> None:
     """Print out some info and statistics about the archive collection"""
 
     check_data_folder(out_dir=out_dir)
@@ -491,7 +491,7 @@ def status(out_dir: str=OUTPUT_DIR) -> None:
 
 
 @enforce_types
-def oneshot(url: str, out_dir: str=OUTPUT_DIR):
+def oneshot(url: str, out_dir: Path=OUTPUT_DIR):
     """
     Create a single URL archive folder with an index.json and index.html, and all the archive method outputs.
     You can run this to archive single pages without needing to create a whole collection with archivebox init.
@@ -514,7 +514,7 @@ def add(urls: Union[str, List[str]],
         index_only: bool=False,
         overwrite: bool=False,
         init: bool=False,
-        out_dir: str=OUTPUT_DIR) -> List[Link]:
+        out_dir: Path=OUTPUT_DIR) -> List[Link]:
     """Add a new URL or list of URLs to your archive"""
 
     assert depth in (0, 1), 'Depth must be 0 or 1 (depth >1 is not supported yet)'
@@ -577,7 +577,7 @@ def remove(filter_str: Optional[str]=None,
            before: Optional[float]=None,
            yes: bool=False,
            delete: bool=False,
-           out_dir: str=OUTPUT_DIR) -> List[Link]:
+           out_dir: Path=OUTPUT_DIR) -> List[Link]:
     """Remove the specified URLs from the archive"""
 
     check_data_folder(out_dir=out_dir)
@@ -658,7 +658,7 @@ def update(resume: Optional[float]=None,
            status: Optional[str]=None,
            after: Optional[str]=None,
            before: Optional[str]=None,
-           out_dir: str=OUTPUT_DIR) -> List[Link]:
+           out_dir: Path=OUTPUT_DIR) -> List[Link]:
     """Import any new links from subscriptions and retry any previously failed/skipped links"""
 
     check_data_folder(out_dir=out_dir)
@@ -714,7 +714,7 @@ def list_all(filter_patterns_str: Optional[str]=None,
              json: bool=False,
             html: bool=False,
             with_headers: bool=False,
-             out_dir: str=OUTPUT_DIR) -> Iterable[Link]:
+             out_dir: Path=OUTPUT_DIR) -> Iterable[Link]:
     """List, filter, and export information about archive entries"""
 
     check_data_folder(out_dir=out_dir)
@@ -756,7 +756,7 @@ def list_links(snapshots: Optional[QuerySet]=None,
                filter_type: str='exact',
                after: Optional[float]=None,
                before: Optional[float]=None,
-               out_dir: str=OUTPUT_DIR) -> Iterable[Link]:
+               out_dir: Path=OUTPUT_DIR) -> Iterable[Link]:
 
     check_data_folder(out_dir=out_dir)
 
@@ -776,7 +776,7 @@ def list_links(snapshots: Optional[QuerySet]=None,
 @enforce_types
 def list_folders(links: List[Link],
                  status: str,
-                 out_dir: str=OUTPUT_DIR) -> Dict[str, Optional[Link]]:
+                 out_dir: Path=OUTPUT_DIR) -> Dict[str, Optional[Link]]:
 
     check_data_folder(out_dir=out_dir)
 
@@ -805,7 +805,7 @@ def config(config_options_str: Optional[str]=None,
            get: bool=False,
            set: bool=False,
            reset: bool=False,
-           out_dir: str=OUTPUT_DIR) -> None:
+           out_dir: Path=OUTPUT_DIR) -> None:
     """Get and set your ArchiveBox project configuration values"""
 
     check_data_folder(out_dir=out_dir)
@@ -905,12 +905,12 @@ def schedule(add: bool=False,
              every: Optional[str]=None,
             depth: int=0,
             import_path: Optional[str]=None,
-             out_dir: str=OUTPUT_DIR):
+             out_dir: Path=OUTPUT_DIR):
     """Set ArchiveBox to regularly import URLs at specific times using cron"""
 
     check_data_folder(out_dir=out_dir)
 
-    os.makedirs(os.path.join(out_dir, LOGS_DIR_NAME), exist_ok=True)
+    (Path(out_dir) / LOGS_DIR_NAME).mkdir(exist_ok=True)
 
     cron = CronTab(user=True)
     cron = dedupe_cron_jobs(cron)
@@ -932,7 +932,7 @@ def schedule(add: bool=False,
             quoted(ARCHIVEBOX_BINARY),
             *(['add', f'--depth={depth}', f'"{import_path}"'] if import_path else ['update']),
             '>',
-            quoted(os.path.join(LOGS_DIR, 'archivebox.log')),
+            quoted(Path(LOGS_DIR) / 'archivebox.log'),
             '2>&1',
 
         ]
@@ -1016,7 +1016,7 @@ def server(runserver_args: Optional[List[str]]=None,
            reload: bool=False,
            debug: bool=False,
            init: bool=False,
-           out_dir: str=OUTPUT_DIR) -> None:
+           out_dir: Path=OUTPUT_DIR) -> None:
     """Run the ArchiveBox HTTP server"""
 
     runserver_args = runserver_args or []
@@ -1063,7 +1063,7 @@ def server(runserver_args: Optional[List[str]]=None,
 
 
 @enforce_types
-def manage(args: Optional[List[str]]=None, out_dir: str=OUTPUT_DIR) -> None:
+def manage(args: Optional[List[str]]=None, out_dir: Path=OUTPUT_DIR) -> None:
     """Run an ArchiveBox Django management command"""
 
     check_data_folder(out_dir=out_dir)
@@ -1079,7 +1079,7 @@ def manage(args: Optional[List[str]]=None, out_dir: str=OUTPUT_DIR) -> None:
 
 
 @enforce_types
-def shell(out_dir: str=OUTPUT_DIR) -> None:
+def shell(out_dir: Path=OUTPUT_DIR) -> None:
     """Enter an interactive ArchiveBox Django shell"""
 
     check_data_folder(out_dir=out_dir)

@@ -13,6 +13,7 @@ from io import StringIO
 
 from typing import IO, Tuple, List, Optional
 from datetime import datetime
+from pathlib import Path
 
 from ..system import atomic_write
 from ..config import (
@@ -125,7 +126,7 @@ def run_parser_functions(to_parse: IO[str], timer, root_url: Optional[str]=None)
 
 
 @enforce_types
-def save_text_as_source(raw_text: str, filename: str='{ts}-stdin.txt', out_dir: str=OUTPUT_DIR) -> str:
+def save_text_as_source(raw_text: str, filename: str='{ts}-stdin.txt', out_dir: Path=OUTPUT_DIR) -> str:
     ts = str(datetime.now().timestamp()).split('.', 1)[0]
     source_path = os.path.join(out_dir, SOURCES_DIR_NAME, filename.format(ts=ts))
     atomic_write(source_path, raw_text)
@@ -134,7 +135,7 @@ def save_text_as_source(raw_text: str, filename: str='{ts}-stdin.txt', out_dir:
 
 
 @enforce_types
-def save_file_as_source(path: str, timeout: int=TIMEOUT, filename: str='{ts}-{basename}.txt', out_dir: str=OUTPUT_DIR) -> str:
+def save_file_as_source(path: str, timeout: int=TIMEOUT, filename: str='{ts}-{basename}.txt', out_dir: Path=OUTPUT_DIR) -> str:
     """download a given url's content into output/sources/domain-<timestamp>.txt"""
     ts = str(datetime.now().timestamp()).split('.', 1)[0]
     source_path = os.path.join(OUTPUT_DIR, SOURCES_DIR_NAME, filename.format(basename=basename(path), ts=ts))