[68] Cache TVDb Objects

This commit is contained in:
meisnate12 2022-05-05 18:05:16 -04:00
parent 6e7d0f2112
commit 62ad5594f9
6 changed files with 236 additions and 121 deletions

View file

@ -1 +1 @@
1.16.5-develop67
1.16.5-develop68

View file

@ -832,7 +832,7 @@ class CollectionBuilder:
elif method_name == "tmdb_biography":
self.summaries[method_name] = self.config.TMDb.get_person(util.regex_first_int(method_data, "TMDb Person ID")).biography
elif method_name == "tvdb_summary":
self.summaries[method_name] = self.config.TVDb.get_item(method_data, self.library.is_movie).summary
self.summaries[method_name] = self.config.TVDb.get_tvdb_obj(method_data, is_movie=self.library.is_movie).summary
elif method_name == "tvdb_description":
self.summaries[method_name] = self.config.TVDb.get_list_description(method_data)
elif method_name == "trakt_description":
@ -850,7 +850,7 @@ class CollectionBuilder:
elif method_name == "tmdb_profile":
self.posters[method_name] = self.config.TMDb.get_person(util.regex_first_int(method_data, 'TMDb Person ID')).profile_url
elif method_name == "tvdb_poster":
self.posters[method_name] = f"{self.config.TVDb.get_item(method_data, self.library.is_movie).poster_path}"
self.posters[method_name] = f"{self.config.TVDb.get_tvdb_obj(method_data, is_movie=self.library.is_movie).poster_url}"
elif method_name == "file_poster":
if os.path.exists(os.path.abspath(method_data)):
self.posters[method_name] = os.path.abspath(method_data)
@ -863,7 +863,7 @@ class CollectionBuilder:
elif method_name == "tmdb_background":
self.backgrounds[method_name] = self.config.TMDb.get_movie_show_or_collection(util.regex_first_int(method_data, 'TMDb ID'), self.library.is_movie).backdrop_url
elif method_name == "tvdb_background":
self.posters[method_name] = f"{self.config.TVDb.get_item(method_data, self.library.is_movie).background_path}"
self.posters[method_name] = f"{self.config.TVDb.get_tvdb_obj(method_data, is_movie=self.library.is_movie).background_url}"
elif method_name == "file_background":
if os.path.exists(os.path.abspath(method_data)):
self.backgrounds[method_name] = os.path.abspath(method_data)
@ -1478,11 +1478,11 @@ class CollectionBuilder:
values = util.get_list(method_data)
if method_name.endswith("_details"):
if method_name.startswith(("tvdb_movie", "tvdb_show")):
item = self.config.TVDb.get_item(values[0], method_name.startswith("tvdb_movie"))
if item.background_path:
self.backgrounds[method_name] = item.background_path
if item.poster_path:
self.posters[method_name] = item.poster_path
item = self.config.TVDb.get_tvdb_obj(values[0], is_movie=method_name.startswith("tvdb_movie"))
if item.background_url:
self.backgrounds[method_name] = item.background_url
if item.poster_url:
self.posters[method_name] = item.poster_url
elif method_name.startswith("tvdb_list"):
self.summaries[method_name] = self.config.TVDb.get_list_description(values[0])
for value in values:
@ -2293,17 +2293,17 @@ class CollectionBuilder:
missing_shows_with_names = []
for missing_id in self.missing_shows:
try:
show = self.config.TVDb.get_series(missing_id)
title = self.config.TVDb.get_tvdb_obj(missing_id).title
except Failed as e:
logger.error(e)
continue
if self.check_tmdb_filter(missing_id, False, check_released=self.details["missing_only_released"]):
missing_shows_with_names.append((show.title, missing_id))
missing_shows_with_names.append((title, missing_id))
if self.details["show_missing"] is True:
logger.info(f"{self.name} {self.Type} | ? | {show.title} (TVDb: {missing_id})")
logger.info(f"{self.name} {self.Type} | ? | {title} (TVDb: {missing_id})")
else:
if self.details["show_filtered"] is True and self.details["show_missing"] is True:
logger.info(f"{self.name} {self.Type} | X | {show.title} (TVDb: {missing_id})")
logger.info(f"{self.name} {self.Type} | X | {title} (TVDb: {missing_id})")
logger.info("")
logger.info(f"{len(missing_shows_with_names)} Show{'s' if len(missing_shows_with_names) > 1 else ''} Missing")
if len(missing_shows_with_names) > 0:
@ -2791,7 +2791,7 @@ class CollectionBuilder:
for missing_id in self.run_again_shows:
if missing_id not in self.library.show_map:
try:
title = self.config.TVDb.get_series(missing_id).title
title = self.config.TVDb.get_tvdb_obj(missing_id).title
except Failed as e:
logger.error(e)
continue

View file

@ -163,6 +163,26 @@ class Cache:
seasons TEXT,
expiration_date TEXT)"""
)
cursor.execute(
"""CREATE TABLE IF NOT EXISTS tvdb_data (
key INTEGER PRIMARY KEY,
tvdb_id INTEGER UNIQUE,
type TEXT,
title TEXT,
summary TEXT,
poster_url TEXT,
background_url TEXT,
release_date TEXT,
genres TEXT,
expiration_date TEXT)"""
)
cursor.execute(
"""CREATE TABLE IF NOT EXISTS tvdb_map (
key INTEGER PRIMARY KEY,
tvdb_url TEXT UNIQUE,
tvdb_id INTEGER,
expiration_date TEXT)"""
)
cursor.execute(
"""CREATE TABLE IF NOT EXISTS anime_map (
key INTEGER PRIMARY KEY,
@ -540,6 +560,64 @@ class Cache:
expiration_date.strftime("%Y-%m-%d"), obj.tmdb_id
))
def query_tvdb(self, tvdb_id, is_movie, expiration):
    """Look up a cached TVDb item in the ``tvdb_data`` table.

    Args:
        tvdb_id: TVDb ID to search for.
        is_movie: Selects the row type: ``"movie"`` when truthy, otherwise ``"show"``.
        expiration: Age in days beyond which the cached row counts as expired.

    Returns:
        A ``(tvdb_dict, expired)`` tuple. When no row matches, ``tvdb_dict`` is
        empty and ``expired`` is ``None``. Otherwise ``tvdb_dict`` holds the
        cached columns (empty/NULL text columns become ``""``, ``release_date``
        becomes a ``datetime`` or ``None``, ``genres`` stays the raw stored
        string) and ``expired`` is a bool.
    """
    tvdb_dict = {}
    expired = None
    # sqlite3's own connection context manager only commits/rolls back; it does
    # not close the handle, so closing() is used to release it deterministically.
    with closing(sqlite3.connect(self.cache_path)) as connection:
        connection.row_factory = sqlite3.Row
        with closing(connection.cursor()) as cursor:
            cursor.execute("SELECT * FROM tvdb_data WHERE tvdb_id = ? and type = ?", (tvdb_id, "movie" if is_movie else "show"))
            row = cursor.fetchone()
            if row:
                tvdb_dict["tvdb_id"] = int(row["tvdb_id"]) if row["tvdb_id"] else 0
                tvdb_dict["type"] = row["type"] or ""
                tvdb_dict["title"] = row["title"] or ""
                tvdb_dict["summary"] = row["summary"] or ""
                tvdb_dict["poster_url"] = row["poster_url"] or ""
                tvdb_dict["background_url"] = row["background_url"] or ""
                tvdb_dict["release_date"] = datetime.strptime(row["release_date"], "%Y-%m-%d") if row["release_date"] else None
                tvdb_dict["genres"] = row["genres"] or ""
                # expiration_date stores the (possibly back-dated) insertion day;
                # the row is stale once it is more than `expiration` days old.
                stored_on = datetime.strptime(row["expiration_date"], "%Y-%m-%d")
                expired = (datetime.now() - stored_on).days > expiration
    return tvdb_dict, expired
def update_tvdb(self, expired, obj, expiration):
    """Insert or refresh the ``tvdb_data`` row for a TVDb object.

    Args:
        expired: ``True`` when an existing row was found stale; the row is then
            stamped with today's date. Any other value back-dates the stamp by
            a random 1..``expiration`` days.
        obj: Object exposing ``tvdb_id``, ``is_movie``, ``title``, ``summary``,
            ``poster_url``, ``background_url``, ``release_date`` and ``genres``.
        expiration: Upper bound, in days, for the random back-dating.
    """
    # NOTE(review): random.randint raises ValueError when expiration < 1;
    # presumably the configured value is validated upstream - confirm.
    expiration_date = datetime.now() if expired is True else (datetime.now() - timedelta(days=random.randint(1, expiration)))
    item_type = "movie" if obj.is_movie else "show"
    release_date = obj.release_date.strftime("%Y-%m-%d") if obj.release_date else None
    update_sql = "UPDATE tvdb_data SET title = ?, summary = ?, poster_url = ?, background_url = ?, " \
                 "release_date = ?, genres = ?, expiration_date = ? WHERE tvdb_id = ? AND type = ?"
    # closing() releases the handle; the inner `with connection` keeps the
    # commit-on-success / rollback-on-error transaction behaviour.
    with closing(sqlite3.connect(self.cache_path)) as connection:
        with connection:
            with closing(connection.cursor()) as cursor:
                # NOTE(review): the table declares tvdb_id UNIQUE on its own, so a
                # movie and a show sharing an ID cannot both be stored - the second
                # INSERT is ignored and the UPDATE below matches no row.
                cursor.execute("INSERT OR IGNORE INTO tvdb_data(tvdb_id, type) VALUES(?, ?)", (obj.tvdb_id, item_type))
                cursor.execute(update_sql, (
                    obj.title, obj.summary, obj.poster_url, obj.background_url, release_date,
                    "|".join(obj.genres), expiration_date.strftime("%Y-%m-%d"), obj.tvdb_id, item_type
                ))
def query_tvdb_map(self, tvdb_url, expiration):
    """Look up the TVDb ID cached for a URL in the ``tvdb_map`` table.

    Args:
        tvdb_url: URL used as the lookup key.
        expiration: Age in days beyond which the cached row counts as expired.

    Returns:
        A ``(tvdb_id, expired)`` tuple. Both are ``None`` when no row matches;
        ``tvdb_id`` is also ``None`` when the stored ID is empty/zero.
    """
    tvdb_id = None
    expired = None
    # sqlite3's own connection context manager only commits/rolls back; it does
    # not close the handle, so closing() is used to release it deterministically.
    with closing(sqlite3.connect(self.cache_path)) as connection:
        connection.row_factory = sqlite3.Row
        with closing(connection.cursor()) as cursor:
            cursor.execute("SELECT * FROM tvdb_map WHERE tvdb_url = ?", (tvdb_url, ))
            row = cursor.fetchone()
            if row:
                tvdb_id = int(row["tvdb_id"]) if row["tvdb_id"] else None
                # The row is stale once its stamp is more than `expiration` days old.
                stored_on = datetime.strptime(row["expiration_date"], "%Y-%m-%d")
                expired = (datetime.now() - stored_on).days > expiration
    return tvdb_id, expired
def update_tvdb_map(self, expired, tvdb_url, tvdb_id, expiration):
    """Insert or refresh the URL -> TVDb ID mapping in the ``tvdb_map`` table.

    Args:
        expired: ``True`` when an existing row was found stale; the row is then
            stamped with today's date. Any other value back-dates the stamp by
            a random 1..``expiration`` days.
        tvdb_url: URL used as the unique key of the mapping.
        tvdb_id: TVDb ID to store for that URL.
        expiration: Upper bound, in days, for the random back-dating.
    """
    # NOTE(review): random.randint raises ValueError when expiration < 1;
    # presumably the configured value is validated upstream - confirm.
    expiration_date = datetime.now() if expired is True else (datetime.now() - timedelta(days=random.randint(1, expiration)))
    # closing() releases the handle; the inner `with connection` keeps the
    # commit-on-success / rollback-on-error transaction behaviour.
    with closing(sqlite3.connect(self.cache_path)) as connection:
        with connection:
            with closing(connection.cursor()) as cursor:
                # Make sure the row exists, then overwrite its payload.
                cursor.execute("INSERT OR IGNORE INTO tvdb_map(tvdb_url) VALUES(?)", (tvdb_url, ))
                cursor.execute("UPDATE tvdb_map SET tvdb_id = ?, expiration_date = ? WHERE tvdb_url = ?", (tvdb_id, expiration_date.strftime("%Y-%m-%d"), tvdb_url))
def query_anime_map(self, anime_id, id_type):
ids = None
expired = None

View file

@ -501,7 +501,7 @@ class ConfigFile:
else:
logger.warning("playlist_files attribute not found")
self.TVDb = TVDb(self, self.general["tvdb_language"])
self.TVDb = TVDb(self, self.general["tvdb_language"], self.general["cache_expiration"])
self.IMDb = IMDb(self)
self.Convert = Convert(self)
self.AniList = AniList(self)

View file

@ -199,7 +199,7 @@ class Operations:
if any([o == "tvdb" for o in self.library.meta_operations]):
if tvdb_id:
try:
tvdb_item = self.config.TVDb.get_item(tvdb_id, self.library.is_movie)
tvdb_item = self.config.TVDb.get_tvdb_obj(tvdb_id, is_movie=self.library.is_movie)
except Failed as e:
logger.error(str(e))
else:
@ -356,7 +356,7 @@ class Operations:
elif mdb_item and self.library.mass_originally_available_update == "mdb":
new_date = mdb_item.released
elif tvdb_item and self.library.mass_originally_available_update == "tvdb":
new_date = tvdb_item.released
new_date = tvdb_item.release_date
elif tmdb_item and self.library.mass_originally_available_update == "tmdb":
new_date = tmdb_item.release_date if self.library.is_movie else tmdb_item.first_air_date
elif anidb_item and self.library.mass_originally_available_update == "anidb":

View file

@ -3,6 +3,7 @@ from datetime import datetime
from lxml.etree import ParserError
from modules import util
from modules.util import Failed
from retrying import retry
logger = util.logger
@ -39,36 +40,20 @@ language_translation = {
"yo": "yor", "za": "zha", "zu": "zul"}
class TVDbObj:
def __init__(self, tvdb_url, language, is_movie, config):
self.tvdb_url = tvdb_url.strip()
self.language = language
def __init__(self, tvdb, tvdb_id, is_movie=False, ignore_cache=False):
self._tvdb = tvdb
self.tvdb_id = tvdb_id
self.is_movie = is_movie
self.config = config
if not self.is_movie and self.tvdb_url.startswith((urls["series"], urls["alt_series"], urls["series_id"])):
self.media_type = "Series"
elif self.is_movie and self.tvdb_url.startswith((urls["movies"], urls["alt_movies"], urls["movie_id"])):
self.media_type = "Movie"
else:
raise Failed(f"TVDb Error: {self.tvdb_url} must begin with {urls['movies'] if self.is_movie else urls['series']}")
if self.config.trace_mode:
logger.debug(f"URL: {tvdb_url}")
try:
response = self.config.get_html(self.tvdb_url, headers=util.header(self.language))
except ParserError:
raise Failed(f"TVDb Error: Could not parse {self.tvdb_url}")
results = response.xpath(f"//*[text()='TheTVDB.com {self.media_type} ID']/parent::node()/span/text()")
if len(results) > 0:
self.id = int(results[0])
elif self.tvdb_url.startswith(urls["movie_id"]):
raise Failed(f"TVDb Error: Could not find a TVDb Movie using TVDb Movie ID: {self.tvdb_url[len(urls['movie_id']):]}")
elif self.tvdb_url.startswith(urls["series_id"]):
raise Failed(f"TVDb Error: Could not find a TVDb Series using TVDb Series ID: {self.tvdb_url[len(urls['series_id']):]}")
else:
raise Failed(f"TVDb Error: Could not find a TVDb {self.media_type} ID at the URL {self.tvdb_url}")
self.ignore_cache = ignore_cache
expired = None
data = None
if self._tvdb.config.Cache and not ignore_cache:
data, expired = self._tvdb.config.Cache.query_tvdb(tvdb_id, is_movie, self._tvdb.expiration)
if expired or not data:
data = self._tvdb.get_request(f"{urls['movie_id' if is_movie else 'series_id']}{tvdb_id}")
def parse_page(xpath, is_list=False):
parse_results = response.xpath(xpath)
parse_results = data.xpath(xpath)
if len(parse_results) > 0:
parse_results = [r.strip() for r in parse_results if len(r) > 0]
return parse_results if is_list else parse_results[0] if len(parse_results) > 0 else None
@ -78,78 +63,116 @@ class TVDbObj:
place += f"@data-language='{lang}']" if lang else "not(@style='display:none')]"
return parse_page(f"{place}/@data-title"), parse_page(f"{place}/p/text()[normalize-space()]")
self.title, self.summary = parse_title_summary(lang=self.language)
if not self.title and self.language in language_translation:
self.title, self.summary = parse_title_summary(lang=language_translation[self.language])
if not self.title:
self.title, self.summary = parse_title_summary()
if not self.title:
raise Failed(f"TVDb Error: Name not found from TVDb URL: {self.tvdb_url}")
self.poster_path = parse_page("(//h2[@class='mt-4' and text()='Posters']/following::div/a/@href)[1]")
self.background_path = parse_page("(//h2[@class='mt-4' and text()='Backgrounds']/following::div/a/@href)[1]")
if self.is_movie:
self.directors = parse_page("//strong[text()='Directors']/parent::li/span/a/text()[normalize-space()]", is_list=True)
self.writers = parse_page("//strong[text()='Writers']/parent::li/span/a/text()[normalize-space()]", is_list=True)
self.studios = parse_page("//strong[text()='Studio']/parent::li/span/a/text()[normalize-space()]", is_list=True)
released = parse_page("//strong[text()='Released']/parent::li/span/text()[normalize-space()]")
if isinstance(data, dict):
self.title = data["title"]
self.summary = data["summary"]
self.poster_url = data["poster_url"]
self.background_url = data["background_url"]
self.release_date = data["release_date"]
self.genres = data["genres"].split("|")
else:
self.networks = parse_page("//strong[text()='Networks']/parent::li/span/a/text()[normalize-space()]", is_list=True)
released = parse_page("//strong[text()='First Aired']/parent::li/span/text()[normalize-space()]")
try:
self.released = datetime.strptime(released, "%B %d, %Y") if released else released
except ValueError:
self.released = None
self.title, self.summary = parse_title_summary(lang=self._tvdb.language)
if not self.title and self._tvdb.language in language_translation:
self.title, self.summary = parse_title_summary(lang=language_translation[self._tvdb.language])
if not self.title:
self.title, self.summary = parse_title_summary()
if not self.title:
raise Failed(f"TVDb Error: Name not found from TVDb ID: {self.tvdb_id}")
self.genres = parse_page("//strong[text()='Genres']/parent::li/span/a/text()[normalize-space()]", is_list=True)
self.poster_url = parse_page("(//h2[@class='mt-4' and text()='Posters']/following::div/a/@href)[1]")
self.background_url = parse_page("(//h2[@class='mt-4' and text()='Backgrounds']/following::div/a/@href)[1]")
if is_movie:
released = parse_page("//strong[text()='Released']/parent::li/span/text()[normalize-space()]")
else:
released = parse_page("//strong[text()='First Aired']/parent::li/span/text()[normalize-space()]")
tmdb_id = None
imdb_id = None
if self.is_movie:
results = response.xpath("//*[text()='TheMovieDB.com']/@href")
if len(results) > 0:
try:
tmdb_id = util.regex_first_int(results[0], "TMDb ID")
except Failed:
pass
results = response.xpath("//*[text()='IMDB']/@href")
if len(results) > 0:
try:
imdb_id = util.get_id_from_imdb_url(results[0])
except Failed:
pass
if tmdb_id is None and imdb_id is None:
raise Failed(f"TVDb Error: No TMDb ID or IMDb ID found for {self.title}")
self.tmdb_id = tmdb_id
self.imdb_id = imdb_id
try:
self.release_date = datetime.strptime(released, "%B %d, %Y") if released else released
except ValueError:
self.release_date = None
self.genres = parse_page("//strong[text()='Genres']/parent::li/span/a/text()[normalize-space()]", is_list=True)
if self._tvdb.config.Cache and not ignore_cache:
self._tvdb.config.Cache.update_tvdb(expired, self, self._tvdb.expiration)
class TVDb:
def __init__(self, config, tvdb_language):
def __init__(self, config, tvdb_language, expiration):
self.config = config
self.tvdb_language = tvdb_language
self.language = tvdb_language
self.expiration = expiration
def get_item(self, tvdb_url, is_movie):
return self.get_movie(tvdb_url) if is_movie else self.get_series(tvdb_url)
def get_series(self, tvdb_url):
try:
tvdb_url = f"{urls['series_id']}{int(tvdb_url)}"
except ValueError:
pass
return TVDbObj(tvdb_url, self.tvdb_language, False, self.config)
def get_movie(self, tvdb_url):
try:
tvdb_url = f"{urls['movie_id']}{int(tvdb_url)}"
except ValueError:
pass
return TVDbObj(tvdb_url, self.tvdb_language, True, self.config)
def get_tvdb_obj(self, tvdb_url, is_movie=False):
    """Build a ``TVDbObj`` for a TVDb URL or ID.

    The input is first resolved through ``get_id_from_url``; only the TVDb ID
    from its result is used, the TMDb/IMDb IDs are discarded.
    """
    resolved_id, _tmdb_id, _imdb_id = self.get_id_from_url(tvdb_url, is_movie=is_movie)
    tvdb_object = TVDbObj(self, resolved_id, is_movie=is_movie)
    return tvdb_object
def get_list_description(self, tvdb_url):
response = self.config.get_html(tvdb_url, headers=util.header(self.tvdb_language))
response = self.config.get_html(tvdb_url, headers=util.header(self.language))
description = response.xpath("//div[@class='block']/div[not(@style='display:none')]/p/text()")
return description[0] if len(description) > 0 and len(description[0]) > 0 else ""
@retry(stop_max_attempt_number=6, wait_fixed=10000, retry_on_exception=util.retry_if_not_failed)
def get_request(self, tvdb_url):
    """Fetch ``tvdb_url`` through ``config.get_html`` with language headers.

    The headers come from ``util.header`` for the configured language. The
    ``retry`` decorator re-invokes the call (at most 6 attempts, fixed wait of
    10000) for exceptions that ``util.retry_if_not_failed`` accepts.
    """
    language_headers = util.header(self.language)
    return self.config.get_html(tvdb_url, headers=language_headers)
def get_id_from_url(self, tvdb_url, is_movie=False, ignore_cache=False):
    """Resolve a TVDb URL (or bare numeric ID) to its IDs.

    Args:
        tvdb_url: A TVDb series/movie URL, or a value convertible with ``int``.
        is_movie: Only affects bare numeric input: when falsy the number is
            returned directly as the TVDb ID; when truthy it is expanded to the
            movie-ID URL and resolved like any other URL.
        ignore_cache: When truthy the ``tvdb_map`` cache is neither read nor
            written.

    Returns:
        ``(tvdb_id, tmdb_id, imdb_id)``. ``tmdb_id``/``imdb_id`` are only looked
        up for Movie pages and are ``None`` otherwise.

    Raises:
        Failed: the URL has an unsupported prefix, the page cannot be parsed,
            no TVDb ID is present on the page, or a Movie page carries neither
            a TMDb nor an IMDb link.
    """
    try:
        if not is_movie:
            # A bare series ID needs no lookup at all.
            return int(tvdb_url), None, None
        tvdb_url = f"{urls['movie_id']}{int(tvdb_url)}"
    except ValueError:
        # Not numeric: treat the value as a URL below.
        pass

    tvdb_url = tvdb_url.strip()
    if tvdb_url.startswith((urls["series"], urls["alt_series"], urls["series_id"])):
        media_type = "Series"
    elif tvdb_url.startswith((urls["movies"], urls["alt_movies"], urls["movie_id"])):
        media_type = "Movie"
    else:
        raise Failed(f"TVDb Error: {tvdb_url} must begin with {urls['movies']} or {urls['series']}")

    expired = None
    tvdb_id = None
    if self.config.Cache and not ignore_cache:
        tvdb_id, expired = self.config.Cache.query_tvdb_map(tvdb_url, self.expiration)
        # The map only stores the TVDb ID, so the cached answer is complete
        # only for Series pages. Movie pages must still be fetched to get
        # their TMDb/IMDb IDs. This is decided by the URL's media type, not by
        # the caller's is_movie flag: callers that pass a movie URL with the
        # default is_movie=False would otherwise get (tvdb_id, None, None)
        # from cache and lose the movie.
        if tvdb_id and not expired and media_type == "Series":
            return tvdb_id, None, None

    if self.config.trace_mode:
        logger.debug(f"URL: {tvdb_url}")
    try:
        response = self.get_request(tvdb_url)
    except ParserError:
        raise Failed(f"TVDb Error: Could not parse {tvdb_url}")

    results = response.xpath(f"//*[text()='TheTVDB.com {media_type} ID']/parent::node()/span/text()")
    if len(results) > 0:
        tvdb_id = int(results[0])
        tmdb_id = None
        imdb_id = None
        if media_type == "Movie":
            results = response.xpath("//*[text()='TheMovieDB.com']/@href")
            if len(results) > 0:
                try:
                    tmdb_id = util.regex_first_int(results[0], "TMDb ID")
                except Failed:
                    # Best effort: fall through to the IMDb link.
                    pass
            results = response.xpath("//*[text()='IMDB']/@href")
            if len(results) > 0:
                try:
                    imdb_id = util.get_id_from_imdb_url(results[0])
                except Failed:
                    pass
            if tmdb_id is None and imdb_id is None:
                raise Failed("TVDb Error: No TMDb ID or IMDb ID found")
        if self.config.Cache and not ignore_cache:
            self.config.Cache.update_tvdb_map(expired, tvdb_url, tvdb_id, self.expiration)
        return tvdb_id, tmdb_id, imdb_id

    # No ID on the page: word the error according to how the URL was formed.
    if tvdb_url.startswith(urls["movie_id"]):
        err_text = f"using TVDb Movie ID: {tvdb_url[len(urls['movie_id']):]}"
    elif tvdb_url.startswith(urls["series_id"]):
        err_text = f"using TVDb Series ID: {tvdb_url[len(urls['series_id']):]}"
    else:
        err_text = f"ID at the URL {tvdb_url}"
    raise Failed(f"TVDb Error: Could not find a TVDb {media_type} {err_text}")
def _ids_from_url(self, tvdb_url):
ids = []
tvdb_url = tvdb_url.strip()
@ -157,25 +180,27 @@ class TVDb:
logger.debug(f"URL: {tvdb_url}")
if tvdb_url.startswith((urls["list"], urls["alt_list"])):
try:
response = self.config.get_html(tvdb_url, headers=util.header(self.tvdb_language))
items = response.xpath("//div[@class='col-xs-12 col-sm-12 col-md-8 col-lg-8 col-md-pull-4']/div[@class='row']")
response = self.config.get_html(tvdb_url, headers=util.header(self.language))
items = response.xpath("//div[@class='row']/div/div[@class='row']/div/h3/a")
for item in items:
title = item.xpath(".//div[@class='col-xs-12 col-sm-9 mt-2']//a/text()")[0]
item_url = item.xpath(".//div[@class='col-xs-12 col-sm-9 mt-2']//a/@href")[0]
title = item.xpath("text()")[0]
item_url = item.xpath("@href")[0]
if item_url.startswith("/series/"):
try:
ids.append((self.get_series(f"{base_url}{item_url}").id, "tvdb"))
tvdb_id, _, _ = self.get_id_from_url(f"{base_url}{item_url}")
if tvdb_id:
ids.append((tvdb_id, "tvdb"))
except Failed as e:
logger.error(f"{e} for series {title}")
elif item_url.startswith("/movies/"):
try:
movie = self.get_movie(f"{base_url}{item_url}")
if movie.tmdb_id:
ids.append((movie.tmdb_id, "tmdb"))
elif movie.imdb_id:
ids.append((movie.imdb_id, "imdb"))
_, tmdb_id, imdb_id = self.get_id_from_url(f"{base_url}{item_url}")
if tmdb_id:
ids.append((tmdb_id, "tmdb"))
elif imdb_id:
ids.append((imdb_id, "imdb"))
except Failed as e:
logger.error(e)
logger.error(f"{e} for movie {title}")
else:
logger.error(f"TVDb Error: Skipping Movie: {title}")
time.sleep(2)
@ -191,14 +216,26 @@ class TVDb:
def get_tvdb_ids(self, method, data):
if method == "tvdb_show":
logger.info(f"Processing TVDb Show: {data}")
return [(self.get_series(data).id, "tvdb")]
ids = []
try:
tvdb_id, _, _ = self.get_id_from_url(data)
if tvdb_id:
ids.append((tvdb_id, "tvdb"))
except Failed as e:
logger.error(e)
return ids
elif method == "tvdb_movie":
logger.info(f"Processing TVDb Movie: {data}")
movie = self.get_movie(data)
if movie.tmdb_id:
return [(movie.tmdb_id, "tmdb")]
elif movie.imdb_id:
return [(movie.imdb_id, "imdb")]
ids = []
try:
_, tmdb_id, imdb_id = self.get_id_from_url(data)
if tmdb_id:
ids.append((tmdb_id, "tmdb"))
elif imdb_id:
ids.append((imdb_id, "imdb"))
except Failed as e:
logger.error(e)
return ids
elif method == "tvdb_list":
logger.info(f"Processing TVDb List: {data}")
return self._ids_from_url(data)