mirror of
https://github.com/ArchiveBox/ArchiveBox
synced 2024-11-22 20:23:12 +00:00
163 lines
5.5 KiB
Python
163 lines
5.5 KiB
Python
__package__ = 'archivebox.index'
|
|
|
|
import re
|
|
|
|
from io import StringIO
|
|
from pathlib import Path
|
|
from typing import List, Tuple, Iterator
|
|
from django.db.models import QuerySet
|
|
from django.db import transaction
|
|
|
|
from archivebox.misc.util import enforce_types, parse_date
|
|
from archivebox.config import DATA_DIR
|
|
from archivebox.config.common import GENERAL_CONFIG
|
|
|
|
from .schema import Link
|
|
|
|
### Main Links Index
|
|
|
|
@enforce_types
|
|
def parse_sql_main_index(out_dir: Path=DATA_DIR) -> Iterator[Link]:
|
|
from core.models import Snapshot
|
|
|
|
return (
|
|
Link.from_json(page.as_json(*Snapshot.keys))
|
|
for page in Snapshot.objects.all()
|
|
)
|
|
|
|
@enforce_types
|
|
def remove_from_sql_main_index(snapshots: QuerySet, atomic: bool=False, out_dir: Path=DATA_DIR) -> None:
|
|
if atomic:
|
|
with transaction.atomic():
|
|
return snapshots.delete()
|
|
return snapshots.delete()
|
|
|
|
@enforce_types
|
|
def write_link_to_sql_index(link: Link, created_by_id: int | None=None):
|
|
from core.models import Snapshot, ArchiveResult
|
|
from abid_utils.models import get_or_create_system_user_pk
|
|
|
|
info = {k: v for k, v in link._asdict().items() if k in Snapshot.keys}
|
|
|
|
info['created_by_id'] = created_by_id or get_or_create_system_user_pk()
|
|
|
|
tag_list = list(dict.fromkeys(
|
|
tag.strip() for tag in re.split(GENERAL_CONFIG.TAG_SEPARATOR_PATTERN, link.tags or '')
|
|
))
|
|
info.pop('tags')
|
|
|
|
try:
|
|
snapshot = Snapshot.objects.get(url=link.url)
|
|
info["timestamp"] = snapshot.timestamp
|
|
except Snapshot.DoesNotExist:
|
|
while Snapshot.objects.filter(timestamp=info["timestamp"]).exists():
|
|
info["timestamp"] = str(float(info["timestamp"]) + 1.0)
|
|
|
|
snapshot, _ = Snapshot.objects.update_or_create(url=link.url, defaults=info)
|
|
snapshot.save_tags(tag_list)
|
|
|
|
for extractor, entries in link.history.items():
|
|
for entry in entries:
|
|
if isinstance(entry, dict):
|
|
result, _ = ArchiveResult.objects.get_or_create(
|
|
snapshot_id=snapshot.pk,
|
|
extractor=extractor,
|
|
start_ts=parse_date(entry['start_ts']),
|
|
defaults={
|
|
'end_ts': parse_date(entry['end_ts']),
|
|
'cmd': entry['cmd'],
|
|
'output': entry['output'],
|
|
'cmd_version': entry.get('cmd_version') or 'unknown',
|
|
'pwd': entry['pwd'],
|
|
'status': entry['status'],
|
|
'created_by_id': snapshot.created_by_id,
|
|
}
|
|
)
|
|
else:
|
|
result, _ = ArchiveResult.objects.update_or_create(
|
|
snapshot_id=snapshot.pk,
|
|
extractor=extractor,
|
|
start_ts=parse_date(entry.start_ts),
|
|
defaults={
|
|
'end_ts': parse_date(entry.end_ts),
|
|
'cmd': entry.cmd,
|
|
'output': entry.output,
|
|
'cmd_version': entry.cmd_version or 'unknown',
|
|
'pwd': entry.pwd,
|
|
'status': entry.status,
|
|
'created_by_id': snapshot.created_by_id,
|
|
}
|
|
)
|
|
|
|
return snapshot
|
|
|
|
|
|
@enforce_types
|
|
def write_sql_main_index(links: List[Link], out_dir: Path=DATA_DIR, created_by_id: int | None=None) -> None:
|
|
for link in links:
|
|
# with transaction.atomic():
|
|
# write_link_to_sql_index(link)
|
|
write_link_to_sql_index(link, created_by_id=created_by_id)
|
|
|
|
|
|
@enforce_types
|
|
def write_sql_link_details(link: Link, out_dir: Path=DATA_DIR, created_by_id: int | None=None) -> None:
|
|
from core.models import Snapshot
|
|
|
|
# with transaction.atomic():
|
|
# try:
|
|
# snap = Snapshot.objects.get(url=link.url)
|
|
# except Snapshot.DoesNotExist:
|
|
# snap = write_link_to_sql_index(link)
|
|
# snap.title = link.title
|
|
try:
|
|
snap = Snapshot.objects.get(url=link.url)
|
|
except Snapshot.DoesNotExist:
|
|
snap = write_link_to_sql_index(link, created_by_id=created_by_id)
|
|
|
|
snap.title = link.title
|
|
|
|
tag_list = list(
|
|
{tag.strip() for tag in re.split(GENERAL_CONFIG.TAG_SEPARATOR_PATTERN, link.tags or '')}
|
|
| set(snap.tags.values_list('name', flat=True))
|
|
)
|
|
|
|
snap.save()
|
|
snap.save_tags(tag_list)
|
|
|
|
|
|
|
|
@enforce_types
|
|
def list_migrations(out_dir: Path=DATA_DIR) -> List[Tuple[bool, str]]:
|
|
from django.core.management import call_command
|
|
out = StringIO()
|
|
call_command("showmigrations", list=True, stdout=out)
|
|
out.seek(0)
|
|
migrations = []
|
|
for line in out.readlines():
|
|
if line.strip() and ']' in line:
|
|
status_str, name_str = line.strip().split(']', 1)
|
|
is_applied = 'X' in status_str
|
|
migration_name = name_str.strip()
|
|
migrations.append((is_applied, migration_name))
|
|
|
|
return migrations
|
|
|
|
@enforce_types
|
|
def apply_migrations(out_dir: Path=DATA_DIR) -> List[str]:
|
|
from django.core.management import call_command
|
|
out1, out2 = StringIO(), StringIO()
|
|
|
|
call_command("migrate", interactive=False, database='default', stdout=out1)
|
|
out1.seek(0)
|
|
call_command("migrate", "huey_monitor", interactive=False, database='queue', stdout=out2)
|
|
out2.seek(0)
|
|
|
|
return [
|
|
line.strip() for line in out1.readlines() + out2.readlines() if line.strip()
|
|
]
|
|
|
|
@enforce_types
|
|
def get_admins(out_dir: Path=DATA_DIR) -> List[str]:
|
|
from django.contrib.auth.models import User
|
|
return User.objects.filter(is_superuser=True).exclude(username='system')
|