Plex-Meta-Manager/modules/mdblist.py

164 lines
7.1 KiB
Python
Raw Normal View History

2022-01-15 22:40:59 +00:00
import logging
2022-01-24 09:16:45 +00:00
from modules import util
2022-01-15 22:40:59 +00:00
from modules.util import Failed
2022-01-25 07:45:31 +00:00
from urllib.parse import urlparse
2022-01-15 22:40:59 +00:00
logger = logging.getLogger("Plex Meta Manager")
builders = ["mdblist_list"]
2022-02-06 03:52:54 +00:00
list_sorts = ["score", "released", "updated", "imdbrating", "rogerebert", "imdbvotes", "budget", "revenue"]
2022-01-15 22:40:59 +00:00
base_url = "https://mdblist.com/lists"
2022-02-06 07:33:09 +00:00
api_url = "https://mdblist.com/api/"
2022-01-15 22:40:59 +00:00
2022-01-25 07:45:31 +00:00
headers = {"User-Agent": "Plex-Meta-Manager"}
2022-01-15 22:40:59 +00:00
2022-02-06 07:33:09 +00:00
class MDbObj:
def __init__(self, data):
self._data = data
self.title = data["title"]
self.year = util.check_num(data["year"])
self.type = data["type"]
self.imdbid = data["imdbid"]
self.traktid = util.check_num(data["traktid"])
self.tmdbid = util.check_num(data["tmdbid"])
self.score = util.check_num(data["score"])
self.imdb_rating = None
self.metacritic_rating = None
self.metacriticuser_rating = None
self.trakt_rating = None
self.tomatoes_rating = None
self.tomatoesaudience_rating = None
self.tmdb_rating = None
self.letterboxd_rating = None
for rating in data["ratings"]:
if rating["source"] == "imdb":
self.imdb_rating = util.check_num(rating["value"], is_int=False)
elif rating["source"] == "metacritic":
self.metacritic_rating = util.check_num(rating["value"])
elif rating["source"] == "metacriticuser":
self.metacriticuser_rating = util.check_num(rating["value"], is_int=False)
elif rating["source"] == "trakt":
self.trakt_rating = util.check_num(rating["value"])
elif rating["source"] == "tomatoes":
self.tomatoes_rating = util.check_num(rating["value"])
elif rating["source"] == "tomatoesaudience":
self.tomatoesaudience_rating = util.check_num(rating["value"])
elif rating["source"] == "tmdb":
self.tmdb_rating = util.check_num(rating["value"])
elif rating["source"] == "letterboxd":
self.letterboxd_rating = util.check_num(rating["value"], is_int=False)
self.commonsense = data["commonsense"]
2022-01-15 22:40:59 +00:00
class Mdblist:
def __init__(self, config):
self.config = config
2022-02-06 07:33:09 +00:00
self.apikey = None
self.limit = False
def add_key(self, apikey):
self.apikey = apikey
try:
self._request(imdb_id="tt0080684", ignore_cache=True)
except Failed:
self.apikey = None
raise
@property
def has_key(self):
return self.apikey is not None
def _request(self, imdb_id=None, tmdb_id=None, is_movie=True, ignore_cache=False):
params = {"apikey": self.apikey}
if imdb_id:
params["i"] = imdb_id
key = imdb_id
elif tmdb_id:
params["tm"] = tmdb_id
params["m"] = "movie" if is_movie else "show"
key = f"{'tm' if is_movie else 'ts'}{tmdb_id}"
else:
raise Failed("MdbList Error: Either IMDb ID or TMDb ID and TMDb Type Required")
expired = None
if self.config.Cache and not ignore_cache:
mdb_dict, expired = self.config.Cache.query_mdb(key)
if mdb_dict and expired is False:
return MDbObj(mdb_dict)
if self.config.trace_mode:
logger.debug(f"ID: {key}")
response = self.config.get_json(api_url, params=params)
if "response" in response and response["response"] is False:
if response["error"] == "API Limit Reached!":
self.limit = True
raise Failed(f"MdbList Error: {response['error']}")
else:
mdb = MDbObj(response)
if self.config.Cache and not ignore_cache:
self.config.Cache.update_mdb(expired, key, mdb)
return mdb
def get_imdb(self, imdb_id):
return self._request(imdb_id=imdb_id)
def get_series(self, tmdb_id):
return self._request(tmdb_id=tmdb_id, is_movie=False)
def get_movie(self, tmdb_id):
return self._request(tmdb_id=tmdb_id, is_movie=True)
2022-01-15 22:40:59 +00:00
2022-01-25 07:45:31 +00:00
def validate_mdblist_lists(self, mdb_lists):
2022-01-24 09:16:45 +00:00
valid_lists = []
for mdb_dict in util.get_list(mdb_lists, split=False):
if not isinstance(mdb_dict, dict):
mdb_dict = {"url": mdb_dict}
dict_methods = {dm.lower(): dm for dm in mdb_dict}
if "url" not in dict_methods:
raise Failed(f"Collection Error: mdb_list url attribute not found")
elif mdb_dict[dict_methods["url"]] is None:
raise Failed(f"Collection Error: mdb_list url attribute is blank")
else:
mdb_url = mdb_dict[dict_methods["url"]].strip()
if not mdb_url.startswith(base_url):
raise Failed(f"Mdblist Error: {mdb_url} must begin with: {base_url}")
list_count = None
if "limit" in dict_methods:
if mdb_dict[dict_methods["limit"]] is None:
logger.warning(f"Collection Warning: mdb_list limit attribute is blank using 0 as default")
else:
try:
value = int(str(mdb_dict[dict_methods["limit"]]))
if 0 <= value:
list_count = value
except ValueError:
pass
if list_count is None:
logger.warning(f"Collection Warning: mdb_list limit attribute must be an integer 0 or greater using 0 as default")
if list_count is None:
list_count = 0
2022-02-06 03:52:54 +00:00
sort_by = "score"
if "sort_by" in dict_methods:
if mdb_dict[dict_methods["sort_by"]] is None:
logger.warning(f"Collection Warning: mdb_list sort_by attribute is blank using score as default")
elif mdb_dict[dict_methods["sort_by"]].lower() not in list_sorts:
logger.warning(f"Collection Warning: mdb_list sort_by attribute {mdb_dict[dict_methods['sort_by']]} not valid score as default. Options: {', '.join(list_sorts)}")
else:
sort_by = mdb_dict[dict_methods["sort_by"]].lower()
valid_lists.append({"url": mdb_url, "limit": list_count, "sort_by": sort_by})
2022-01-24 09:16:45 +00:00
return valid_lists
2022-01-15 22:40:59 +00:00
def get_mdblist_ids(self, method, data):
if method == "mdblist_list":
2022-02-06 03:52:54 +00:00
params = {"sort": data["sort_by"]}
logger.info(f"Processing Mdblist.com List: {data['url']}")
logger.info(f"Sort By: {data['sort_by']}")
if data["limit"] > 0:
logger.info(f"Limit: {data['limit']} items")
params["limit"] = data["limit"]
2022-01-25 07:45:31 +00:00
parsed_url = urlparse(data["url"])
url_base = parsed_url._replace(query=None).geturl()
2022-02-02 09:47:34 +00:00
url_base = url_base if url_base.endswith("/") else f"{url_base}/"
url_base = url_base if url_base.endswith("json/") else f"{url_base}json/"
2022-01-25 07:45:31 +00:00
return [(i["imdb_id"], "imdb") for i in self.config.get_json(url_base, headers=headers, params=params)]
2022-01-15 22:40:59 +00:00
else:
raise Failed(f"Mdblist Error: Method {method} not supported")