Checkpoint: New implementation for search

This commit is contained in:
Michael Shepanski 2016-03-31 16:52:48 -04:00
parent 03469a7f55
commit b10faf8560
6 changed files with 197 additions and 129 deletions

View file

@ -11,9 +11,10 @@ CONFIG_PATH = os.path.expanduser('~/.config/plexapi/config.ini')
CONFIG = PlexConfig(CONFIG_PATH)
# Core Settings
PROJECT = 'PlexAPI'
VERSION = '2.0.0a'
TIMEOUT = CONFIG.get('plexapi.timeout', 5, int)
PROJECT = 'PlexAPI' # name provided to plex server
VERSION = '2.0.0a' # version of this api
TIMEOUT = CONFIG.get('plexapi.timeout', 5, int) # request timeout
X_PLEX_CONTAINER_SIZE = 50 # max results to return in a single search page
# Plex Header Configuation
X_PLEX_PROVIDES = 'player,controller' # one or more of [player, controller, server]

View file

@ -1,9 +1,34 @@
# -*- coding: utf-8 -*-
"""
PlexLibrary
# --- SEARCH ---
# type=1
# sort=column:[asc|desc]
# -> column in {addedAt,originallyAvailableAt,lastViewedAt,titleSort,rating,mediaHeight,duration}
# unwatched=1
# duplicate=1
# year=yyyy,yyyy,yyyy
# decade=<key>,<key>
# genre=<id>,<id>
# contentRating=<key>,<key>
# collection=<id>,<id>
# director=<id>,<id>
# actor=<id>,<id>
# studio=<key>,<key>
# resolution=720,480,sd ??
# X-Plex-Container-Start=0
# X-Plex-Container-Size=0
# --- CANNED ---
# /library/sections/1/onDeck
# /library/sections/1/recentlyViewed
# /library/sections/1/all?sort=addedAt:desc
"""
from plexapi import utils
from plexapi.exceptions import NotFound
from plexapi import log, utils
from plexapi import X_PLEX_CONTAINER_SIZE
from plexapi.exceptions import BadRequest, NotFound
class Library(object):
@ -54,15 +79,21 @@ class Library(object):
def getByKey(self, key):
return utils.findKey(self.server, key)
def search(self, title, prefilter='all', libtype=None, **tags):
def search(self, title, libtype=None, **kwargs):
""" Searching within a library section is much more powerful. It seems certain attributes on
the media objects can be targeted to filter this search down a bit, but I havent found the
documentation for it. For example: "studio=Comedy%20Central" or "year=1999" "title=Kung Fu"
all work.
"""
# TODO: FIGURE THIS OUT!
args = {}
if title: args['title'] = title
if libtype: args['type'] = utils.searchType(libtype)
for tag, obj in tags.items():
args[tag] = obj.id
query = '/library/%s%s' % (prefilter, utils.joinArgs(args))
for attr, value in kwargs.items():
args[attr] = value
query = '/library/all%s' % utils.joinArgs(args)
return utils.listItems(self.server, query)
def cleanBundles(self):
self.server.query('/library/clean/bundles')
@ -78,6 +109,9 @@ class Library(object):
class LibrarySection(object):
ALLOWED_FILTERS = ()
ALLOWED_SORT = ()
BOOLEAN_FILTERS = ('unwatched', 'duplicate')
def __init__(self, server, data, initpath):
self.server = server
@ -91,65 +125,76 @@ class LibrarySection(object):
def __repr__(self):
title = self.title.replace(' ','.')[0:20]
return '<%s:%s>' % (self.__class__.__name__, title.encode('utf8'))
def _primary_list(self, key):
return utils.listItems(self.server, '/library/sections/%s/%s' % (self.key, key))
def _secondary_list(self, key, input=None):
choices = listChoices(self.server, '/library/sections/%s/%s' % (self.key, key))
if not input:
return list(choices.keys())
return utils.listItems(self.server, '/library/sections/%s/%s/%s' % (self.key, key, choices[input]))
def all(self):
return self._primary_list('all')
def newest(self):
return self._primary_list('newest')
def onDeck(self):
return self._primary_list('onDeck')
def recentlyAdded(self):
return self._primary_list('recentlyAdded')
def recentlyViewed(self):
return self._primary_list('recentlyViewed')
def unwatched(self):
return self._primary_list('unwatched')
def contentRating(self, input=None):
return self._secondary_list('contentRating', input)
def firstCharacter(self, input=None):
return self._secondary_list('firstCharacter', input)
def genre(self, input=None):
return self._secondary_list('genre', input)
def year(self, input=None):
return self._secondary_list('year', input)
def get(self, title):
path = '/library/sections/%s/all' % self.key
return utils.findItem(self.server, path, title)
def search(self, title, filter='all', libtype=None, **tags):
""" Search section content.
title: Title to search (pass None to search all titles).
filter: One of {all, newest, onDeck, recentlyAdded, recentlyViewed, unwatched}.
libtype: One of {movie, show, season, episode, artist, album, track}.
tags: One of {country, director, genre, producer, actor, writer}.
def all(self):
return utils.listItems(self.server, '/library/sections/%s/all' % self.key)
def onDeck(self):
return utils.listItems(self.server, '/library/sections/%s/onDeck' % self.key)
def listChoices(self, category, libtype=None, **kwargs):
""" List choices for the specified filter category. kwargs can be any of the same
kwargs in self.search() to help narrow down the choices to only those that
matter in your current context.
"""
if category in kwargs:
raise BadRequest('Cannot include kwarg equal to specified category: %s' % category)
args = {}
if title: args['title'] = title
if libtype: args['type'] = utils.searchType(libtype)
for tag, obj in tags.items():
args[tag] = obj.id
query = '/library/sections/%s/%s%s' % (self.key, filter, utils.joinArgs(args))
for subcategory, value in kwargs.items():
args[category] = self._cleanSearchFilter(subcategory, value)
if libtype is not None: args['type'] = utils.searchType(libtype)
query = '/library/sections/%s/%s%s' % (self.key, category, utils.joinArgs(args))
return utils.listItems(self.server, query)
def search(self, title=None, sort=None, maxresults=999999, libtype=None, **kwargs):
""" Search the library. If there are many results, they will be fetched from the server
in batches of X_PLEX_CONTAINER_SIZE amounts. If you're only looking for the first <num>
results, it would be wise to set the maxresults option to that amount so this functions
doesn't iterate over all results on the server.
title: General string query to search for.
sort: column:dir; column can be any of {addedAt, originallyAvailableAt, lastViewedAt,
titleSort, rating, mediaHeight, duration}. dir can be asc or desc.
maxresults: Only return the specified number of results
libtype: Filter results to a spcifiec libtype {movie, show, episode, artist, album, track}
kwargs: Any of the available filters for the current library section. Partial string
matches allowed. Multiple matches OR together. All inputs will be compared with the
available options and a warning logged if the option does not appear valid.
'unwatched': Display or hide unwatched content (True, False). [all]
'duplicate': Display or hide duplicate items (True, False). [movie]
'actor': List of actors to search ([actor_or_id, ...]). [movie]
'collection': List of collections to search within ([collection_or_id, ...]). [all]
'contentRating': List of content ratings to search within ([rating_or_key, ...]). [movie, tv]
'country': List of countries to search within ([country_or_key, ...]). [movie, music]
'decade': List of decades to search within ([yyy0, ...]). [movie]
'director': List of directors to search ([director_or_id, ...]). [movie]
'genre': List Genres to search within ([genere_or_id, ...]). [all]
'network': List of TV networks to search within ([resolution_or_key, ...]). [tv]
'resolution': List of video resolutions to search within ([resolution_or_key, ...]). [movie]
'studio': List of studios to search within ([studio_or_key, ...]). [music]
'year': List of years to search within ([yyyy, ...]). [all]
"""
# Cleanup the core arguments
args = {}
for category, value in kwargs.items():
args[category] = self._cleanSearchFilter(category, value, libtype)
if title is not None: args['title'] = title
if sort is not None: args['sort'] = self._cleanSearchSort(sort)
if libtype is not None: args['type'] = utils.searchType(libtype)
# Iterate over the results
results, subresults = [], '_init'
args['X-Plex-Container-Start'] = 0
args['X-Plex-Container-Size'] = min(X_PLEX_CONTAINER_SIZE, maxresults)
while subresults and maxresults > len(results):
query = '/library/sections/%s/all%s' % (self.key, utils.joinArgs(args))
subresults = utils.listItems(self.server, query)
results += subresults[:maxresults-len(results)]
args['X-Plex-Container-Start'] += args['X-Plex-Container-Size']
return results
def analyze(self):
self.server.query('/library/sections/%s/analyze' % self.key)
@ -159,75 +204,86 @@ class LibrarySection(object):
def refresh(self):
self.server.query('/library/sections/%s/refresh' % self.key)
def _cleanSearchFilter(self, category, value, libtype=None):
result = set()
if category not in self.ALLOWED_FILTERS:
raise BadRequest('Unknown filter category: %s' % category)
if category in self.BOOLEAN_FILTERS:
return '1' if value else '0'
if not isinstance(value, (list, tuple)):
value = [value]
choices = self.listChoices(category, libtype)
lookup = {c.title.lower():c.key for c in choices}
allowed = set(c.key for c in choices)
for dirtykey in value:
dirtykey = str(dirtykey).lower()
if dirtykey in allowed:
result.add(dirtykey); continue
if dirtykey in lookup:
result.add(lookup[dirtykey]); continue
for key in [k for t,k in lookup.items() if dirtykey in t]:
result.add(key)
if not result:
log.warning('No known filter values: %s; Will probably yield no results.' % ', '.join(value))
return ','.join(result)
def _cleanSearchSort(self, sort):
sort = '%s:asc' % sort if ':' not in sort else sort
scol, sdir = sort.lower().split(':')
lookup = {s.lower():s for s in self.ALLOWED_SORT}
if scol not in lookup:
raise BadRequest('Unknown sort column: %s' % scol)
if sdir not in ('asc', 'desc'):
raise BadRequest('Unknown sort dir: %s' % sdir)
return '%s:%s' % (lookup[scol], sdir)
class MovieSection(LibrarySection):
ALLOWED_FILTERS = ('unwatched', 'duplicate', 'year', 'decade', 'genre', 'contentRating', 'collection',
'director', 'actor', 'country', 'studio', 'resolution')
ALLOWED_SORT = ('addedAt', 'originallyAvailableAt', 'lastViewedAt', 'titleSort', 'rating',
'mediaHeight', 'duration')
TYPE = 'movie'
def actor(self, input=None):
return self._secondary_list('actor', input)
def country(self, input=None):
return self._secondary_list('country', input)
def decade(self, input=None):
return self._secondary_list('decade', input)
def director(self, input=None):
return self._secondary_list('director', input)
def rating(self, input=None):
return self._secondary_list('rating', input)
def resolution(self, input=None):
return self._secondary_list('resolution', input)
def search(self, title, filter='all', **tags):
return super(MovieSection, self).search(title, filter=filter, libtype='movie', **tags)
class ShowSection(LibrarySection):
ALLOWED_FILTERS = ('unwatched', 'year', 'genre', 'contentRating', 'network', 'collection')
ALLOWED_SORT = ('addedAt', 'lastViewedAt', 'originallyAvailableAt', 'titleSort', 'rating', 'unwatched')
TYPE = 'show'
def recentlyViewedShows(self):
return self._primary_list('recentlyViewedShows')
def searchShows(self, **kwargs):
return self.search(libtype='show', **kwargs)
def search(self, title, filter='all', **tags):
return super(ShowSection, self).search(title, filter=filter, libtype='show', **tags)
def searchEpisodes(self, title, filter='all', **tags):
return super(ShowSection, self).search(title, filter=filter, libtype='episode', **tags)
def searchEpisodes(self, **kwargs):
return self.search(libtype='episode', **kwargs)
class MusicSection(LibrarySection):
ALLOWED_FILTERS = ('genre', 'country', 'collection')
ALLOWED_SORT = ('addedAt', 'lastViewedAt', 'viewCount', 'titleSort')
TYPE = 'artist'
def searchShows(self, **kwargs):
return self.search(libtype='artist', **kwargs)
def search(self, title, filter='all', atype=None, **tags):
""" Search section content.
title: Title to search (pass None to search all titles).
filter: One of {'all', 'newest', 'onDeck', 'recentlyAdded', 'recentlyViewed', 'unwatched'}.
videotype: One of {'artist', 'album', 'track'}.
tags: One of {country, director, genre, producer, actor, writer}.
"""
args = {}
if title: args['title'] = title
if atype: args['type'] = utils.searchType(atype)
for tag, obj in tags.items():
args[tag] = obj.id
query = '/library/sections/%s/%s%s' % (self.key, filter, utils.joinArgs(args))
return utils.listItems(self.server, query)
def recentlyViewedShows(self):
return self._primary_list('recentlyViewedShows')
def searchArtists(self, title, filter='all', **tags):
return self.search(title, filter=filter, atype='artist', **tags)
def searchAlbums(self, title, filter='all', **tags):
return self.search(title, filter=filter, atype='album', **tags)
def searchTracks(self, title, filter='all', **tags):
return self.search(title, filter=filter, atype='track', **tags)
def searchEpisodes(self, **kwargs):
return self.search(libtype='album', **kwargs)
def searchTracks(self, **kwargs):
return self.search(libtype='track', **kwargs)
def listChoices(server, path):
return {c.attrib['title']:c.attrib['key'] for c in server.query(path)}
@utils.register_libtype
class FilterChoice(object):
TYPE = 'Directory'
def __init__(self, server, data, initpath):
self.server = server
self.initpath = initpath
self.fastKey = data.attrib.get('fastKey')
self.key = data.attrib.get('key')
self.title = data.attrib.get('title')
def __repr__(self):
title = self.title.replace(' ','.')[0:20]
return '<%s:%s:%s>' % (self.__class__.__name__, self.key, title)

View file

@ -56,7 +56,6 @@ class MediaPart(object):
selected = list(filter(lambda x: x.selected is True, streams))
if len(selected) == 0:
return None
return selected[0]

View file

@ -101,6 +101,7 @@ class PlexServer(object):
return ElementTree.fromstring(data) if data else None
def search(self, query, mediatype=None):
""" Searching within a library section is much more powerful. """
items = utils.listItems(self, '/search?query=%s' % quote(query))
if mediatype:
return [item for item in items if item.type == mediatype]

View file

@ -9,6 +9,9 @@ from plexapi.compat import quote, urlencode
from plexapi.exceptions import NotFound, UnknownType, Unsupported
# Search Types - Plex uses these to filter specific media types when searching.
SEARCHTYPES = {'movie':1, 'show':2, 'season':3, 'episode':4, 'artist':8, 'album':9, 'track':10}
# Registry of library types we may come across when parsing XML. This allows us to
# define a few helper functions to dynamically convery the XML into objects.
# see buildItem() below for an example.
@ -17,7 +20,6 @@ def register_libtype(cls):
LIBRARY_TYPES[cls.TYPE] = cls
return cls
# This used to be a simple variable equal to '__NA__'. However, there has been need to
# compare NA against None in some use cases. This object allows the internals of PlexAPI
# to distinguish between unfetched values and fetched, but non-existent values.
@ -45,7 +47,7 @@ class PlexPartialObject(object):
def __repr__(self):
title = self.title.replace(' ','.')[0:20]
return '<%s:%s>' % (self.__class__.__name__, title.encode('utf8'))
return '<%s:%s:%s>' % (self.__class__.__name__, self.key, title.encode('utf8'))
def __getattr__(self, attr):
if self.isPartialObject():
@ -143,7 +145,7 @@ class PlexPartialObject(object):
def buildItem(server, elem, initpath):
libtype = elem.attrib.get('type')
libtype = elem.attrib.get('type') or elem.tag
if libtype in LIBRARY_TYPES:
cls = LIBRARY_TYPES[libtype]
return cls(server, elem, initpath)
@ -189,6 +191,10 @@ def joinArgs(args):
return '?%s' % '&'.join(arglist)
def listChoices(server, path):
return {c.attrib['title']:c.attrib['key'] for c in server.query(path)}
def listItems(server, path, libtype=None, watched=None):
items = []
for elem in server.query(path):
@ -218,14 +224,13 @@ def rget(obj, attrstr, default=None, delim='.'):
def searchType(libtype):
if libtype == 'movie': return 1
elif libtype == 'show': return 2
elif libtype == 'season': return 3
elif libtype == 'episode': return 4
elif libtype == 'artist': return 8
elif libtype == 'album': return 9
elif libtype == 'track': return 10
raise NotFound('Unknown libtype: %s' % libtype)
SEARCHTYPESSTRS = [str(k) for k in SEARCHTYPES.keys()]
if libtype in SEARCHTYPES + SEARCHTYPESSTRS:
return libtype
stype = SEARCHTYPES.get(libtype.lower())
if not stype:
raise NotFound('Unknown libtype: %s' % libtype)
return stype
def toDatetime(value, format=None):

View file

@ -107,6 +107,12 @@ def test_search_audio(plex, user=None):
log(4, 'Result Music: %s' % result_music)
assert result_server, 'Artist not found.'
assert result_server == result_library == result_music, 'Audio searches not consistent.'
@register('search,show')
def test_crazy_search(plex, user=None):
movies = plex.library.section(MOVIE_SECTION)
print(movies.search(duplicate=True))
#-----------------------