switch .is_dir and .exists for os.access to avoid PermissionError on startup

Nick Sweeting 2024-10-08 03:02:34 -07:00
parent c3dd0f22e5
commit de2ab43f7f
22 changed files with 119 additions and 97 deletions
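
The commit applies one pattern across the codebase: stat-based checks like Path.exists() / Path.is_dir() are swapped for os.access(), which simply returns False when a path is missing or unreadable rather than raising (the PermissionError at startup named in the title). A minimal sketch of the before/after, assuming a hypothetical data_dir; the exact paths and flag combinations vary per call site in the diff below:

    import os
    from pathlib import Path

    data_dir = Path('/data')   # hypothetical collection dir, possibly owned by another user

    # before: stat-based checks, which can raise PermissionError on locked-down paths
    # ok = (data_dir / 'index.sqlite3').exists() and (data_dir / 'archive').is_dir()

    # after: os.access() degrades to False instead of raising
    ok = (
        os.access(data_dir / 'index.sqlite3', os.R_OK)          # file exists and is readable
        and os.access(data_dir / 'archive', os.R_OK | os.X_OK)  # dir is readable and listable
    )

    # mode flags used throughout the diff:
    #   os.F_OK = exists,  os.R_OK = readable,  os.W_OK = writable,
    #   os.X_OK = executable (for dirs: traversable/listable)

One caveat worth knowing: os.access() tests against the real UID/GID rather than the effective one, so its answer can differ from an actual open() under sudo/setuid.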

View file

@@ -1,5 +1,6 @@
 __package__ = 'abx.archivebox'
+import os
 import importlib
 from typing import Dict
@@ -21,7 +22,7 @@ def load_archivebox_plugins(pm, plugins_dict: Dict[str, Path]):
         archivebox_plugins_found.append(plugin_module_loaded.PLUGIN)
         # 2. then try to import plugin_module.apps as well
-        if (plugin_dir / 'apps.py').exists():
+        if os.access(plugin_dir / 'apps.py', os.R_OK):
             plugin_apps = importlib.import_module(plugin_module + '.apps')
             pm.register(plugin_apps)    # register the whole .apps in case it contains loose hookimpls (not in a class)
             if hasattr(plugin_apps, 'PLUGIN'):

View file

@@ -1,5 +1,6 @@
 __package__ = "abx.archivebox"
+import os
 from typing import Dict, List
 from typing_extensions import Self
@@ -57,7 +58,7 @@ class BaseBinary(BaseHook, Binary):
     def symlink_to_lib(binary, bin_dir=None) -> None:
         bin_dir = bin_dir or CONSTANTS.LIB_BIN_DIR
-        if not (binary.abspath and binary.abspath.exists()):
+        if not (binary.abspath and os.access(binary.abspath, os.R_OK)):
             return
         try:

View file

@@ -200,22 +200,22 @@ class ConstantsDict(Mapping):
         'PACKAGE_DIR': {
             'path': (PACKAGE_DIR).resolve(),
             'enabled': True,
-            'is_valid': (PACKAGE_DIR / '__main__.py').exists(),  # read + list
+            'is_valid': os.access(PACKAGE_DIR / '__main__.py', os.X_OK),  # executable
         },
         'TEMPLATES_DIR': {
             'path': TEMPLATES_DIR.resolve(),
             'enabled': True,
-            'is_valid': STATIC_DIR.exists() and os.access(STATIC_DIR, os.R_OK) and os.access(STATIC_DIR, os.X_OK),  # read + list
+            'is_valid': os.access(STATIC_DIR, os.R_OK) and os.access(STATIC_DIR, os.X_OK),  # read + list
         },
         'LIB_DIR': {
             'path': LIB_DIR.resolve(),
             'enabled': True,
-            'is_valid': LIB_DIR.is_dir() and os.access(LIB_DIR, os.R_OK) and os.access(LIB_DIR, os.X_OK) and os.access(LIB_DIR, os.W_OK),  # read + write
+            'is_valid': os.access(LIB_DIR, os.R_OK) and os.access(LIB_DIR, os.X_OK) and os.access(LIB_DIR, os.W_OK),  # read + write
         },
         'TMP_DIR': {
             'path': TMP_DIR.resolve(),
             'enabled': True,
-            'is_valid': TMP_DIR.is_dir() and os.access(TMP_DIR, os.R_OK) and os.access(TMP_DIR, os.X_OK) and os.access(TMP_DIR, os.W_OK),  # read + write
+            'is_valid': os.access(TMP_DIR, os.R_OK) and os.access(TMP_DIR, os.X_OK) and os.access(TMP_DIR, os.W_OK),  # read + write
         },
     })
@@ -223,61 +223,61 @@ class ConstantsDict(Mapping):
         "DATA_DIR": {
             "path": DATA_DIR.resolve(),
             "enabled": True,
-            "is_valid": DATABASE_FILE.exists() and os.access(DATA_DIR, os.R_OK) and os.access(DATA_DIR, os.W_OK) and os.access(DATA_DIR, os.X_OK),
+            "is_valid": os.access(DATA_DIR, os.R_OK) and os.access(DATA_DIR, os.W_OK) and os.access(DATA_DIR, os.X_OK),
             "is_mount": os.path.ismount(DATA_DIR.resolve()),
         },
         "CONFIG_FILE": {
             "path": CONFIG_FILE.resolve(),
             "enabled": True,
-            "is_valid": CONFIG_FILE.exists() and os.access(CONFIG_FILE, os.W_OK),
+            "is_valid": os.access(CONFIG_FILE, os.R_OK) and os.access(CONFIG_FILE, os.W_OK),
         },
         "SQL_INDEX": {
             "path": DATABASE_FILE.resolve(),
             "enabled": True,
-            "is_valid": DATABASE_FILE.exists() and os.access(DATABASE_FILE, os.R_OK) and os.access(DATABASE_FILE, os.W_OK),
+            "is_valid": os.access(DATABASE_FILE, os.R_OK) and os.access(DATABASE_FILE, os.W_OK),
             "is_mount": os.path.ismount(DATABASE_FILE.resolve()),
         },
         "QUEUE_DATABASE": {
             "path": QUEUE_DATABASE_FILE.resolve(),
             "enabled": True,
-            "is_valid": QUEUE_DATABASE_FILE.exists() and os.access(QUEUE_DATABASE_FILE, os.R_OK) and os.access(QUEUE_DATABASE_FILE, os.W_OK),
+            "is_valid": os.access(QUEUE_DATABASE_FILE, os.R_OK) and os.access(QUEUE_DATABASE_FILE, os.W_OK),
             "is_mount": os.path.ismount(QUEUE_DATABASE_FILE.resolve()),
         },
         "ARCHIVE_DIR": {
             "path": ARCHIVE_DIR.resolve(),
             "enabled": True,
-            "is_valid": ARCHIVE_DIR.exists() and os.access(ARCHIVE_DIR, os.R_OK) and os.access(ARCHIVE_DIR, os.W_OK) and os.access(ARCHIVE_DIR, os.X_OK),
+            "is_valid": os.access(ARCHIVE_DIR, os.R_OK) and os.access(ARCHIVE_DIR, os.W_OK) and os.access(ARCHIVE_DIR, os.X_OK),
             "is_mount": os.path.ismount(ARCHIVE_DIR.resolve()),
         },
         "SOURCES_DIR": {
             "path": SOURCES_DIR.resolve(),
             "enabled": True,
-            "is_valid": SOURCES_DIR.exists() and os.access(SOURCES_DIR, os.R_OK) and os.access(SOURCES_DIR, os.W_OK) and os.access(SOURCES_DIR, os.X_OK),
+            "is_valid": os.access(SOURCES_DIR, os.R_OK) and os.access(SOURCES_DIR, os.W_OK) and os.access(SOURCES_DIR, os.X_OK),
         },
         "LOGS_DIR": {
             "path": LOGS_DIR.resolve(),
             "enabled": True,
-            "is_valid": LOGS_DIR.is_dir() and os.access(LOGS_DIR, os.R_OK) and os.access(LOGS_DIR, os.W_OK) and os.access(LOGS_DIR, os.X_OK),  # read + write
+            "is_valid": os.access(LOGS_DIR, os.R_OK) and os.access(LOGS_DIR, os.W_OK) and os.access(LOGS_DIR, os.X_OK),  # read + write
         },
         # "CACHE_DIR": {
         #     "path": CACHE_DIR.resolve(),
         #     "enabled": True,
-        #     "is_valid": CACHE_DIR.is_dir() and os.access(CACHE_DIR, os.R_OK) and os.access(CACHE_DIR, os.W_OK) and os.access(CACHE_DIR, os.X_OK),  # read + write
+        #     "is_valid": os.access(CACHE_DIR, os.R_OK) and os.access(CACHE_DIR, os.W_OK) and os.access(CACHE_DIR, os.X_OK),  # read + write
         # },
         "PERSONAS_DIR": {
             "path": PERSONAS_DIR.resolve(),
-            "enabled": PERSONAS_DIR.exists(),
-            "is_valid": PERSONAS_DIR.is_dir() and os.access(PERSONAS_DIR, os.R_OK) and os.access(PERSONAS_DIR, os.W_OK) and os.access(PERSONAS_DIR, os.X_OK),  # read + write
+            "enabled": os.access(PERSONAS_DIR, os.R_OK),
+            "is_valid": os.access(PERSONAS_DIR, os.R_OK) and os.access(PERSONAS_DIR, os.W_OK) and os.access(PERSONAS_DIR, os.X_OK),  # read + write
         },
         'CUSTOM_TEMPLATES_DIR': {
             'path': CUSTOM_TEMPLATES_DIR.resolve(),
-            'enabled': CUSTOM_TEMPLATES_DIR.exists(),
-            'is_valid': CUSTOM_TEMPLATES_DIR.is_dir() and os.access(CUSTOM_TEMPLATES_DIR, os.R_OK) and os.access(CUSTOM_TEMPLATES_DIR, os.X_OK),  # read
+            'enabled': os.access(CUSTOM_TEMPLATES_DIR, os.R_OK),
+            'is_valid': os.access(CUSTOM_TEMPLATES_DIR, os.R_OK) and os.access(CUSTOM_TEMPLATES_DIR, os.X_OK),  # read
         },
         'USER_PLUGINS_DIR': {
             'path': USER_PLUGINS_DIR.resolve(),
-            'enabled': USER_PLUGINS_DIR.exists(),
-            'is_valid': USER_PLUGINS_DIR.is_dir() and os.access(USER_PLUGINS_DIR, os.R_OK) and os.access(USER_PLUGINS_DIR, os.X_OK),  # read
+            'enabled': os.access(USER_PLUGINS_DIR, os.R_OK),
+            'is_valid': os.access(USER_PLUGINS_DIR, os.R_OK) and os.access(USER_PLUGINS_DIR, os.X_OK),  # read
         },
     })

View file

@@ -270,7 +270,7 @@ def load_config_file(out_dir: str | None=CONSTANTS.DATA_DIR) -> Optional[benedict]:
     """load the ini-formatted config file from DATA_DIR/Archivebox.conf"""
     config_path = CONSTANTS.CONFIG_FILE
-    if config_path.exists():
+    if os.access(config_path, os.R_OK):
         config_file = ConfigParser()
         config_file.optionxform = str
         config_file.read(config_path)
@@ -307,7 +307,7 @@ def write_config_file(config: Dict[str, str], out_dir: str | None=CONSTANTS.DATA_DIR):
     config_path = CONSTANTS.CONFIG_FILE
-    if not config_path.exists():
+    if not os.access(config_path, os.F_OK):
         atomic_write(config_path, CONFIG_HEADER)
     config_file = ConfigParser()
@@ -355,7 +355,7 @@ def write_config_file(config: Dict[str, str], out_dir: str | None=CONSTANTS.DATA_DIR):
         raise
-    if Path(f'{config_path}.bak').exists():
+    if os.access(f'{config_path}.bak', os.F_OK):
         os.remove(f'{config_path}.bak')
     return benedict({
@@ -462,7 +462,7 @@ def find_chrome_data_dir() -> Optional[str]:
     # )
     # for path in default_profile_paths:
     #     full_path = Path(path).resolve()
-    #     if full_path.exists():
+    #     if full_path.is_dir():
     #         return full_path
     return None
@@ -639,7 +639,7 @@ def setup_django(out_dir: Path | None=None, check_db=False, config: benedict=CONFIG):
             conn.close_if_unusable_or_obsolete()
         sql_index_path = CONSTANTS.DATABASE_FILE
-        assert sql_index_path.exists(), (
+        assert os.access(sql_index_path, os.F_OK), (
             f'No database file {sql_index_path} found in: {CONSTANTS.DATA_DIR} (Are you in an ArchiveBox collection directory?)')
         bump_startup_progress_bar()

View file

@@ -475,7 +475,7 @@ class SnapshotAdmin(SearchResultsAdminMixin, ABIDModelAdmin):
         # ordering='archiveresult_count'
     )
     def size(self, obj):
-        archive_size = (Path(obj.link_dir) / 'index.html').exists() and obj.archive_size
+        archive_size = os.access(Path(obj.link_dir) / 'index.html', os.F_OK) and obj.archive_size
         if archive_size:
             size_txt = printable_filesize(archive_size)
             if archive_size > 52428800:
@@ -740,7 +740,7 @@ class ArchiveResultAdmin(ABIDModelAdmin):
             output_str += format_html('<a href="/archive/{}/index.html#all">See result files ...</a><br/><pre><code>', str(result.snapshot.timestamp))
             path_from_output_str = (snapshot_dir / result.output)
             output_str += format_html('<i style="padding: 1px">{}</i><b style="padding-right: 20px">/</b><i>{}</i><br/><hr/>', str(snapshot_dir), str(result.output))
-            if path_from_output_str.exists():
+            if os.access(path_from_output_str, os.R_OK):
                 root_dir = str(path_from_output_str)
             else:
                 root_dir = str(snapshot_dir)

View file

@@ -4,6 +4,7 @@ __package__ = 'archivebox.core'
 from typing import Optional, Dict, Iterable
 from django_stubs_ext.db.models import TypedModelMeta
+import os
 import json
 from pathlib import Path
@@ -22,7 +23,7 @@ from archivebox.config import CONSTANTS
 from abid_utils.models import ABIDModel, ABIDField, AutoDateTimeField
 from queues.tasks import bg_archive_snapshot
-from machine.models import Machine, NetworkInterface
+# from machine.models import Machine, NetworkInterface
 from archivebox.misc.system import get_dir_size
 from archivebox.misc.util import parse_date, base_url
@@ -604,7 +605,7 @@ class ArchiveResult(ABIDModel):
         return link.canonical_outputs().get(f'{self.extractor}_path')
     def output_exists(self) -> bool:
-        return Path(self.output_path()).exists()
+        return os.access(self.output_path(), os.R_OK)
     # def get_storage_dir(self, create=True, symlink=True):

View file

@@ -21,11 +21,11 @@ def serve_static_with_byterange_support(request, path, document_root=None, show_indexes=False):
     assert document_root
     path = posixpath.normpath(path).lstrip("/")
     fullpath = Path(safe_join(document_root, path))
-    if fullpath.is_dir():
+    if os.access(fullpath, os.R_OK) and fullpath.is_dir():
         if show_indexes:
             return static.directory_index(path, fullpath)
         raise Http404(_("Directory indexes are not allowed here."))
-    if not fullpath.exists():
+    if not os.access(fullpath, os.R_OK):
         raise Http404(_("“%(path)s” does not exist") % {"path": fullpath})
     # Respect the If-Modified-Since header.

View file

@@ -169,8 +169,9 @@ AUTHENTICATION_BACKENDS = [
 STATIC_URL = '/static/'
 TEMPLATES_DIR_NAME = 'templates'
+CUSTOM_TEMPLATES_ENABLED = os.access(CONSTANTS.CUSTOM_TEMPLATES_DIR, os.R_OK) and CONSTANTS.CUSTOM_TEMPLATES_DIR.is_dir()
 STATICFILES_DIRS = [
-    *([str(CONSTANTS.CUSTOM_TEMPLATES_DIR / 'static')] if CONSTANTS.CUSTOM_TEMPLATES_DIR.is_dir() else []),
+    *([str(CONSTANTS.CUSTOM_TEMPLATES_DIR / 'static')] if CUSTOM_TEMPLATES_ENABLED else []),
     # *[
     #     str(plugin_dir / 'static')
     #     for plugin_dir in PLUGIN_DIRS.values()
@@ -181,7 +182,7 @@ STATICFILES_DIRS = [
 ]
 TEMPLATE_DIRS = [
-    *([str(CONSTANTS.CUSTOM_TEMPLATES_DIR)] if CONSTANTS.CUSTOM_TEMPLATES_DIR.is_dir() else []),
+    *([str(CONSTANTS.CUSTOM_TEMPLATES_DIR)] if CUSTOM_TEMPLATES_ENABLED else []),
     # *[
     #     str(plugin_dir / 'templates')
     #     for plugin_dir in PLUGIN_DIRS.values()
@@ -600,7 +601,7 @@ if DEBUG_REQUESTS_TRACKER:
 # # https://docs.pydantic.dev/logfire/integrations/django/ (similar to DataDog / NewRelic / etc.)
 # DEBUG_LOGFIRE = False
-# DEBUG_LOGFIRE = DEBUG_LOGFIRE and (DATA_DIR / '.logfire').is_dir()
+# DEBUG_LOGFIRE = DEBUG_LOGFIRE and os.access(DATA_DIR / '.logfire', os.W_OK) and (DATA_DIR / '.logfire').is_dir()
 # For usage with https://www.jetadmin.io/integrations/django

View file

@@ -1,5 +1,8 @@
 __package__ = 'archivebox.core'
+import re
+import os
+import shutil
 import tempfile
 import logging
@@ -54,7 +57,7 @@ ERROR_LOG = tempfile.NamedTemporaryFile().name
 LOGS_DIR = CONSTANTS.LOGS_DIR
-if LOGS_DIR.is_dir():
+if os.access(LOGS_DIR, os.W_OK) and LOGS_DIR.is_dir():
     ERROR_LOG = (LOGS_DIR / 'errors.log')
 else:
     # historically too many edge cases here around creating log dir w/ correct permissions early on

View file

@@ -1,5 +1,6 @@
 __package__ = 'archivebox.core'
+import os
 import inspect
 from typing import Callable, get_type_hints
 from pathlib import Path
@@ -67,6 +68,7 @@ class SnapshotView(View):
             if (result.status == 'succeeded'
                 and (result.extractor not in HIDDEN_RESULTS)
                 and embed_path
+                and os.access(abs_path, os.R_OK)
                 and abs_path.exists()):
                 if abs_path.is_dir() and not any(abs_path.glob('*.*')):
                     continue
@@ -102,6 +104,8 @@ class SnapshotView(View):
         # iterate through all the files in the snapshot dir and add the biggest ones to the result list
         snap_dir = Path(snapshot.link_dir)
+        assert os.access(snap_dir, os.R_OK) and os.access(snap_dir, os.X_OK)
         for result_file in (*snap_dir.glob('*'), *snap_dir.glob('*/*')):
             extension = result_file.suffix.lstrip('.').lower()
             if result_file.is_dir() or result_file.name.startswith('.') or extension not in allowed_extensions:

View file

@@ -1,6 +1,7 @@
 __package__ = 'archivebox.extractors'
 import re
+import os
 from pathlib import Path
 from typing import Optional
@@ -147,23 +148,22 @@ def unsafe_wget_output_path(link: Link) -> Optional[str]:
     search_dir = Path(link.link_dir) / domain(link.url).replace(":", "+") / urldecode(full_path)
     for _ in range(4):
         try:
-            if search_dir.exists():
-                if search_dir.is_dir():
-                    html_files = [
-                        f for f in search_dir.iterdir()
-                        if re.search(".+\\.[Ss]?[Hh][Tt][Mm][Ll]?$", str(f), re.I | re.M)
-                    ]
-                    if html_files:
-                        return str(html_files[0].relative_to(link.link_dir))
+            if os.access(search_dir, os.R_OK) and search_dir.is_dir():
+                html_files = [
+                    f for f in search_dir.iterdir()
+                    if re.search(".+\\.[Ss]?[Hh][Tt][Mm][Ll]?$", str(f), re.I | re.M)
+                ]
+                if html_files:
+                    return str(html_files[0].relative_to(link.link_dir))
-                    # sometimes wget'd URLs have no ext and return non-html
-                    # e.g. /some/example/rss/all -> some RSS XML content)
-                    #      /some/other/url.o4g -> some binary unrecognized ext)
-                    # test this with archivebox add --depth=1 https://getpocket.com/users/nikisweeting/feed/all
-                    last_part_of_url = urldecode(full_path.rsplit('/', 1)[-1])
-                    for file_present in search_dir.iterdir():
-                        if file_present == last_part_of_url:
-                            return str((search_dir / file_present).relative_to(link.link_dir))
+                # sometimes wget'd URLs have no ext and return non-html
+                # e.g. /some/example/rss/all -> some RSS XML content)
+                #      /some/other/url.o4g -> some binary unrecognized ext)
+                # test this with archivebox add --depth=1 https://getpocket.com/users/nikisweeting/feed/all
+                last_part_of_url = urldecode(full_path.rsplit('/', 1)[-1])
+                for file_present in search_dir.iterdir():
+                    if file_present == last_part_of_url:
+                        return str((search_dir / file_present).relative_to(link.link_dir))
         except OSError:
             # OSError 36 and others can happen here, caused by trying to check for impossible paths
             # (paths derived from URLs can often contain illegal unicode characters or be too long,
@@ -278,12 +278,12 @@ def wget_output_path(link: Link, nocache: bool=False) -> Optional[str]:
     # fallback to just the domain dir
     search_dir = Path(link.link_dir) / domain(link.url).replace(":", "+")
-    if search_dir.is_dir():
+    if os.access(search_dir, os.R_OK) and search_dir.is_dir():
         return domain(link.url).replace(":", "+")
     # fallback to just the domain dir without port
     search_dir = Path(link.link_dir) / domain(link.url).split(":", 1)[0]
-    if search_dir.is_dir():
+    if os.access(search_dir, os.R_OK) and search_dir.is_dir():
         return domain(link.url).split(":", 1)[0]
     return None

View file

@@ -249,7 +249,7 @@ def load_main_index(out_dir: Path | str=DATA_DIR, warn: bool=True) -> List[Link]:
 @enforce_types
 def load_main_index_meta(out_dir: Path=DATA_DIR) -> Optional[dict]:
     index_path = out_dir / CONSTANTS.JSON_INDEX_FILENAME
-    if index_path.exists():
+    if os.access(index_path, os.F_OK):
         with open(index_path, 'r', encoding='utf-8') as f:
             meta_dict = pyjson.load(f)
             meta_dict.pop('links')

View file

@@ -102,7 +102,7 @@ def parse_json_link_details(out_dir: Union[Path, str], guess: bool=False) -> Optional[Link]:
     """load the json link index from a given directory"""
     existing_index = Path(out_dir) / CONSTANTS.JSON_INDEX_FILENAME
-    if existing_index.exists():
+    if os.access(existing_index, os.F_OK):
         with open(existing_index, 'r', encoding='utf-8') as f:
             try:
                 link_json = pyjson.load(f)
@@ -119,7 +119,7 @@ def parse_json_links_details(out_dir: Union[Path, str]) -> Iterator[Link]:
     for entry in os.scandir(CONSTANTS.ARCHIVE_DIR):
         if entry.is_dir(follow_symlinks=True):
-            if (Path(entry.path) / 'index.json').exists():
+            if os.access((Path(entry.path) / 'index.json'), os.F_OK):
                 try:
                     link = parse_json_link_details(entry.path)
                 except KeyError:

View file

@@ -478,7 +478,7 @@ def log_list_finished(links):
 def log_removal_started(links: List["Link"], yes: bool, delete: bool):
     print(f'[yellow3][i] Found {len(links)} matching URLs to remove.[/]')
     if delete:
-        file_counts = [link.num_outputs for link in links if Path(link.link_dir).exists()]
+        file_counts = [link.num_outputs for link in links if os.access(link.link_dir, os.R_OK)]
         print(
             f'    {len(links)} Links will be de-listed from the main index, and their archived content folders will be deleted from disk.\n'
             f'    ({len(file_counts)} data folders with {sum(file_counts)} archived files will be deleted!)'
@@ -572,7 +572,7 @@ def printable_folder_status(name: str, folder: Dict) -> str:
     if folder['path']:
-        if Path(folder['path']).exists():
+        if os.access(folder['path'], os.R_OK):
             num_files = (
                 f'{len(os.listdir(folder["path"]))} files'
                 if Path(folder['path']).is_dir() else

View file

@@ -140,7 +140,7 @@ def help(out_dir: Path=DATA_DIR) -> None:
     ''')
-    if CONSTANTS.ARCHIVE_DIR.exists():
+    if os.access(CONSTANTS.ARCHIVE_DIR, os.R_OK) and CONSTANTS.ARCHIVE_DIR.is_dir():
         pretty_out_dir = str(out_dir).replace(str(Path('~').expanduser()), '~')
         EXAMPLE_USAGE = f'''
 [light_slate_blue]DATA DIR[/light_slate_blue]: [yellow]{pretty_out_dir}[/yellow]
@@ -264,7 +264,7 @@ def version(quiet: bool=False,
         prnt(printable_folder_status(name, path), overflow='ignore', crop=False)
     prnt()
-    if CONSTANTS.ARCHIVE_DIR.exists() or CONSTANTS.CONFIG_FILE.exists():
+    if os.access(CONSTANTS.ARCHIVE_DIR, os.R_OK) or os.access(CONSTANTS.CONFIG_FILE, os.R_OK):
         prnt('[bright_yellow][i] Data locations:[/bright_yellow]')
         for name, path in CONSTANTS.DATA_LOCATIONS.items():
             prnt(printable_folder_status(name, path), overflow='ignore', crop=False)
@@ -331,11 +331,11 @@ def init(force: bool=False, quick: bool=False, install: bool=False, out_dir: Path=DATA_DIR) -> None:
     out_dir.mkdir(exist_ok=True)
     is_empty = not len(set(os.listdir(out_dir)) - CONSTANTS.ALLOWED_IN_DATA_DIR)
-    if (out_dir / CONSTANTS.JSON_INDEX_FILENAME).exists():
+    if os.access(out_dir / CONSTANTS.JSON_INDEX_FILENAME, os.F_OK):
         print("[red]:warning: This folder contains a JSON index. It is deprecated, and will no longer be kept up to date automatically.[/red]", file=sys.stderr)
         print("[red]    You can run `archivebox list --json --with-headers > static_index.json` to manually generate it.[/red]", file=sys.stderr)
-    existing_index = CONSTANTS.DATABASE_FILE.exists()
+    existing_index = os.access(CONSTANTS.DATABASE_FILE, os.F_OK)
     if is_empty and not existing_index:
         print(f'[turquoise4][+] Initializing a new ArchiveBox v{VERSION} collection...[/turquoise4]')
@@ -371,7 +371,7 @@ def init(force: bool=False, quick: bool=False, install: bool=False, out_dir: Path=DATA_DIR) -> None:
         print(f'    + ./{CONSTANTS.CONFIG_FILE.relative_to(DATA_DIR)}...')
         write_config_file({}, out_dir=str(out_dir))
-    if CONSTANTS.DATABASE_FILE.exists():
+    if os.access(CONSTANTS.DATABASE_FILE, os.F_OK):
         print('\n[green][*] Verifying main SQL index and running any migrations needed...[/green]')
     else:
         print('\n[green][+] Building main SQL index and running initial migrations...[/green]')
@@ -379,7 +379,7 @@ def init(force: bool=False, quick: bool=False, install: bool=False, out_dir: Path=DATA_DIR) -> None:
     for migration_line in apply_migrations(out_dir):
         sys.stdout.write(f'    {migration_line}\n')
-    assert CONSTANTS.DATABASE_FILE.exists()
+    assert os.access(CONSTANTS.DATABASE_FILE, os.R_OK)
     print()
     print(f'    √ ./{CONSTANTS.DATABASE_FILE.relative_to(DATA_DIR)}')
@@ -469,9 +469,9 @@ def init(force: bool=False, quick: bool=False, install: bool=False, out_dir: Path=DATA_DIR) -> None:
     json_index = out_dir / CONSTANTS.JSON_INDEX_FILENAME
     html_index = out_dir / CONSTANTS.HTML_INDEX_FILENAME
     index_name = f"{date.today()}_index_old"
-    if json_index.exists():
+    if os.access(json_index, os.F_OK):
         json_index.rename(f"{index_name}.json")
-    if html_index.exists():
+    if os.access(html_index, os.F_OK):
         html_index.rename(f"{index_name}.html")
     if install:
@@ -1007,7 +1007,7 @@ def install(out_dir: Path=DATA_DIR) -> None:
     from archivebox import CONSTANTS
     from archivebox.config.permissions import IS_ROOT, ARCHIVEBOX_USER, ARCHIVEBOX_GROUP
-    if not ARCHIVE_DIR.exists():
+    if not (os.access(ARCHIVE_DIR, os.R_OK) and ARCHIVE_DIR.is_dir()):
         run_subcommand('init', stdin=None, pwd=out_dir)    # must init full index because we need a db to store InstalledBinary entries in
     print('\n[green][+] Installing ArchiveBox dependencies automatically...[/green]')

View file

@@ -1,5 +1,6 @@
 __package__ = 'archivebox.misc'
+import os
 import sys
 from rich import print
@@ -14,7 +15,7 @@ from rich import print
 def check_data_folder() -> None:
     from archivebox import DATA_DIR, ARCHIVE_DIR
-    archive_dir_exists = ARCHIVE_DIR.exists()
+    archive_dir_exists = os.access(ARCHIVE_DIR, os.R_OK) and ARCHIVE_DIR.is_dir()
     if not archive_dir_exists:
         print('[red][X] No archivebox index found in the current directory.[/red]', file=sys.stderr)
         print(f'    {DATA_DIR}', file=sys.stderr)

View file

@@ -114,7 +114,7 @@ def chmod_file(path: str, cwd: str='') -> None:
     """chmod -R <permissions> <cwd>/<path>"""
     root = Path(cwd or os.getcwd()) / path
-    if not root.exists():
+    if not os.access(root, os.R_OK):
         raise Exception('Failed to chmod: {} does not exist (did the previous step fail?)'.format(path))
     if not root.is_dir():
@@ -132,6 +132,9 @@ def chmod_file(path: str, cwd: str='') -> None:
 @enforce_types
 def copy_and_overwrite(from_path: Union[str, Path], to_path: Union[str, Path]):
     """copy a given file or directory to a given path, overwriting the destination"""
+    assert os.access(from_path, os.R_OK)
     if Path(from_path).is_dir():
         shutil.rmtree(to_path, ignore_errors=True)
         shutil.copytree(from_path, to_path)

View file

@@ -149,12 +149,13 @@ def save_text_as_source(raw_text: str, filename: str='{ts}-stdin.txt', out_dir: Path=DATA_DIR) -> str:
     referenced_texts = ''
-    for entry in raw_text.split():
-        try:
-            if Path(entry).exists():
-                referenced_texts += Path(entry).read_text()
-        except Exception as err:
-            print(err)
+    # dont attempt to read local files from the text, security risk:
+    # for entry in raw_text.split():
+    #     try:
+    #         if Path(entry).exists():
+    #             referenced_texts += Path(entry).read_text()
+    #     except Exception as err:
+    #         print(err)
     atomic_write(source_path, raw_text + '\n' + referenced_texts)
     log_source_saved(source_file=source_path)

View file

@@ -3,7 +3,6 @@ __description__ = 'Plain Text'
 from typing import IO, Iterable
 from datetime import datetime, timezone
-from pathlib import Path
 from ..index.schema import Link
 from archivebox.misc.util import (
@@ -22,19 +21,20 @@ def parse_generic_txt_export(text_file: IO[str], **_kwargs) -> Iterable[Link]:
         if not line.strip():
             continue
-        # if the line is a local file path that resolves, then we can archive it
-        try:
-            if Path(line).exists():
-                yield Link(
-                    url=line,
-                    timestamp=str(datetime.now(timezone.utc).timestamp()),
-                    title=None,
-                    tags=None,
-                    sources=[text_file.name],
-                )
-        except (OSError, PermissionError):
-            # nvm, not a valid path...
-            pass
+        # # if the line is a local file path that resolves, then we can archive it
+        # if line.startswith('file://'):
+        #     try:
+        #         if Path(line).exists():
+        #             yield Link(
+        #                 url=line,
+        #                 timestamp=str(datetime.now(timezone.utc).timestamp()),
+        #                 title=None,
+        #                 tags=None,
+        #                 sources=[text_file.name],
+        #             )
+        #     except (OSError, PermissionError):
+        #         # nvm, not a valid path...
+        #         pass
         # otherwise look for anything that looks like a URL in the line
         for url in find_all_urls(line):

View file

@@ -1,5 +1,6 @@
 __package__ = 'archivebox.plugins_extractor.chrome'
+import os
 import sys
 import platform
 from pathlib import Path
@@ -130,9 +131,9 @@ class ChromeConfig(BaseConfigSet):
             print(file=sys.stderr)
         # if user has specified a user data dir, make sure its valid
-        if self.CHROME_USER_DATA_DIR and self.CHROME_USER_DATA_DIR.exists():
+        if self.CHROME_USER_DATA_DIR and os.access(self.CHROME_USER_DATA_DIR, os.R_OK):
             # check to make sure user_data_dir/<profile_name> exists
-            if not (self.CHROME_USER_DATA_DIR / self.CHROME_PROFILE_NAME).exists():
+            if not (self.CHROME_USER_DATA_DIR / self.CHROME_PROFILE_NAME).is_dir():
                 print(f'[red][X] Could not find profile "{self.CHROME_PROFILE_NAME}" in CHROME_USER_DATA_DIR.[/red]', file=sys.stderr)
                 print(f'    {self.CHROME_USER_DATA_DIR}', file=sys.stderr)
                 print('    Make sure you set it to a Chrome user data directory containing a Default profile folder.', file=sys.stderr)
@@ -217,7 +218,7 @@ class ChromeBinary(BaseBinary):
     @staticmethod
     def symlink_to_lib(binary, bin_dir=CONSTANTS.LIB_BIN_DIR) -> None:
-        if not (binary.abspath and binary.abspath.exists()):
+        if not (binary.abspath and os.access(binary.abspath, os.F_OK)):
             return
         bin_dir.mkdir(parents=True, exist_ok=True)
@@ -242,10 +243,14 @@ class ChromeBinary(BaseBinary):
         Cleans up any state or runtime files that chrome leaves behind when killed by
         a timeout or other error
         """
-        lock_file = Path("~/.config/chromium/SingletonLock")
+        lock_file = Path("~/.config/chromium/SingletonLock").expanduser()
-        if SHELL_CONFIG.IN_DOCKER and lock_file.exists():
+        if SHELL_CONFIG.IN_DOCKER and os.access(lock_file, os.F_OK):
             lock_file.unlink()
+        if CHROME_CONFIG.CHROME_USER_DATA_DIR:
+            if os.access(CHROME_CONFIG.CHROME_USER_DATA_DIR / 'SingletonLock', os.F_OK):
+                lock_file.unlink()

View file

@@ -1,6 +1,7 @@
 __package__ = 'archivebox.extractors'
 import re
+import os
 from pathlib import Path
 from typing import Optional
@@ -157,12 +158,12 @@ def wget_output_path(link, nocache: bool=False) -> Optional[str]:
     # fallback to just the domain dir
     search_dir = Path(link.link_dir) / domain(link.url).replace(":", "+")
-    if search_dir.is_dir():
+    if os.access(search_dir, os.R_OK) and search_dir.is_dir():
         return domain(link.url).replace(":", "+")
     # fallback to just the domain dir without port
     search_dir = Path(link.link_dir) / domain(link.url).split(":", 1)[0]
-    if search_dir.is_dir():
+    if os.access(search_dir, os.R_OK) and search_dir.is_dir():
         return domain(link.url).split(":", 1)[0]
     return None

View file

@@ -41,7 +41,7 @@ wheels = [
 [[package]]
 name = "archivebox"
-version = "0.8.5rc7"
+version = "0.8.5rc8"
 source = { editable = "." }
 dependencies = [
     { name = "atomicwrites" },