Cleanup Audio/Video search at the server & library level. Start testing audio a bit more

This commit is contained in:
Michael Shepanski 2016-03-15 14:36:59 -04:00
parent 830f96cf28
commit fd83cbab8a
7 changed files with 143 additions and 54 deletions

View file

@ -3,6 +3,8 @@ PlexLibrary
"""
from plexapi import video, audio, utils
from plexapi.exceptions import NotFound
from threading import Thread
from Queue import Queue
class Library(object):
@ -52,38 +54,31 @@ class Library(object):
def getByKey(self, key):
return video.find_key(self.server, key)
def searchVideo(self, title, filter='all', vtype=None, **tags):
""" Search all available content.
title: Title to search (pass None to search all titles).
filter: One of {'all', 'onDeck', 'recentlyAdded'}.
videotype: One of {'movie', 'show', 'season', 'episode'}.
tags: One of {country, director, genre, producer, actor, writer}.
"""
args = {}
if title: args['title'] = title
if vtype: args['type'] = video.search_type(vtype)
for tag, obj in tags.items():
args[tag] = obj.id
query = '/library/%s%s' % (filter, utils.joinArgs(args))
return video.list_items(self.server, query)
def searchAudio(self, title, filter='all', atype=None, **tags):
""" Search all available audio content.
title: Title to search (pass None to search all titles).
filter: One of {'all', 'onDeck', 'recentlyAdded'}.
atype: One of {'artist', 'album', 'track'}.
tags: One of {country, director, genre, producer, actor, writer}.
"""
args = {}
if title: args['title'] = title
if atype: args['type'] = audio.search_type(atype)
for tag, obj in tags.items():
args[tag] = obj.id
query = '/library/%s%s' % (filter, utils.joinArgs(args))
return audio.list_items(self.server, query)
search = searchVideo # TODO: make .search() a method to merge results of .searchVideo() and .searchAudio()
def search(self, title, prefilter='all', mtype=None, **tags):
# TODO: Handle tags much better.
_audio = lambda: self._search(audio, title, prefilter, mtype, **tags)
_video = lambda: self._search(video, title, prefilter, mtype, **tags)
results = []
qresults = utils.threaded([_audio, _video])
for item in iter(qresults.get_nowait, None):
results += item
return results
def _search(self, module, title, prefilter='all', mtype=None, **tags):
""" Search audio content.
title: Title to search (pass None to search all titles).
prefilter: One of {'all', 'onDeck', 'recentlyAdded'}.
mtype: One of {'artist', 'album', 'track', 'movie', 'show', 'season', 'episode'}.
tags: One of {country, director, genre, producer, actor, writer}.
"""
args = {}
if title: args['title'] = title
if mtype: args['type'] = module.search_type(mtype)
for tag, obj in tags.items():
args[tag] = obj.id
query = '/library/%s%s' % (prefilter, utils.joinArgs(args))
return module.list_items(self.server, query)
def cleanBundles(self):
self.server.query('/library/clean/bundles')

View file

@ -4,7 +4,7 @@ PlexServer
import requests
from requests.status_codes import _codes as codes
from plexapi import BASE_HEADERS, TIMEOUT
from plexapi import log, video, audio
from plexapi import log, audio, video, utils
from plexapi.client import Client
from plexapi.exceptions import BadRequest, NotFound
from plexapi.library import Library
@ -97,17 +97,20 @@ class PlexServer(object):
raise BadRequest('(%s) %s' % (response.status_code, codename))
data = response.text.encode('utf8')
return ElementTree.fromstring(data) if data else None
def search(self, query, mediatype=None):
query = quote(query)
items = video.list_items(self, '/search?query=%s' % query)
if mediatype:
return [item for item in items if item.type == mediatype]
return items
def searchAudio(self, query, mediatype=None):
query = quote(query)
items = audio.list_items(self, '/search?query=%s' % query)
# Searching at this level is meant to be quick and dirty, if you're looking
# for more in-depth search filters look at plex.library.search().
_audio = lambda: self._search(audio, query, mediatype)
_video = lambda: self._search(video, query, mediatype)
results = []
qresults = utils.threaded([_audio, _video])
for item in iter(qresults.get_nowait, None):
results += item
return results
def _search(self, module, query, mediatype=None):
items = module.list_items(self, '/search?query=%s' % quote(query))
if mediatype:
return [item for item in items if item.type == mediatype]
return items

View file

@ -2,6 +2,8 @@
PlexAPI Utils
"""
from datetime import datetime
from threading import Thread
from Queue import Queue
try:
from urllib import quote # Python2
except ImportError:
@ -30,6 +32,13 @@ class PlexPartialObject(object):
self.server = server
self.initpath = initpath
self._loadData(data)
def __eq__(self, other):
return other is not None and self.type == other.type and self.key == other.key
def __repr__(self):
title = self.title.replace(' ','.')[0:20]
return '<%s:%s>' % (self.__class__.__name__, title.encode('utf8'))
def __getattr__(self, attr):
if self.isPartialObject():
@ -77,6 +86,22 @@ def joinArgs(args):
return '?%s' % '&'.join(arglist)
def threaded(funcs, *args, **kwargs):
def _run(func, _args, _kwargs, results):
results.put(func(*_args, **_kwargs))
threads = []
results = Queue(len(funcs) + 1)
for func in funcs:
targs = [func, args, kwargs, results]
threads.append(Thread(target=_run, args=targs))
for thread in threads:
thread.start()
for thread in threads:
thread.join()
results.put(None)
return results
def toDatetime(value, format=None):
if value and value != NA:
if format: value = datetime.strptime(value, format)

View file

@ -19,13 +19,6 @@ except ImportError:
class Video(PlexPartialObject):
TYPE = None
def __eq__(self, other):
return other is not None and self.type == other.type and self.key == other.key
def __repr__(self):
title = self.title.replace(' ','.')[0:20]
return '<%s:%s>' % (self.__class__.__name__, title.encode('utf8'))
def _loadData(self, data):
self.type = data.attrib.get('type', NA)
self.key = data.attrib.get('key', NA)

View file

@ -92,6 +92,7 @@ if __name__ == '__main__':
parser.add_argument('--baseuri', help='Baseuri needed for auth token authentication')
parser.add_argument('--token', help='Auth token (instead of user/pass)')
parser.add_argument('--example', help='Only run the specified example.')
parser.add_argument('-v', '--verbose', default=False, action='store_true', help='Print verbose logging.')
args = parser.parse_args()
plex, user = fetch_server(args)
for example in iter_tests(args.example):

View file

@ -19,9 +19,17 @@ SHOW_SEASON = 'Season 1'
SHOW_EPISODE = 'Winter Is Coming'
MOVIE_SECTION = 'Movies'
MOVIE_TITLE = 'Jurassic Park'
AUDIO_SECTION = 'Music'
AUDIO_ARTIST = 'Beastie Boys'
AUDIO_ALBUM = 'Licensed To Ill'
AUDIO_TRACK = 'Brass Monkey'
PLEX_CLIENT = 'iphone-mike'
#-----------------------
# Core
#-----------------------
@register('core,server')
def test_server(plex, user=None):
log(2, 'Username: %s' % plex.myPlexUsername)
@ -42,6 +50,10 @@ def test_list_sections(plex, user=None):
plex.library.section(MOVIE_SECTION)
#-----------------------
# Search
#-----------------------
@register('search,show')
def test_search_show(plex, user=None):
result_server = plex.search(SHOW_TITLE)
@ -82,7 +94,24 @@ def test_search_movie(plex, user=None):
assert result_server, 'Movie not found.'
assert result_server == result_library == result_movies, 'Movie searches not consistent.'
assert not result_shows, 'Show search returned show title.'
@register('search,audio')
def test_search_audio(plex, user=None):
result_server = plex.search(AUDIO_ARTIST)
result_library = plex.library.search(AUDIO_ARTIST)
result_music = plex.library.section(AUDIO_SECTION).search(AUDIO_ARTIST)
log(2, 'Searching for: %s' % AUDIO_ARTIST)
log(4, 'Result Server: %s' % result_server)
log(4, 'Result Library: %s' % result_library)
log(4, 'Result Music: %s' % result_music)
assert result_server, 'Artist not found.'
assert result_server == result_library == result_music, 'Audio searches not consistent.'
#-----------------------
# Library Navigation
#-----------------------
@register('navigate,movie,show')
def test_navigate_to_movie(plex, user=None):
@ -133,7 +162,34 @@ def test_navigate_around_show(plex, user=None):
assert episode.season() == season, 'episode.season() doesnt match expected season.'
@register('movie,action')
@register('navigate,audio')
def test_navigate_around_artist(plex, user=None):
section = plex.library.section(AUDIO_SECTION)
print(section)
artist = plex.library.section(AUDIO_SECTION).get(AUDIO_ARTIST)
print(artist)
# seasons = show.seasons()
# season = show.season(SHOW_SEASON)
# episodes = show.episodes()
# episode = show.episode(SHOW_EPISODE)
# log(2, 'Navigating around show: %s' % show)
# log(4, 'Seasons: %s...' % seasons[:3])
# log(4, 'Season: %s' % season)
# log(4, 'Episodes: %s...' % episodes[:3])
# log(4, 'Episode: %s' % episode)
# assert SHOW_SEASON in [s.title for s in seasons], 'Unable to get season: %s' % SHOW_SEASON
# assert SHOW_EPISODE in [e.title for e in episodes], 'Unable to get episode: %s' % SHOW_EPISODE
# assert season.show() == show, 'season.show() doesnt match expected show.'
# assert episode.show() == show, 'episode.show() doesnt match expected show.'
# assert episode.season() == season, 'episode.season() doesnt match expected season.'
#-----------------------
# Library Actions
#-----------------------
@register('action,movie')
def test_mark_movie_watched(plex, user=None):
movie = plex.library.section(MOVIE_SECTION).get(MOVIE_TITLE)
movie.markUnwatched()
@ -159,6 +215,10 @@ def test_refresh_video(plex, user=None):
result[0].refresh()
#-----------------------
# Metadata
#-----------------------
@register('meta,movie')
def test_original_title(plex, user=None):
movie_title = 'Bedside Detective'
@ -232,6 +292,10 @@ def test_fetch_details_not_in_search_result(plex, user=None):
log(2, '%s actors found.' % len(actors))
#-----------------------
# Play Queue
#-----------------------
@register('queue')
def test_play_queues(plex, user=None):
episode = plex.library.section(SHOW_SECTION).get(SHOW_TITLE).get(SHOW_EPISODE)
@ -241,6 +305,10 @@ def test_play_queues(plex, user=None):
assert playqueue.playQueueID, 'Play queue ID not set.'
#-----------------------
# Client
#-----------------------
@register('client')
def test_list_devices(plex, user=None):
assert user, 'Must specify username, password & resource to run this test.'
@ -267,7 +335,7 @@ def test_client_play_media(plex, user=None):
client.stop()
# def test_016_sync_items(plex, user=None):
# def test_sync_items(plex, user=None):
# user = MyPlexUser('user', 'pass')
# device = user.getDevice('device-uuid')
# # fetch the sync items via the device sync list
@ -294,5 +362,6 @@ if __name__ == '__main__':
parser.add_argument('--baseuri', help='Baseuri needed for auth token authentication')
parser.add_argument('--token', help='Auth token (instead of user/pass)')
parser.add_argument('--query', help='Only run the specified tests.')
parser.add_argument('-v', '--verbose', default=False, action='store_true', help='Print verbose logging.')
args = parser.parse_args()
run_tests(__name__, args)

View file

@ -1,7 +1,7 @@
"""
Test Library Functions
"""
import inspect, sys
import inspect, sys, traceback
import datetime, time
from plexapi import server
from plexapi.myplex import MyPlexUser
@ -61,12 +61,15 @@ def run_tests(module, args):
log(2, 'PASS! (runtime: %.3fs; queries: %s)' % (runtime, queries), 'blue')
tests['passed'] += 1
except Exception as err:
log(2, 'FAIL!: %s' % err, 'red')
errstr = str(err)
errstr += '\n%s' % traceback.format_exc() if args.verbose else errstr
log(2, 'FAIL!: %s' % errstr, 'red')
tests['failed'] += 1
log(0, '')
log(0, 'Tests Run: %s' % sum(tests.values()))
log(0, 'Tests Passed: %s' % tests['passed'])
log(0, 'Tests Failed: %s' % tests['failed'])
if tests['failed']:
log(0, 'Tests Failed: %s' % tests['failed'], 'red')
if not tests['failed']:
log(0, '')
log(0, 'EVERYTHING OK!! :)')