mirror of
synced 2025-02-16 22:08:25 +00:00
130 lines
7.2 KiB
130 lines
7.2 KiB
import logging, math, re, requests, time
from lxml import html
from modules import util
from modules.util import Failed
from retrying import retry
# Module-level logger; everything logged from this module goes to the
# "Plex Meta Manager" logger.
logger = logging.getLogger("Plex Meta Manager")
class IMDbAPI:
    """Scrape IMDb list/search pages and map IMDb IDs to TMDb/TVDb IDs.

    ID conversion is delegated to the TMDb and/or Trakt helper objects passed
    to the constructor; an optional Cache object is consulted first and
    updated after a successful lookup.
    """

    def __init__(self, Cache=None, TMDb=None, Trakt=None, TVDb=None):
        """Store the helper objects used for ID conversion.

        Args:
            Cache: Optional cache object; skipped entirely when falsy.
            TMDb: Optional TMDb helper used for conversion and movie checks.
            Trakt: Optional Trakt helper used as a conversion fallback.
            TVDb: Optional TVDb helper used to verify series IDs.

        Raises:
            Failed: If neither TMDb nor Trakt is provided.
        """
        if TMDb is None and Trakt is None:
            raise Failed("IMDb Error: IMDb requires either TMDb or Trakt")
        self.Cache = Cache
        self.TMDb = TMDb
        self.Trakt = Trakt
        self.TVDb = TVDb

    def get_imdb_ids_from_url(self, imdb_url, language, limit):
        """Collect the IMDb IDs listed at an IMDb list or search URL.

        Args:
            imdb_url: IMDb list URL (``https://www.imdb.com/list/ls...``) or
                title-search URL (``https://www.imdb.com/search/title/?...``).
            language: Value sent as the ``Accept-Language`` request header.
            limit: Maximum number of IDs to collect; a value below 1, or one
                larger than the reported total, means "all results".

        Returns:
            A non-empty list of the ``data-tconst`` attribute values scraped
            from the result pages.

        Raises:
            Failed: If the URL has an unsupported prefix, the page cannot be
                parsed, it reports no titles, or no IDs are found.
        """
        list_prefix = "https://www.imdb.com/list/ls"
        search_prefix = "https://www.imdb.com/search/title/?"
        page_size = 250  # number of results requested per page

        imdb_url = imdb_url.strip()
        if not imdb_url.startswith((list_prefix, search_prefix)):
            raise Failed("IMDb Error: {} must begin with either:\n| https://www.imdb.com/list/ls (For Lists)\n| https://www.imdb.com/search/title/? (For Searches)".format(imdb_url))

        if imdb_url.startswith(list_prefix):
            # Lists are fetched through the search endpoint using the list's
            # numeric ID.
            try:
                list_id = re.search(r"(\d+)", str(imdb_url)).group(1)
            except AttributeError:
                raise Failed("IMDb Error: Failed to parse List ID from {}".format(imdb_url))
            current_url = "https://www.imdb.com/search/title/?lists=ls{}".format(list_id)
        else:
            current_url = imdb_url

        header = {"Accept-Language": language}
        length = 0
        imdb_ids = []

        # The first request is only used to read the total title count.
        try:
            results = self.send_request(current_url, header).xpath("//div[@class='desc']/span/text()")[0].replace(",", "")
        except IndexError:
            raise Failed("IMDb Error: Failed to parse URL: {}".format(imdb_url))
        try:
            total = int(re.findall(r"(\d+) title", results)[0])
        except IndexError:
            raise Failed("IMDb Error: No Results at URL: {}".format(imdb_url))

        # Drop any paging parameters supplied by the caller; they are
        # re-added per page below.
        if "&start=" in current_url:
            current_url = re.sub(r"&start=\d+", "", current_url)
        if "&count=" in current_url:
            current_url = re.sub(r"&count=\d+", "", current_url)

        if limit < 1 or total < limit:
            limit = total
        # Size of the final page; a full page when limit divides evenly.
        remainder = limit % page_size
        if remainder == 0:
            remainder = page_size
        num_of_pages = math.ceil(int(limit) / page_size)

        for i in range(1, num_of_pages + 1):
            is_last_page = i == num_of_pages
            start_num = (i - 1) * page_size + 1
            length = util.print_return(length, "Parsing Page {}/{} {}-{}".format(i, num_of_pages, start_num, limit if is_last_page else i * page_size))
            response = self.send_request("{}&count={}&start={}".format(current_url, remainder if is_last_page else page_size, start_num), header)
            imdb_ids.extend(response.xpath("//div[contains(@class, 'lister-item-image')]//a/img//@data-tconst"))

        if imdb_ids:
            return imdb_ids
        raise Failed("IMDb Error: No Movies Found at {}".format(imdb_url))

    # NOTE(review): requests.get is called without a timeout, so a stalled
    # connection blocks instead of triggering a retry.
    @retry(stop_max_attempt_number=6, wait_fixed=10000)
    def send_request(self, url, header):
        """GET ``url`` with the given headers and return the parsed HTML tree.

        Wrapped by ``retry`` with at most 6 attempts and a fixed wait of
        10000 ms between them.
        """
        return html.fromstring(requests.get(url, headers=header).content)

    def get_items(self, method, data, language, status_message=True):
        """Resolve an ``imdb_id`` or ``imdb_list`` entry to TMDb/TVDb IDs.

        Args:
            method: ``"imdb_id"`` or ``"imdb_list"``.
            data: For ``imdb_id`` the IMDb ID itself; for ``imdb_list`` a
                mapping with ``"url"`` and ``"limit"`` keys.
            language: Passed through to the scraping/conversion helpers.
            status_message: When true, log progress and the resulting IDs.

        Returns:
            A ``(movie_ids, show_ids)`` tuple of lists (TMDb IDs, TVDb IDs).

        Raises:
            Failed: If ``method`` is not supported, or - for ``imdb_id`` - if
                the conversion fails. Per-item failures while processing an
                ``imdb_list`` are logged as warnings and skipped.
        """
        pretty = util.pretty_names[method] if method in util.pretty_names else method
        if status_message:
            logger.debug("Data: {}".format(data))
        show_ids = []
        movie_ids = []

        if method == "imdb_id":
            if status_message:
                logger.info("Processing {}: {}".format(pretty, data))
            tmdb_id, tvdb_id = self.convert_from_imdb(data, language)
            if tmdb_id:
                movie_ids.append(tmdb_id)
            if tvdb_id:
                show_ids.append(tvdb_id)
        elif method == "imdb_list":
            if status_message:
                if data["limit"] > 0:
                    target = "{} Items at {}".format(data["limit"], data["url"])
                else:
                    target = data["url"]
                logger.info("Processing {}: {}".format(pretty, target))
            imdb_ids = self.get_imdb_ids_from_url(data["url"], language, data["limit"])
            total_ids = len(imdb_ids)
            length = 0
            for i, imdb_id in enumerate(imdb_ids, 1):
                length = util.print_return(length, "Converting IMDb ID {}/{}".format(i, total_ids))
                try:
                    tmdb_id, tvdb_id = self.convert_from_imdb(imdb_id, language)
                    if tmdb_id:
                        movie_ids.append(tmdb_id)
                    if tvdb_id:
                        show_ids.append(tvdb_id)
                except Failed as e:
                    # One unresolvable ID must not abort the whole list.
                    logger.warning(e)
            util.print_end(length, "Processed {} IMDb IDs".format(total_ids))
        else:
            raise Failed("IMDb Error: Method {} not supported".format(method))

        if status_message:
            logger.debug("TMDb IDs Found: {}".format(movie_ids))
            logger.debug("TVDb IDs Found: {}".format(show_ids))
        return movie_ids, show_ids

    def convert_from_imdb(self, imdb_id, language):
        """Convert an IMDb ID to a TMDb movie ID and/or a TVDb series ID.

        Lookup order: cache (when configured), then TMDb movie, TMDb show,
        Trakt movie, Trakt show; each later step runs only while no ID has
        been found. IDs that did not come from the cache are verified against
        TMDb/TVDb when the corresponding helper is configured.

        Args:
            imdb_id: The IMDb ID to convert.
            language: Passed to ``TVDb.get_series`` during verification.

        Returns:
            A ``(tmdb_id, tvdb_id)`` tuple; either element may be ``None``,
            but not both.

        Raises:
            Failed: If neither a TMDb nor a TVDb ID could be determined.
        """
        tmdb_id = None
        tvdb_id = None
        # Second element returned by the cache lookups; it is forwarded to
        # Cache.update_imdb below. Presumably it marks an expired entry -
        # verify against the Cache implementation.
        update_tmdb = False
        update_tvdb = False

        if self.Cache:
            tmdb_id, tvdb_id = self.Cache.get_ids_from_imdb(imdb_id)
            if not tmdb_id:
                tmdb_id, update_tmdb = self.Cache.get_tmdb_from_imdb(imdb_id)
                if update_tmdb:
                    tmdb_id = None
            if not tvdb_id:
                tvdb_id, update_tvdb = self.Cache.get_tvdb_from_imdb(imdb_id)
                if update_tvdb:
                    tvdb_id = None
        from_cache = tmdb_id is not None or tvdb_id is not None

        # Conversion failures are expected here (an ID is either a movie or
        # a show), so each Failed simply falls through to the next source.
        if not tmdb_id and not tvdb_id and self.TMDb:
            try:
                tmdb_id = self.TMDb.convert_imdb_to_tmdb(imdb_id)
            except Failed:
                pass
        if not tmdb_id and not tvdb_id and self.TMDb:
            try:
                tvdb_id = self.TMDb.convert_imdb_to_tvdb(imdb_id)
            except Failed:
                pass
        if not tmdb_id and not tvdb_id and self.Trakt:
            try:
                tmdb_id = self.Trakt.convert_imdb_to_tmdb(imdb_id)
            except Failed:
                pass
        if not tmdb_id and not tvdb_id and self.Trakt:
            try:
                tvdb_id = self.Trakt.convert_imdb_to_tvdb(imdb_id)
            except Failed:
                pass

        # Verify freshly resolved IDs. TMDb may be None (Trakt-only setup)
        # and TVDb defaults to None; skip the check rather than fail with an
        # AttributeError when the helper is not configured.
        if tmdb_id and not from_cache and self.TMDb:
            try:
                self.TMDb.get_movie(tmdb_id)
            except Failed:
                tmdb_id = None
        if tvdb_id and not from_cache and self.TVDb:
            try:
                self.TVDb.get_series(language, tvdb_id=tvdb_id)
            except Failed:
                tvdb_id = None

        if not tmdb_id and not tvdb_id:
            raise Failed("IMDb Error: No TMDb ID or TVDb ID found for IMDb: {}".format(imdb_id))

        if self.Cache:
            if tmdb_id and update_tmdb is not False:
                self.Cache.update_imdb("movie", update_tmdb, imdb_id, tmdb_id)
            if tvdb_id and update_tvdb is not False:
                self.Cache.update_imdb("show", update_tvdb, imdb_id, tvdb_id)
        return tmdb_id, tvdb_id