New test for client.timeline(); Fix bug in missing proper headers for timeline

This commit is contained in:
Michael Shepanski 2016-03-31 23:39:09 -04:00
parent 7bce1c4b32
commit 7bb700d395
4 changed files with 59 additions and 46 deletions

View file

@ -13,7 +13,7 @@ CONFIG = PlexConfig(CONFIG_PATH)
# Core Settings
PROJECT = 'PlexAPI' # name provided to plex server
VERSION = '2.0.0a' # version of this api
TIMEOUT = CONFIG.get('plexapi.timeout', 5, int) # request timeout
TIMEOUT = CONFIG.get('plexapi.timeout', 30, int) # request timeout
X_PLEX_CONTAINER_SIZE = 50 # max results to return in a single search page
# Plex Header Configuation

View file

@ -6,7 +6,7 @@ https://github.com/plexinc/plex-media-player/wiki/Remote-control-API
"""
import requests
from requests.status_codes import _codes as codes
from plexapi import TIMEOUT, log, utils
from plexapi import BASE_HEADERS, TIMEOUT, log, utils
from plexapi.exceptions import BadRequest, Unsupported
from xml.etree import ElementTree
@ -50,15 +50,10 @@ class Client(object):
if controller not in self.protocolCapabilities:
raise Unsupported('Client %s does not support the %s controller.' % (self.quickName, controller))
self._commandId += 1
params.update({
'X-Plex-Device-Name': self.name,
'X-Plex-Client-Identifier': self.server.machineIdentifier,
'X-Plex-Target-Client-Identifier': self.machineIdentifier,
'commandID': self._commandId,
})
params['commandID'] = self._commandId
url = 'http://%s:%s/player/%s%s' % (self.address, self.port, command.lstrip('/'), utils.joinArgs(params))
log.info('GET %s', url)
response = requests.get(url, timeout=TIMEOUT)
response = requests.get(url, headers=BASE_HEADERS, timeout=TIMEOUT)
if response.status_code != requests.codes.ok:
codename = codes.get(response.status_code)[0]
raise BadRequest('(%s) %s' % (response.status_code, codename))
@ -97,7 +92,7 @@ class Client(object):
}, **params))
# Playback Commands
# most of the playback commands take a mandatory mtype {'music','photo','video'} argument,
# Most of the playback commands take a mandatory mtype {'music','photo','video'} argument,
# to specify which media type to apply the command to, (except for playMedia). This
# is in case there are multiple things happening (e.g. music in the background, photo
# slideshow in the foreground).
@ -138,7 +133,6 @@ class Client(object):
self.sendCommand('playback/setParameters', **params)
def setStreams(self, audioStreamID=None, subtitleStreamID=None, videoStreamID=None, mtype=None):
# Can possibly send {next,on,off}
params = {}
if audioStreamID is not None: params['audioStreamID'] = audioStreamID
if subtitleStreamID is not None: params['subtitleStreamID'] = subtitleStreamID
@ -147,12 +141,12 @@ class Client(object):
self.sendCommand('playback/setStreams', **params)
# Timeline Commands
# TODO: We should properly parse the Timeline XML objects here
def timeline(self):
self.sendCommand('timeline/poll', **{'wait':1, 'commandID':4})
return self.sendCommand('timeline/poll', **{'wait':1, 'commandID':4})
def isPlayingMedia(self):
timeline = self.timeline()
for media_type in timeline:
if media_type.get('state') == 'playing':
for mediatype in self.timeline():
if mediatype.get('state') == 'playing':
return True
return False

View file

@ -113,6 +113,15 @@ class LibrarySection(object):
def onDeck(self):
return utils.listItems(self.server, '/library/sections/%s/onDeck' % self.key)
def analyze(self):
self.server.query('/library/sections/%s/analyze' % self.key)
def emptyTrash(self):
self.server.query('/library/sections/%s/emptyTrash' % self.key)
def refresh(self):
self.server.query('/library/sections/%s/refresh' % self.key)
def listChoices(self, category, libtype=None, **kwargs):
""" List choices for the specified filter category. kwargs can be any of the same
kwargs in self.search() to help narrow down the choices to only those that
@ -171,15 +180,6 @@ class LibrarySection(object):
results += subresults[:maxresults-len(results)]
args['X-Plex-Container-Start'] += args['X-Plex-Container-Size']
return results
def analyze(self):
self.server.query('/library/sections/%s/analyze' % self.key)
def emptyTrash(self):
self.server.query('/library/sections/%s/emptyTrash' % self.key)
def refresh(self):
self.server.query('/library/sections/%s/refresh' % self.key)
def _cleanSearchFilter(self, category, value, libtype=None):
# check a few things before we begin

View file

@ -144,7 +144,6 @@ def test_crazy_search(plex, user=None):
assert movie in movies.search(actor=judy.id), 'Unable to filter movie by year.'
#-----------------------
# Library Navigation
#-----------------------
@ -392,10 +391,10 @@ def test_client_navigation(plex, user=None):
# @register('client')
# def test_client_navigation_via_proxy(plex, user=None):
# client = plex.client(PLEX_CLIENT)
# client.proxyThroughServer()
# _navigate(plex, client)
def test_client_navigation_via_proxy(plex, user=None):
client = plex.client(PLEX_CLIENT)
client.proxyThroughServer()
_navigate(plex, client)
def _navigate(plex, client):
@ -437,10 +436,10 @@ def test_video_playback(plex, user=None):
# @register('client')
# def test_video_playback_via_proxy(plex, user=None):
# client = plex.client(PLEX_CLIENT)
# client.proxyThroughServer()
# _video_playback(plex, client)
def test_video_playback_via_proxy(plex, user=None):
client = plex.client(PLEX_CLIENT)
client.proxyThroughServer()
_video_playback(plex, client)
def _video_playback(plex, client):
@ -467,19 +466,39 @@ def _video_playback(plex, client):
client.stop(mtype); time.sleep(1)
# def test_sync_items(plex, user=None):
# user = MyPlexUser('user', 'pass')
# device = user.getDevice('device-uuid')
# # fetch the sync items via the device sync list
# for item in device.sync_items():
# # fetch the media object associated with the sync item
# for video in item.get_media():
# # fetch the media parts (actual video/audio streams) associated with the media
# for part in video.iterParts():
# print('Found media to download!')
# # make the relevant sync id (media part) as downloaded
# # this tells the server that this device has successfully downloaded this media part of this sync item
# item.mark_as_done(part.sync_id)
@register('client')
def test_client_timeline(plex, user=None):
mtype = 'video'
client = plex.client(PLEX_CLIENT)
movie = plex.library.section(MOVIE_SECTION).get(MOVIE_TITLE)
playing = client.isPlayingMedia()
log(2, 'Playing Media: %s' % playing)
assert playing is False, 'isPlayingMedia() should have returned False.'
client.playMedia(movie); time.sleep(30)
playing = client.isPlayingMedia()
log(2, 'Playing Media: %s' % playing)
assert playing is True, 'isPlayingMedia() should have returned True.'
client.stop(mtype); time.sleep(30)
playing = client.isPlayingMedia()
log(2, 'Playing Media: %s' % playing)
assert playing is False, 'isPlayingMedia() should have returned False.'
# TODO: MAKE THIS WORK..
# @register('client')
def test_sync_items(plex, user=None):
user = MyPlexUser('user', 'pass')
device = user.getDevice('device-uuid')
# fetch the sync items via the device sync list
for item in device.sync_items():
# fetch the media object associated with the sync item
for video in item.get_media():
# fetch the media parts (actual video/audio streams) associated with the media
for part in video.iterParts():
print('Found media to download!')
# make the relevant sync id (media part) as downloaded
# this tells the server that this device has successfully downloaded this media part of this sync item
item.mark_as_done(part.sync_id)
#-----------------------