mirror of
https://github.com/mza921/Plex-Auto-Collections
synced 2024-11-14 16:27:22 +00:00
491 lines
22 KiB
Python
491 lines
22 KiB
Python
from plexapi.video import Movie
|
|
from plexapi.video import Show
|
|
from plexapi import exceptions as PlexExceptions
|
|
from plexapi.library import MovieSection
|
|
from plexapi.library import ShowSection
|
|
from datetime import datetime, timedelta
|
|
import imdb_tools
|
|
import trakt_tools
|
|
from config_tools import Config
|
|
from config_tools import TMDB
|
|
from config_tools import TraktClient
|
|
from config_tools import Tautulli
|
|
from bs4 import BeautifulSoup
|
|
from urllib.request import Request
|
|
from urllib.request import urlopen
|
|
from urllib.parse import urlparse
|
|
from tmdbv3api import TMDb
|
|
from tmdbv3api import Movie as TMDb_Movie
|
|
import os
|
|
import sqlite3
|
|
|
|
|
|
def adjust_space(old_length, display_title):
|
|
space_length = old_length - len(display_title)
|
|
if space_length > 0:
|
|
display_title += " " * space_length
|
|
return display_title
|
|
|
|
def get_movie(plex, data):
|
|
# If an int is passed as data, assume it is a movie's rating key
|
|
if isinstance(data, int):
|
|
try:
|
|
return plex.Server.fetchItem(data)
|
|
except PlexExceptions.BadRequest:
|
|
print("| Nothing found")
|
|
return None
|
|
elif isinstance(data, Movie):
|
|
return data
|
|
else:
|
|
movie_list = plex.Library.search(title=data)
|
|
if movie_list:
|
|
return movie_list
|
|
else:
|
|
print("| Movie: {} not found".format(data))
|
|
return None
|
|
|
|
def get_item(plex, data):
|
|
# If an int is passed as data, assume it is a movie's rating key
|
|
if isinstance(data, int):
|
|
try:
|
|
return plex.Server.fetchItem(data)
|
|
except PlexExceptions.BadRequest:
|
|
return "Nothing found"
|
|
elif isinstance(data, Movie):
|
|
return data
|
|
elif isinstance(data, Show):
|
|
return data
|
|
else:
|
|
print(data)
|
|
item_list = plex.Library.search(title=data)
|
|
if item_list:
|
|
return item_list
|
|
else:
|
|
return "Item: " + data + " not found"
|
|
|
|
def get_actor_rkey(plex, data):
|
|
"""Takes in actors name as str and returns as Plex's corresponding rating key ID"""
|
|
search = data
|
|
|
|
# We must first perform standard search against the Plex Server
|
|
# Searching in the Library via Actor only works if the ratingKey is already known
|
|
results = plex.Server.search(search)
|
|
|
|
for entry in results:
|
|
entry = str(entry)
|
|
entry = entry.split(":")
|
|
entry[0] = entry[0][1:]
|
|
if entry[0] == "Movie":
|
|
movie_id = int(entry[1])
|
|
break
|
|
try:
|
|
# We need to pull details from a movie to correspond the actor's name to their Plex Rating Key
|
|
movie_roles = plex.Server.fetchItem(movie_id).roles
|
|
for role in movie_roles:
|
|
role = str(role).split(":")
|
|
movie_actor_id = role[1]
|
|
movie_actor_name = role[2][:-1].upper()
|
|
if search.upper().replace(" ", "-") == movie_actor_name:
|
|
actor_id = movie_actor_id
|
|
return int(actor_id)
|
|
except UnboundLocalError:
|
|
raise ValueError("| Config Error: Actor: {} not found".format(search))
|
|
|
|
def get_movie_map(config_path, plex):
|
|
movie_map = {}
|
|
current_length = 0
|
|
current_count = 0
|
|
if TMDB.valid:
|
|
tmdb = TMDb()
|
|
tmdb.api_key = TMDB(config_path).apikey
|
|
tmovie = TMDb_Movie()
|
|
plex_movies = plex.Library.all()
|
|
for m in plex_movies:
|
|
current_count += 1
|
|
print_display = "| Processing: {}/{} {}".format(current_count, len(plex_movies), m.title)
|
|
print(adjust_space(current_length, print_display), end="\r")
|
|
current_length = len(print_display)
|
|
guid = urlparse(m.guid)
|
|
item_type = guid.scheme.split('.')[-1]
|
|
if item_type == 'plex':
|
|
imdb_id, tmdb_id = alt_id_lookup(plex, m)
|
|
elif item_type == 'imdb':
|
|
imdb_id = guid.netloc
|
|
tmdb_id = None
|
|
if TMDB.valid and tmdb_id is None:
|
|
tmdb_id = imdb_tools.imdb_get_tmdb(config_path, imdb_id)
|
|
if TraktClient.valid and tmdb_id is None:
|
|
tmdb_id = trakt_tools.trakt_imdb_to_tmdb(config_path, imdb_id)
|
|
elif item_type == 'tmdb':
|
|
tmdb_id = guid.netloc
|
|
else:
|
|
tmdb_id = None
|
|
if tmdb_id:
|
|
movie_map[tmdb_id] = m.ratingKey
|
|
else:
|
|
print(adjust_space(current_length, "| Unable to map TMDb ID for {} [GUID]: {}".format(m.title, m.guid)))
|
|
print(adjust_space(current_length, "| Processed {} Movies".format(len(plex_movies))))
|
|
return movie_map
|
|
|
|
def get_show_map(config_path, plex):
|
|
show_map = {}
|
|
current_length = 0
|
|
current_count = 0
|
|
if TMDB.valid:
|
|
tmdb = TMDb()
|
|
tmdb.api_key = TMDB(config_path).apikey
|
|
plex_shows = plex.Library.all()
|
|
for s in plex_shows:
|
|
current_count += 1
|
|
print_display = "| Processing: {}/{} {}".format(current_count, len(plex_shows), s.title)
|
|
print(adjust_space(current_length, print_display), end="\r")
|
|
current_length = len(print_display)
|
|
guid = urlparse(s.guid)
|
|
item_type = guid.scheme.split('.')[-1]
|
|
if item_type == 'thetvdb':
|
|
tvdb_id = guid.netloc
|
|
elif item_type == 'themoviedb':
|
|
tmdb_id = guid.netloc
|
|
tvdb_id = None
|
|
if TMDB.valid and tmdb_id is None:
|
|
tvdb_id = imdb_tools.tmdb_get_tvdb(config_path, tmdb_id)
|
|
if TraktClient.valid and tmdb_id is None:
|
|
tvdb_id = trakt_tools.trakt_tmdb_to_tvdb(config_path, tmdb_id)
|
|
else:
|
|
tvdb_id = None
|
|
if tvdb_id:
|
|
show_map[tvdb_id] = s.ratingKey
|
|
else:
|
|
print(adjust_space(current_length, "| Unable to map TVDb ID for {} [GUID]: {}".format(s.title, s.guid)))
|
|
print(adjust_space(current_length, "| Processed {} Shows".format(len(plex_shows))))
|
|
return show_map
|
|
|
|
# subtype can be 'movie', 'show', or None (movie/tv combined)
|
|
def get_collection(plex, data, exact=None, subtype=None):
|
|
collection_list = plex.Library.search(title=data, libtype="collection")
|
|
if len(collection_list) > 1:
|
|
for collection in collection_list:
|
|
if collection.title == data:
|
|
return collection
|
|
if not exact:
|
|
c_names = ["| " + (str(i + 1) + ") " + collection.title + " (" + collection.subtype + ")") for i, collection in enumerate(collection_list)]
|
|
print("| 0) Do Nothing")
|
|
print("\n".join(c_names))
|
|
while True:
|
|
try:
|
|
selection = int(input("| Choose collection number: ")) - 1
|
|
if selection >= 0:
|
|
return collection_list[selection]
|
|
elif selection == -1:
|
|
raise ValueError("No collection selected")
|
|
else:
|
|
print("| Invalid entry")
|
|
except (IndexError, ValueError) as E:
|
|
print("| Invalid entry")
|
|
elif len(collection_list) == 1 and (exact is None or collection_list[0].title == data):
|
|
return collection_list[0]
|
|
else:
|
|
raise ValueError("Collection {} Not Found".format(data))
|
|
|
|
def add_to_collection(config_path, plex, method, value, c, plex_map=None, map=None, filters=None):
|
|
if map is None:
|
|
map = {}
|
|
movies = []
|
|
shows = []
|
|
items = []
|
|
missing = []
|
|
def search_plex():
|
|
search_terms = {}
|
|
output = ""
|
|
for attr_pair in value:
|
|
if attr_pair[0] == "actor":
|
|
search_list = []
|
|
for actor in attr_pair[1]:
|
|
search_list.append(get_actor_rkey(plex, actor))
|
|
else:
|
|
search_list = attr_pair[1]
|
|
final_method = attr_pair[0][:-4] + "!" if attr_pair[0][-4:] == ".not" else attr_pair[0]
|
|
if plex.library_type == "show":
|
|
final_method = "show." + final_method
|
|
search_terms[final_method] = search_list
|
|
ors = ""
|
|
for param in attr_pair[1]:
|
|
ors = ors + (" OR " if len(ors) > 0 else attr_pair[0] + "(") + str(param)
|
|
output = output + ("\n|\t\t AND " if len(output) > 0 else "| Processing Plex Search: ") + ors + ")"
|
|
print(output)
|
|
return plex.Library.search(**search_terms)
|
|
|
|
if "trakt" in method and not TraktClient.valid:
|
|
raise KeyError("| trakt connection required for {}",format(method))
|
|
elif ("imdb" in method or "tmdb" in method) and not TMDB.valid:
|
|
raise KeyError("| tmdb connection required for {}",format(method))
|
|
elif method == "tautulli" and not Tautulli.valid:
|
|
raise KeyError("| tautulli connection required for {}",format(method))
|
|
elif plex.library_type == "movie":
|
|
if plex_map is None and ("imdb" in method or "tmdb" in method or "trakt" in method):
|
|
plex_map = get_movie_map()
|
|
if method == "plex_collection":
|
|
movies = value.children
|
|
elif method == "imdb_list":
|
|
movies, missing = imdb_tools.imdb_get_movies(config_path, plex, plex_map, value)
|
|
elif "tmdb" in method:
|
|
movies, missing = imdb_tools.tmdb_get_movies(config_path, plex, plex_map, value, method)
|
|
elif "trakt" in method:
|
|
movies, missing = trakt_tools.trakt_get_movies(config_path, plex, plex_map, value, method)
|
|
elif method == "tautulli":
|
|
movies, missing = imdb_tools.get_tautulli(config_path, plex, value)
|
|
elif method == "all":
|
|
movies = plex.Library.all()
|
|
elif method == "plex_search":
|
|
movies = search_plex()
|
|
else:
|
|
print("| Config Error: {} method not supported".format(method))
|
|
elif plex.library_type == "show":
|
|
if plex_map is None and ("tvdb" in method or "tmdb" in method or "trakt" in method):
|
|
plex_map = get_show_map()
|
|
if method == "plex_collection":
|
|
shows = value.children
|
|
elif "tmdb" in method:
|
|
shows, missing = imdb_tools.tmdb_get_shows(config_path, plex, plex_map, value, method)
|
|
elif method == "tvdb_show":
|
|
shows, missing = imdb_tools.tvdb_get_shows(config_path, plex, plex_map, value)
|
|
elif "trakt" in method:
|
|
shows, missing = trakt_tools.trakt_get_shows(config_path, plex, plex_map, value, method)
|
|
elif method == "tautulli":
|
|
shows, missing = imdb_tools.get_tautulli(config_path, plex, value)
|
|
elif method == "all":
|
|
shows = plex.Library.all()
|
|
elif method == "plex_search":
|
|
shows = search_plex()
|
|
else:
|
|
print("| Config Error: {} method not supported".format(method))
|
|
|
|
filter_alias = {
|
|
"actor": "actors",
|
|
"content_rating": "contentRating",
|
|
"country": "countries",
|
|
"director": "directors",
|
|
"genre": "genres",
|
|
"studio": "studio",
|
|
"year": "year",
|
|
"writer": "writers",
|
|
"rating": "rating",
|
|
"max_age": "max_age",
|
|
"originally_available": "originallyAvailableAt",
|
|
"video_resolution": "video_resolution",
|
|
"audio_language": "audio_language",
|
|
"subtitle_language": "subtitle_language",
|
|
}
|
|
|
|
if movies:
|
|
# Check if already in collection
|
|
cols = plex.Library.search(title=c, libtype="collection")
|
|
try:
|
|
fs = cols[0].children
|
|
except IndexError:
|
|
fs = []
|
|
movie_count = 0
|
|
movie_max = len(movies)
|
|
max_str_len = len(str(movie_max))
|
|
current_length = 0
|
|
for rk in movies:
|
|
current_m = get_movie(plex, rk)
|
|
current_m.reload()
|
|
movie_count += 1
|
|
count_str_len = len(str(movie_count))
|
|
display_count = (" " * (max_str_len - count_str_len)) + str(movie_count)
|
|
match = True
|
|
if filters:
|
|
for f in filters:
|
|
print_display = "| Filtering {}/{} {}".format(display_count, movie_max, current_m.title)
|
|
print(adjust_space(current_length, print_display), end = "\r")
|
|
current_length = len(print_display)
|
|
modifier = f[0][-4:]
|
|
method = filter_alias[f[0][:-4]] if modifier in [".not", ".lte", ".gte"] else filter_alias[f[0]]
|
|
if method == "max_age":
|
|
threshold_date = datetime.now() - timedelta(days=f[1])
|
|
attr = getattr(current_m, "originallyAvailableAt")
|
|
if attr is None or attr < threshold_date:
|
|
match = False
|
|
break
|
|
elif modifier in [".gte", ".lte"]:
|
|
if method == "originallyAvailableAt":
|
|
threshold_date = datetime.strptime(f[1], "%m/%d/%y")
|
|
attr = getattr(current_m, "originallyAvailableAt")
|
|
if (modifier == ".lte" and attr > threshold_date) or (modifier == ".gte" and attr < threshold_date):
|
|
match = False
|
|
break
|
|
elif method in ["year", "rating"]:
|
|
attr = getattr(current_m, method)
|
|
if (modifier == ".lte" and attr > f[1]) or (modifier == ".gte" and attr < f[1]):
|
|
match = False
|
|
break
|
|
else:
|
|
terms = f[1] if isinstance(f[1], list) else str(f[1]).split(", ")
|
|
if method in ["video_resolution", "audio_language", "subtitle_language"]:
|
|
for media in current_m.media:
|
|
if method == "video_resolution":
|
|
mv_attrs = [media.videoResolution]
|
|
for part in media.parts:
|
|
if method == "audio_language":
|
|
mv_attrs = ([audio_stream.language for audio_stream in part.audioStreams()])
|
|
if method == "subtitle_language":
|
|
mv_attrs = ([subtitle_stream.language for subtitle_stream in part.subtitleStreams()])
|
|
elif method in ["contentRating", "studio", "year", "rating", "originallyAvailableAt"]: # Otherwise, it's a string. Make it a list.
|
|
mv_attrs = [str(getattr(current_m, method))]
|
|
elif method in ["actors", "countries", "directors", "genres", "writers"]:
|
|
mv_attrs = [getattr(x, 'tag') for x in getattr(current_m, method)]
|
|
|
|
# Get the intersection of the user's terms and movie's terms
|
|
# If it's empty and modifier is not .not, it's not a match
|
|
# If it's not empty and modifier is .not, it's not a match
|
|
if (not list(set(terms) & set(mv_attrs)) and modifier != ".not") or (list(set(terms) & set(mv_attrs)) and modifier == ".not"):
|
|
match = False
|
|
break
|
|
if match:
|
|
if current_m in fs:
|
|
map[current_m.ratingKey] = None
|
|
else:
|
|
current_m.addCollection(c)
|
|
print(adjust_space(current_length, "| {} Collection | {} | {}".format(c, "=" if current_m in fs else "+", current_m.title)))
|
|
print(adjust_space(current_length, "| Processed {} Movies".format(movie_max)))
|
|
elif plex.library_type == "movie":
|
|
print("| No movies found")
|
|
|
|
if shows:
|
|
# Check if already in collection
|
|
cols = plex.Library.search(title=c, libtype="collection")
|
|
try:
|
|
fs = cols[0].children
|
|
except IndexError:
|
|
fs = []
|
|
show_count = 0
|
|
show_max = len(shows)
|
|
current_length = 0
|
|
for rk in shows:
|
|
current_s = get_item(plex, rk)
|
|
current_s.reload()
|
|
show_count += 1
|
|
match = True
|
|
if filters:
|
|
for f in filters:
|
|
print_display = "| Filtering {}/{} {}".format(show_count, show_max, current_s.title)
|
|
print(adjust_space(current_length, print_display), end = "\r")
|
|
current_length = len(print_display)
|
|
modifier = f[0][-4:]
|
|
method = filter_alias[f[0][:-4]] if modifier in [".not", ".lte", ".gte"] else filter_alias[f[0]]
|
|
if method == "max_age":
|
|
threshold_date = datetime.now() - timedelta(days=f[1])
|
|
attr = getattr(current_s, "originallyAvailableAt")
|
|
if attr is None or attr < threshold_date:
|
|
match = False
|
|
break
|
|
elif modifier in [".gte", ".lte"]:
|
|
if method == "originallyAvailableAt":
|
|
threshold_date = datetime.strptime(f[1], "%m/%d/%y")
|
|
attr = getattr(current_s, "originallyAvailableAt")
|
|
if (modifier == ".lte" and attr > threshold_date) or (modifier == ".gte" and attr < threshold_date):
|
|
match = False
|
|
break
|
|
elif method in ["year", "rating"]:
|
|
attr = getattr(current_s, method)
|
|
if (modifier == ".lte" and attr > f[1]) or (modifier == ".gte" and attr < f[1]):
|
|
match = False
|
|
break
|
|
else:
|
|
terms = f[1] if isinstance(f[1], list) else str(f[1]).split(", ")
|
|
# if method in ["video_resolution", "audio_language", "subtitle_language"]:
|
|
# for media in current_s.media:
|
|
# if method == "video_resolution":
|
|
# show_attrs = [media.videoResolution]
|
|
# for part in media.parts:
|
|
# if method == "audio_language":
|
|
# show_attrs = ([audio_stream.language for audio_stream in part.audioStreams()])
|
|
# if method == "subtitle_language":
|
|
# show_attrs = ([subtitle_stream.language for subtitle_stream in part.subtitleStreams()])
|
|
if method in ["contentRating", "studio", "year", "rating", "originallyAvailableAt"]:
|
|
mv_attrs = [str(getattr(current_s, method))]
|
|
elif method in ["actors", "genres"]:
|
|
mv_attrs = [getattr(x, 'tag') for x in getattr(current_s, method)]
|
|
|
|
# Get the intersection of the user's terms and show's terms
|
|
# If it's empty and modifier is not .not, it's not a match
|
|
# If it's not empty and modifier is .not, it's not a match
|
|
if (not list(set(terms) & set(show_attrs)) and modifier != ".not") or (list(set(terms) & set(show_attrs)) and modifier == ".not"):
|
|
match = False
|
|
break
|
|
if match:
|
|
if current_s in fs:
|
|
map[current_s.ratingKey] = None
|
|
else:
|
|
current_s.addCollection(c)
|
|
print(adjust_space(current_length, "| {} Collection | {} | {}".format(c, "=" if current_s in fs else "+", current_s.title)))
|
|
print(adjust_space(current_length, "| Processed {} Shows".format(show_max)))
|
|
elif plex.library_type == "show":
|
|
print("| No shows found")
|
|
|
|
try:
|
|
missing
|
|
except UnboundLocalError:
|
|
return
|
|
else:
|
|
return missing, map
|
|
|
|
def delete_collection(data):
|
|
confirm = input("| {} selected. Confirm deletion (y/n):".format(data.title))
|
|
if confirm == "y":
|
|
data.delete()
|
|
print("| Collection deleted")
|
|
|
|
def alt_id_lookup(plex, value):
|
|
req = Request('{}{}'.format(plex.url, value.key))
|
|
req.add_header('X-Plex-Token', plex.token)
|
|
req.add_header('User-Agent', 'Mozilla/5.0')
|
|
with urlopen(req) as response:
|
|
contents = response.read()
|
|
bs = BeautifulSoup(contents, 'lxml')
|
|
imdb_id = None
|
|
tmdb_id = None
|
|
for guid_tag in bs.find_all('guid'):
|
|
agent = urlparse(guid_tag['id']).scheme
|
|
guid = urlparse(guid_tag['id']).netloc
|
|
if agent == 'imdb':
|
|
imdb_id = guid
|
|
elif agent == 'tmdb':
|
|
tmdb_id = guid
|
|
return imdb_id, tmdb_id
|
|
|
|
def create_cache(config_path):
|
|
cache = os.path.join(os.path.dirname(config_path), 'cache.db')
|
|
connection = sqlite3.connect(cache)
|
|
with sqlite3.connect(cache) as connection:
|
|
connection.row_factory = sqlite3.Row
|
|
cursor = connection.cursor()
|
|
cursor.execute(''' SELECT count(name) FROM sqlite_master WHERE type='table' AND name='guids' ''')
|
|
if cursor.fetchone()[0] != 1:
|
|
print("| Initializing cache database.".format(cache))
|
|
cursor.execute('CREATE TABLE IF NOT EXISTS guids (plex_guid TEXT PRIMARY KEY, imdb_id TEXT, tmdb_id TEXT)')
|
|
|
|
def query_cache(config_path, key, column):
|
|
cache = os.path.join(os.path.dirname(config_path), 'cache.db')
|
|
with sqlite3.connect(cache) as connection:
|
|
connection.row_factory = sqlite3.Row
|
|
cursor = connection.cursor()
|
|
cursor.execute("SELECT * FROM guids WHERE plex_guid = ?", (key, ))
|
|
row = cursor.fetchone()
|
|
if row:
|
|
return row[column]
|
|
|
|
def update_cache(config_path, plex_guid, **kwargs):
|
|
cache = os.path.join(os.path.dirname(config_path), 'cache.db')
|
|
with sqlite3.connect(cache) as connection:
|
|
connection.row_factory = sqlite3.Row
|
|
cursor = connection.cursor()
|
|
if 'imdb_id' in kwargs:
|
|
imdb_id = kwargs['imdb_id']
|
|
cursor.execute('INSERT OR IGNORE INTO guids(plex_guid, imdb_id) VALUES(?, ?)', (plex_guid, imdb_id, ))
|
|
cursor.execute('UPDATE guids SET imdb_id = ? WHERE plex_guid = ?', (imdb_id, plex_guid))
|
|
if 'tmdb_id' in kwargs:
|
|
tmdb_id = kwargs['tmdb_id']
|
|
cursor.execute('INSERT OR IGNORE INTO guids(plex_guid, tmdb_id) VALUES(?, ?)', (plex_guid, tmdb_id, ))
|
|
cursor.execute('UPDATE guids SET tmdb_id = ? WHERE plex_guid = ?', (tmdb_id, plex_guid))
|