Ability to directly create a PlexClient object (without needing a PlexServer object); Cleaned up MyPlex code; Removed duplicated Client navigation code from myplex (everything uses PlexClient now)

This commit is contained in:
Michael Shepanski 2016-04-02 02:19:32 -04:00
parent 7bb700d395
commit 84a6999ed7
13 changed files with 251 additions and 244 deletions

View file

@ -28,13 +28,13 @@ plex = PlexServer() # Defaults to localhost:32400
If you want to avoid logging into MyPlex and you already know your auth token
string, you can use the PlexServer object directly as above, but passing in
the baseuri and auth token directly.
the baseurl and auth token directly.
```python
from plexapi.server import MyPlexUser
baseuri = 'http://plexserver:32400'
baseurl = 'http://plexserver:32400'
token = '2ffLuB84dqLswk9skLos'
plex = PlexServer(baseuri, token)
plex = PlexServer(baseurl, token)
```
If you are running on a separate network or using Plex Users you can log
@ -46,7 +46,7 @@ available libraries.
```python
from plexapi.myplex import MyPlexUser
user = MyPlexUser.signin('<USERNAME>', '<PASSWORD>')
plex = user.getResource('<SERVERNAME>').connect() # returns a PlexServer instance
plex = user.resource('<SERVERNAME>').connect() # returns a PlexServer instance
```

View file

@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
import logging, os
from logging.handlers import RotatingFileHandler
from platform import platform, uname
from platform import uname
from plexapi.config import PlexConfig, reset_base_headers
from uuid import getnode
@ -22,7 +22,8 @@ X_PLEX_PLATFORM = CONFIG.get('headers.platorm', uname()[0]) #
X_PLEX_PLATFORM_VERSION = CONFIG.get('headers.platform_version', uname()[2]) # Operating system version, eg 4.3.1, 10.6.7, 3.2
X_PLEX_PRODUCT = CONFIG.get('headers.product', PROJECT) # Plex application name, eg Laika, Plex Media Server, Media Link
X_PLEX_VERSION = CONFIG.get('headers.version', VERSION) # Plex application version number
X_PLEX_DEVICE = CONFIG.get('headers.platform', platform()) # Device name and model number, eg iPhone3,2, Motorola XOOM, LG5200TV
X_PLEX_DEVICE = CONFIG.get('headers.platform', X_PLEX_PLATFORM) # Device make, eg iPhone, FiteTV, Linux, etc.
X_PLEX_DEVICE_NAME = uname()[1] # Device name, hostname or phone name, etc.
X_PLEX_IDENTIFIER = CONFIG.get('headers.identifier', str(hex(getnode()))) # UUID, serial number, or other number unique per device
BASE_HEADERS = reset_base_headers()

View file

@ -7,70 +7,87 @@ https://github.com/plexinc/plex-media-player/wiki/Remote-control-API
import requests
from requests.status_codes import _codes as codes
from plexapi import BASE_HEADERS, TIMEOUT, log, utils
from plexapi.exceptions import BadRequest, Unsupported
from plexapi.exceptions import BadRequest, NotFound, Unsupported
from xml.etree import ElementTree
class Client(object):
class PlexClient(object):
def __init__(self, server, data):
def __init__(self, baseurl, token=None, session=None, server=None):
self.baseurl = baseurl.strip('/')
self.token = token
self.session = session or requests.Session()
self.server = server
self.name = data.attrib.get('name')
self.host = data.attrib.get('host')
self.address = data.attrib.get('address')
self.port = data.attrib.get('port')
self.machineIdentifier = data.attrib.get('machineIdentifier')
self.title = data.attrib.get('title')
self.version = data.attrib.get('version')
self.platform = data.attrib.get('platform')
self.protocol = data.attrib.get('protocol')
self.product = data.attrib.get('product')
data = self._connect()
self.deviceClass = data.attrib.get('deviceClass')
self.protocolVersion = data.attrib.get('protocolVersion')
self.machineIdentifier = data.attrib.get('machineIdentifier')
self.product = data.attrib.get('product')
self.protocol = data.attrib.get('protocol')
self.protocolCapabilities = data.attrib.get('protocolCapabilities', '').split(',')
self.state = data.attrib.get('state')
self.protocolVersion = data.attrib.get('protocolVersion')
self.platform = data.attrib.get('platform')
self.platformVersion = data.attrib.get('platformVersion')
self.title = data.attrib.get('title')
self._proxyThroughServer = False
self._commandId = 0
@property
def quickName(self):
return self.name or self.product
def _connect(self):
try:
return self.query('/resources')[0]
except Exception as err:
log.error('%s: %s', self.baseurl, err)
raise NotFound('No client found at: %s' % self.baseurl)
def headers(self):
headers = BASE_HEADERS
if self.token:
headers['X-Plex-Token'] = self.token
return headers
def proxyThroughServer(self, value=True):
if value is True and not self.server:
raise Unsupported('Cannot use client proxy with unknown server.')
self._proxyThroughServer = value
def sendCommand(self, command, proxy=None, **params):
proxy = self._proxyThroughServer if proxy is None else proxy
if proxy: return self.sendServerCommand(command, **params)
return self.sendClientCommand(command, **params)
def sendClientCommand(self, command, **params):
command = command.strip('/')
controller = command.split('/')[0]
if controller not in self.protocolCapabilities:
raise Unsupported('Client %s does not support the %s controller.' % (self.quickName, controller))
self._commandId += 1
params['commandID'] = self._commandId
url = 'http://%s:%s/player/%s%s' % (self.address, self.port, command.lstrip('/'), utils.joinArgs(params))
log.info('GET %s', url)
response = requests.get(url, headers=BASE_HEADERS, timeout=TIMEOUT)
if response.status_code != requests.codes.ok:
def query(self, path, method=None, **kwargs):
url = self.url(path)
method = method or self.session.get
log.info('%s %s', method.__name__.upper(), url)
response = method(url, headers=self.headers(), timeout=TIMEOUT, **kwargs)
if response.status_code not in [200, 201]:
codename = codes.get(response.status_code)[0]
raise BadRequest('(%s) %s' % (response.status_code, codename))
data = response.text.encode('utf8')
return ElementTree.fromstring(data) if data else None
def sendServerCommand(self, command, **params):
params.update({'commandID': self._commandId})
path = '/system/players/%s/%s%s' % (self.address, command, utils.joinArgs(params))
self.server.query(path)
def sendCommand(self, command, proxy=None, **params):
proxy = self._proxyThroughServer if proxy is None else proxy
if proxy:
# send command via server proxy
self._commandId += 1; params['commandID'] = self._commandId
path = '/system/players/%s/%s%s' % (self.address, command, utils.joinArgs(params))
return self.server.query(path)
else:
# send command directly to client
command = command.strip('/')
controller = command.split('/')[0]
if controller not in self.protocolCapabilities:
raise Unsupported('Client %s does not support the %s controller.' % (self.title, controller))
self._commandId += 1; params['commandID'] = self._commandId
return self.query('/player/%s%s' % (command, utils.joinArgs(params)))
def url(self, path):
if self.token:
delim = '&' if '?' in path else '?'
return '%s%s%sX-Plex-Token=%s' % (self.baseurl, path, delim, self.token)
return '%s%s' % (self.baseurl, path)
# Navigation Commands
# These commands navigate around the user interface.
def contextMenu(self): self.sendCommand('navigation/contextMenu')
def goBack(self): self.sendCommand('navigation/back')
def goToHome(self): self.sendCommand('/navigation/home')
def goToMusic(self): self.sendCommand('/navigation/music')
def goToHome(self): self.sendCommand('navigation/home')
def goToMusic(self): self.sendCommand('navigation/music')
def moveDown(self): self.sendCommand('navigation/moveDown')
def moveLeft(self): self.sendCommand('navigation/moveLeft')
def moveRight(self): self.sendCommand('navigation/moveRight')
@ -83,11 +100,13 @@ class Client(object):
def toggleOSD(self): self.sendCommand('navigation/toggleOSD')
def goToMedia(self, media, **params):
server_uri = media.server.baseuri.split(':')
if not self.server:
raise Unsupported('A server must be specified before using this command.')
server_url = media.server.baseurl.split(':')
self.sendCommand('mirror/details', **dict({
'machineIdentifier': self.server.machineIdentifier,
'address': server_uri[1].strip('/'),
'port': server_uri[-1],
'address': server_url[1].strip('/'),
'port': server_url[-1],
'key': media.key,
}, **params))
@ -114,12 +133,14 @@ class Client(object):
def setVideoStream(self, videoStreamID, mtype): self.setStreams(videoStreamID=videoStreamID, mtype=mtype)
def playMedia(self, media, **params):
server_uri = media.server.baseuri.split(':')
if not self.server:
raise Unsupported('A server must be specified before using this command.')
server_url = media.server.baseurl.split(':')
playqueue = self.server.createPlayQueue(media)
self.sendCommand('playback/playMedia', **dict({
'machineIdentifier': self.server.machineIdentifier,
'address': server_uri[1].strip('/'),
'port': server_uri[-1],
'address': server_url[1].strip('/'),
'port': server_url[-1],
'key': media.key,
'containerKey': '/playQueues/%s?window=100&own=1' % playqueue.playQueueID,
}, **params))

View file

@ -44,5 +44,6 @@ def reset_base_headers():
'X-Plex-Product': plexapi.X_PLEX_PRODUCT,
'X-Plex-Version': plexapi.X_PLEX_VERSION,
'X-Plex-Device': plexapi.X_PLEX_DEVICE,
'X-Plex-Device-Name': plexapi.X_PLEX_DEVICE_NAME,
'X-Plex-Client-Identifier': plexapi.X_PLEX_IDENTIFIER,
}

View file

@ -26,7 +26,7 @@ class Media(object):
self.videoFrameRate = data.attrib.get('videoFrameRate')
self.optimizedForStreaming = cast(bool, data.attrib.get('optimizedForStreaming'))
self.optimizedForStreaming = cast(bool, data.attrib.get('has64bitOffsets'))
self.parts = [MediaPart(server, elem, initpath, self) for elem in data]
self.parts = [MediaPart(server, e, initpath, self) for e in data]
def __repr__(self):
title = self.video.title.replace(' ','.')[0:20]

View file

@ -5,17 +5,18 @@ PlexAPI MyPlex
import plexapi, requests
from plexapi import TIMEOUT, log, utils
from plexapi.exceptions import BadRequest, NotFound, Unauthorized
from plexapi.client import PlexClient
from plexapi.server import PlexServer
from plexapi.utils import cast, toDatetime
from requests.status_codes import _codes as codes
from threading import Thread
from xml.etree import ElementTree
MYPLEX_SIGNIN = 'https://my.plexapp.com/users/sign_in.xml'
MYPLEX_DEVICES = 'https://plex.tv/devices.xml'
MYPLEX_RESOURCES = 'https://plex.tv/api/resources?includeHttps=1'
class MyPlexUser(object):
""" Logs into my.plexapp.com to fetch account and token information. This
useful to get a token if not on the local network.
"""
SIGNIN = 'https://my.plexapp.com/users/sign_in.xml'
def __init__(self, data, initpath=None):
self.initpath = initpath
@ -29,25 +30,25 @@ class MyPlexUser(object):
self.queueEmail = data.attrib.get('queueEmail')
self.queueUid = data.attrib.get('queueUid')
def resources(self):
return MyPlexResource.fetchResources(self.authenticationToken)
def getResource(self, search, port=32400):
""" Searches server.name, server.sourceTitle and server.host:server.port
from the list of available for this PlexUser.
"""
return _findResource(self.resources(), search, port)
def devices(self):
return MyPlexDevice.fetchResources(self.authenticationToken)
return _listItems(MYPLEX_DEVICES, self.authenticationToken, MyPlexDevice)
def device(self, name):
return _findItem(self.devices(), name)
def resources(self):
return _listItems(MYPLEX_RESOURCES, self.authenticationToken, MyPlexResource)
def resource(self, name):
return _findItem(self.resources(), name)
@classmethod
def signin(cls, username, password):
if 'X-Plex-Token' in plexapi.BASE_HEADERS:
del plexapi.BASE_HEADERS['X-Plex-Token']
auth = (username, password)
log.info('POST %s', cls.SIGNIN)
response = requests.post(cls.SIGNIN, headers=plexapi.BASE_HEADERS, auth=auth, timeout=TIMEOUT)
log.info('POST %s', MYPLEX_SIGNIN)
response = requests.post(MYPLEX_SIGNIN, headers=plexapi.BASE_HEADERS, auth=auth, timeout=TIMEOUT)
if response.status_code != requests.codes.created:
codename = codes.get(response.status_code)[0]
if response.status_code == 401:
@ -58,7 +59,6 @@ class MyPlexUser(object):
class MyPlexAccount(object):
""" Represents myPlex account if you already have a connection to a server. """
def __init__(self, server, data):
self.authToken = data.attrib.get('authToken')
@ -76,18 +76,13 @@ class MyPlexAccount(object):
self.subscriptionState = data.attrib.get('subscriptionState')
def resources(self):
return MyPlexResource.fetchResources(self.authToken)
return _listItems(MYPLEX_RESOURCES, self.authToken, MyPlexResource)
def getResource(self, search, port=32400):
""" Searches server.name, server.sourceTitle and server.host:server.port
from the list of available for this PlexAccount.
"""
return _findResource(self.resources(), search, port)
def resource(self, name):
return _findItem(self.resources(), name)
class MyPlexResource(object):
RESOURCES = 'https://plex.tv/api/resources?includeHttps=1'
SSLTESTS = [(True, 'uri'), (False, 'http_uri')]
def __init__(self, data):
self.name = data.attrib.get('name')
@ -111,48 +106,33 @@ class MyPlexResource(object):
return '<%s:%s>' % (self.__class__.__name__, self.name.encode('utf8'))
def connect(self, ssl=None):
# Sort connections from (https, local) to (http, remote)
# Only check non-local connections unless we own the resource
forcelocal = lambda c: self.owned or c.local
connections = sorted(self.connections, key=lambda c:c.local, reverse=True)
if not self.owned:
connections = [c for c in connections if c.local is False]
https = [c.uri for c in self.connections if forcelocal(c)]
http = [c.httpuri for c in self.connections if forcelocal(c)]
connections = https + http
# Try connecting to all known resource connections in parellel, but
# only return the first server (in order) that provides a response.
threads, results = [], []
for testssl, attr in self.SSLTESTS:
if ssl in [None, testssl]:
for i in range(len(connections)):
uri = getattr(connections[i], attr)
args = (uri, results, len(results))
results.append(None)
threads.append(Thread(target=self._connect, args=args))
threads[-1].start()
for thread in threads:
thread.join()
# At this point we have a list of result tuples containing (uri, PlexServer)
# or (uri, None) in the case a connection could not be established.
for uri, result in results:
log.info('Testing connection: %s %s', uri, 'OK' if result else 'ERR')
results = list(filter(None, [r[1] for r in results if r]))
listargs = [[c] for c in connections]
results = utils.threaded(self._connect, listargs)
# At this point we have a list of result tuples containing (url, token, PlexServer)
# or (url, token, None) in the case a connection could not be established.
for url, token, result in results:
okerr = 'OK' if result else 'ERR'
log.info('Testing resource connection: %s?X-Plex-Token=%s %s', url, token, okerr)
results = list(filter(None, [r[2] for r in results if r]))
if not results:
raise NotFound('Unable to connect to resource: %s' % self.name)
log.info('Connecting to server: %s', results[0])
log.info('Connecting to server: %s?X-Plex-Token=%s', results[0].baseurl, results[0].token)
return results[0]
def _connect(self, uri, results, i):
def _connect(self, url, results, i):
try:
from plexapi.server import PlexServer
results[i] = (uri, PlexServer(uri, self.accessToken))
results[i] = (url, self.accessToken, PlexServer(url, self.accessToken))
except NotFound:
results[i] = (uri, None)
@classmethod
def fetchResources(cls, token):
headers = plexapi.BASE_HEADERS
headers['X-Plex-Token'] = token
log.info('GET %s?X-Plex-Token=%s', cls.RESOURCES, token)
response = requests.get(cls.RESOURCES, headers=headers, timeout=TIMEOUT)
data = ElementTree.fromstring(response.text.encode('utf8'))
return [MyPlexResource(elem) for elem in data]
results[i] = (url, self.accessToken, None)
class ResourceConnection(object):
@ -163,18 +143,13 @@ class ResourceConnection(object):
self.port = cast(int, data.attrib.get('port'))
self.uri = data.attrib.get('uri')
self.local = cast(bool, data.attrib.get('local'))
@property
def http_uri(self):
return 'http://%s:%s' % (self.address, self.port)
self.httpuri = 'http://%s:%s' % (self.address, self.port)
def __repr__(self):
return '<%s:%s>' % (self.__class__.__name__, self.uri.encode('utf8'))
# TODO: Is this a plex client in disguise?
class MyPlexDevice(object):
DEVICES = 'https://plex.tv/devices.xml'
def __init__(self, data):
self.name = data.attrib.get('name')
@ -193,83 +168,46 @@ class MyPlexDevice(object):
self.token = data.attrib.get('token')
self.screenResolution = data.attrib.get('screenResolution')
self.screenDensity = data.attrib.get('screenDensity')
self.connectionsUris = [connection.attrib.get('uri') for connection in data.iter('Connection')]
self.connections = [connection.attrib.get('uri') for connection in data.iter('Connection')]
def __repr__(self):
return '<%s:%s:%s>' % (self.__class__.__name__, self.name.encode('utf8'), self.product.encode('utf8'))
@property
def isReachable(self):
return len(self.connectionsUris)
def connect(self, ssl=None):
# Try connecting to all known resource connections in parellel, but
# only return the first server (in order) that provides a response.
listargs = [[c] for c in self.connections]
results = utils.threaded(self._connect, listargs)
# At this point we have a list of result tuples containing (url, token, PlexServer)
# or (url, token, None) in the case a connection could not be established.
for url, token, result in results:
okerr = 'OK' if result else 'ERR'
log.info('Testing device connection: %s?X-Plex-Token=%s %s', url, token, okerr)
results = list(filter(None, [r[2] for r in results if r]))
if not results:
raise NotFound('Unable to connect to resource: %s' % self.name)
log.info('Connecting to server: %s?X-Plex-Token=%s', results[0].baseurl, results[0].token)
return results[0]
@property
def baseUrl(self):
if not self.isReachable:
raise Exception('This device is not reachable')
return self.connectionsUris[0]
@classmethod
def fetchResources(cls, token):
headers = plexapi.BASE_HEADERS
headers['X-Plex-Token'] = token
log.info('GET %s?X-Plex-Token=%s', cls.DEVICES, token)
response = requests.get(cls.DEVICES, headers=headers, timeout=TIMEOUT)
data = ElementTree.fromstring(response.text.encode('utf8'))
return [MyPlexDevice(elem) for elem in data]
def sendCommand(self, command, args=None):
url = '%s%s' % (self.url(command), utils.joinArgs(args))
log.info('GET %s', url)
headers = plexapi.BASE_HEADERS
headers['X-Plex-Target-Client-Identifier'] = self.clientIdentifier
response = requests.get(url, headers=headers, timeout=TIMEOUT)
if response.status_code != requests.codes.ok:
codename = codes.get(response.status_code)[0]
raise BadRequest('(%s) %s' % (response.status_code, codename))
data = response.text.encode('utf8')
if data:
try:
return ElementTree.fromstring(data)
except:
pass
return None
def url(self, path):
return '%s/player/%s' % (self.baseUrl, path.lstrip('/'))
# Navigation Commands
def moveUp(self, args=None): self.sendCommand('navigation/moveUp', args) # noqa
def moveDown(self, args=None): self.sendCommand('navigation/moveDown', args) # noqa
def moveLeft(self, args=None): self.sendCommand('navigation/moveLeft', args) # noqa
def moveRight(self, args=None): self.sendCommand('navigation/moveRight', args) # noqa
def pageUp(self, args=None): self.sendCommand('navigation/pageUp', args) # noqa
def pageDown(self, args=None): self.sendCommand('navigation/pageDown', args) # noqa
def nextLetter(self, args=None): self.sendCommand('navigation/nextLetter', args) # noqa
def previousLetter(self, args=None): self.sendCommand('navigation/previousLetter', args) # noqa
def select(self, args=None): self.sendCommand('navigation/select', args) # noqa
def back(self, args=None): self.sendCommand('navigation/back', args) # noqa
def contextMenu(self, args=None): self.sendCommand('navigation/contextMenu', args) # noqa
def toggleOSD(self, args=None): self.sendCommand('navigation/toggleOSD', args) # noqa
# Playback Commands
def play(self, args=None): self.sendCommand('playback/play', args) # noqa
def pause(self, args=None): self.sendCommand('playback/pause', args) # noqa
def stop(self, args=None): self.sendCommand('playback/stop', args) # noqa
def stepForward(self, args=None): self.sendCommand('playback/stepForward', args) # noqa
def bigStepForward(self, args=None): self.sendCommand('playback/bigStepForward', args) # noqa
def stepBack(self, args=None): self.sendCommand('playback/stepBack', args) # noqa
def bigStepBack(self, args=None): self.sendCommand('playback/bigStepBack', args) # noqa
def skipNext(self, args=None): self.sendCommand('playback/skipNext', args) # noqa
def skipPrevious(self, args=None): self.sendCommand('playback/skipPrevious', args) # noqa
def _connect(self, url, results, i):
try:
results[i] = (url, self.token, PlexClient(url, self.token))
except NotFound as err:
print(err)
results[i] = (url, self.token, None)
def _findResource(resources, search, port=32400):
""" Searches server.name """
search = search.lower()
log.info('Looking for server: %s', search)
for server in resources:
if search == server.name.lower():
log.info('Server found: %s', server)
return server
log.info('Unable to find server: %s', search)
raise NotFound('Unable to find server: %s' % search)
def _findItem(items, name):
for item in items:
if name.lower() == item.name.lower():
return item
raise NotFound('Unable to find item: %s' % name)
def _listItems(url, token, cls):
headers = plexapi.BASE_HEADERS
headers['X-Plex-Token'] = token
log.info('GET %s?X-Plex-Token=%s', url, token)
response = requests.get(url, headers=headers, timeout=TIMEOUT)
data = ElementTree.fromstring(response.text.encode('utf8'))
return [cls(elem) for elem in data]

View file

@ -13,7 +13,7 @@ class Playlist(utils.PlexPartialObject):
def _loadData(self, data):
self.addedAt = toDatetime(data.attrib.get('addedAt', NA))
self.composite = data.attrib.get('composite', NA) # uri to thumbnail
self.composite = data.attrib.get('composite', NA) # url to thumbnail
self.duration = cast(int, data.attrib.get('duration', NA))
self.durationInSeconds = cast(int, data.attrib.get('durationInSeconds', NA))
self.guid = data.attrib.get('guid', NA)

View file

@ -8,21 +8,19 @@ from plexapi import BASE_HEADERS, TIMEOUT
from plexapi import log, utils
from plexapi import audio, video, playlist # noqa; required
from plexapi.compat import quote
from plexapi.client import Client
from plexapi.client import PlexClient
from plexapi.exceptions import BadRequest, NotFound
from plexapi.library import Library
from plexapi.myplex import MyPlexAccount
from plexapi.playqueue import PlayQueue
from xml.etree import ElementTree
TOTAL_QUERIES = 0
DEFAULT_BASEURI = 'http://localhost:32400'
DEFAULT_BASEURL = 'http://localhost:32400'
class PlexServer(object):
def __init__(self, baseuri=None, token=None, session=None):
self.baseuri = baseuri or DEFAULT_BASEURI
def __init__(self, baseurl=None, token=None, session=None):
self.baseurl = baseurl or DEFAULT_BASEURL
self.token = token
self.session = session or requests.Session()
data = self._connect()
@ -40,33 +38,36 @@ class PlexServer(object):
self.version = data.attrib.get('version')
def __repr__(self):
return '<%s:%s>' % (self.__class__.__name__, self.baseuri)
return '<%s:%s>' % (self.__class__.__name__, self.baseurl)
def _connect(self):
try:
return self.query('/')
except Exception as err:
log.error('%s: %s', self.baseuri, err)
raise NotFound('No server found at: %s' % self.baseuri)
log.error('%s: %s', self.baseurl, err)
raise NotFound('No server found at: %s' % self.baseurl)
@property
def library(self):
return Library(self, self.query('/library/'))
def account(self):
from plexapi.myplex import MyPlexAccount
data = self.query('/myplex/account')
return MyPlexAccount(self, data)
def clients(self):
items = []
for elem in self.query('/clients'):
items.append(Client(self, elem))
baseurl = 'http://%s:%s' % (elem.attrib['address'], elem.attrib['port'])
items.append(PlexClient(baseurl, server=self))
return items
def client(self, name):
for elem in self.query('/clients'):
if elem.attrib.get('name').lower() == name.lower():
return Client(self, elem)
baseurl = 'http://%s:%s' % (elem.attrib['address'], elem.attrib['port'])
return PlexClient(baseurl, server=self)
raise NotFound('Unknown client name: %s' % name)
def createPlayQueue(self, item):
@ -88,8 +89,6 @@ class PlexServer(object):
raise NotFound('Invalid playlist title: %s' % title)
def query(self, path, method=None, **kwargs):
global TOTAL_QUERIES
TOTAL_QUERIES += 1
url = self.url(path)
method = method or self.session.get
log.info('%s %s', method.__name__.upper(), url)
@ -113,5 +112,5 @@ class PlexServer(object):
def url(self, path):
if self.token:
delim = '&' if '?' in path else '?'
return '%s%s%sX-Plex-Token=%s' % (self.baseuri, path, delim, self.token)
return '%s%s' % (self.baseuri, path)
return '%s%s%sX-Plex-Token=%s' % (self.baseurl, path, delim, self.token)
return '%s%s' % (self.baseurl, path)

View file

@ -8,6 +8,7 @@ from plexapi.exceptions import NotFound
class SyncItem(object):
def __init__(self, device, data, servers=None):
self.device = device
self.servers = servers
@ -23,21 +24,20 @@ class SyncItem(object):
self.location = data.find('Location').attrib.copy()
def __repr__(self):
return '<{0}:{1}>'.format(self.__class__.__name__, self.id)
return '<%s:%s>' % (self.__class__.__name__, self.id)
def server(self):
server = list(filter(lambda x: x.machineIdentifier == self.machineIdentifier, self.servers))
if 0 == len(server):
raise NotFound('Unable to find server with uuid %s' % self.machineIdentifier)
return server[0]
def getMedia(self):
server = self.server().connect()
items = utils.listItems(server, '/sync/items/{0}'.format(self.id))
items = utils.listItems(server, '/sync/items/%s' % self.id)
return items
def markAsDone(self, sync_id):
server = self.server().connect()
uri = '/sync/{0}/{1}/files/{2}/downloaded'.format(self.device.clientIdentifier, server.machineIdentifier, sync_id)
server.query(uri, method=requests.put)
url = '/sync/%s/%s/files/%s/downloaded' % (self.device.clientIdentifier, server.machineIdentifier, sync_id)
server.query(url, method=requests.put)

View file

@ -7,6 +7,7 @@ from requests import put
from datetime import datetime
from plexapi.compat import quote, urlencode
from plexapi.exceptions import NotFound, UnknownType, Unsupported
from threading import Thread
# Search Types - Plex uses these to filter specific media types when searching.
@ -90,8 +91,8 @@ class PlexPartialObject(object):
def _findPlayer(self, data):
elem = data.find('Player')
if elem is not None:
from plexapi.client import Client
return Client(self.server, elem)
from plexapi.client import PlexClient
return PlexClient(self.server, elem)
return None
def _findStreams(self, streamtype):
@ -241,6 +242,18 @@ def searchType(libtype):
return stype
def threaded(callback, listargs):
threads, results = [], []
for args in listargs:
args += [results, len(results)]
results.append(None)
threads.append(Thread(target=callback, args=args))
threads[-1].start()
for thread in threads:
thread.join()
return results
def toDatetime(value, format=None):
if value and value != NA:
if format: value = datetime.strptime(value, format)

View file

@ -84,14 +84,14 @@ if __name__ == '__main__':
# There are three ways to authenticate:
# 1. If the server is running on localhost, just run without any auth.
# 2. Pass in --username, --password, and --resource.
# 3. Pass in --baseuri, --token
# 3. Pass in --baseurl, --token
parser = argparse.ArgumentParser(description='Run PlexAPI examples.')
parser.add_argument('--username', help='Username for the Plex server.')
parser.add_argument('--password', help='Password for the Plex server.')
parser.add_argument('--resource', help='Name of the Plex resource (requires user/pass).')
parser.add_argument('--baseuri', help='Baseuri needed for auth token authentication')
parser.add_argument('--token', help='Auth token (instead of user/pass)')
parser.add_argument('--example', help='Only run the specified example.')
parser.add_argument('-u', '--username', help='Username for the Plex server.')
parser.add_argument('-p', '--password', help='Password for the Plex server.')
parser.add_argument('-r', '--resource', help='Name of the Plex resource (requires user/pass).')
parser.add_argument('-b', '--baseurl', help='Baseurl needed for auth token authentication')
parser.add_argument('-t', '--token', help='Auth token (instead of user/pass)')
parser.add_argument('-q', '--example', help='Only run the specified example.')
parser.add_argument('-v', '--verbose', default=False, action='store_true', help='Print verbose logging.')
args = parser.parse_args()
plex, user = fetch_server(args)

View file

@ -11,6 +11,8 @@ import argparse, sys, time
from os.path import dirname, abspath
sys.path.append(dirname(dirname(abspath(__file__))))
from utils import log, register, run_tests
from plexapi.client import PlexClient
from plexapi.exceptions import NotFound
from plexapi.utils import NA
SHOW_SECTION = 'TV Shows'
@ -23,8 +25,9 @@ AUDIO_SECTION = 'Music'
AUDIO_ARTIST = 'Beastie Boys'
AUDIO_ALBUM = 'Licensed To Ill'
AUDIO_TRACK = 'Brass Monkey'
PLEX_CLIENT = 'pkkid-home'
PLEX_CLIENT = 'pkkid-home'
PLEX_CLIENT_BASEURL = 'http://192.168.1.131:3005'
#-----------------------
# Core
@ -380,13 +383,18 @@ def test_play_queues(plex, user=None):
@register('client')
def test_list_clients(plex, user=None):
clients = [c.name or c.product for c in plex.clients()]
log(2, ', '.join(clients))
clients = [c.title for c in plex.clients()]
log(2, 'Clients: %s' % ', '.join(clients))
assert clients, 'Server is not listing any clients.'
@register('client')
def test_client_navigation(plex, user=None):
client = plex.client(PLEX_CLIENT)
try:
client = plex.client(PLEX_CLIENT)
except NotFound as err:
log(2, 'Warning: %s' % err)
client = PlexClient(PLEX_CLIENT_BASEURL, server=plex)
_navigate(plex, client)
@ -400,7 +408,7 @@ def test_client_navigation_via_proxy(plex, user=None):
def _navigate(plex, client):
episode = plex.library.section(SHOW_SECTION).get(SHOW_TITLE).get(SHOW_EPISODE)
artist = plex.library.section(AUDIO_SECTION).get(AUDIO_ARTIST)
log(2, 'Client: %s (%s)' % (client.name, client.product))
log(2, 'Client: %s (%s)' % (client.title, client.product))
log(2, 'Capabilities: %s' % client.protocolCapabilities)
# Move around a bit
log(2, 'Browsing around..')
@ -446,7 +454,7 @@ def _video_playback(plex, client):
mtype = 'video'
movie = plex.library.section(MOVIE_SECTION).get(MOVIE_TITLE)
subs = [s for s in movie.subtitleStreams if s.language == 'English']
log(2, 'Client: %s (%s)' % (client.name, client.product))
log(2, 'Client: %s (%s)' % (client.title, client.product))
log(2, 'Capabilities: %s' % client.protocolCapabilities)
log(2, 'Playing to %s..' % movie.title)
client.playMedia(movie); time.sleep(5)
@ -502,28 +510,53 @@ def test_sync_items(plex, user=None):
#-----------------------
# Resource
# MyPlex Resources
#-----------------------
@register('resource')
def test_list_resources(plex, user=None):
@register('myplex,resource')
def test_myplex_resources(plex, user=None):
assert user, 'Must specify username, password & resource to run this test.'
resources = [r.name or r.product for r in user.resources()]
log(2, ', '.join(resources))
resources = user.resources()
for resource in resources:
name = resource.name or 'Unknown'
connections = [c.uri for c in resource.connections]
connections = ', '.join(connections) if connections else 'None'
log(2, '%s (%s): %s' % (name, resource.product, connections))
assert resources, 'No resources found for user: %s' % user.name
@register('myplex,devices')
def test_myplex_devices(plex, user=None):
assert user, 'Must specify username, password & resource to run this test.'
devices = user.devices()
for device in devices:
name = device.name or 'Unknown'
connections = ', '.join(device.connections) if device.connections else 'None'
log(2, '%s (%s): %s' % (name, device.product, connections))
assert devices, 'No devices found for user: %s' % user.name
@register('myplex,devices')
def test_myplex_connect_to_device(plex, user=None):
assert user, 'Must specify username, password & resource to run this test.'
device = user.device(PLEX_CLIENT)
client = device.connect()
log(2, 'Connected to client: %s (%s)' % (client.title, client.product))
assert client, 'Unable to connect to device'
if __name__ == '__main__':
# There are three ways to authenticate:
# 1. If the server is running on localhost, just run without any auth.
# 2. Pass in --username, --password, and --resource.
# 3. Pass in --baseuri, --token
# 3. Pass in --baseurl, --token
parser = argparse.ArgumentParser(description='Run PlexAPI tests.')
parser.add_argument('--username', help='Username for the Plex server.')
parser.add_argument('--password', help='Password for the Plex server.')
parser.add_argument('--resource', help='Name of the Plex resource (requires user/pass).')
parser.add_argument('--baseuri', help='Baseuri needed for auth token authentication')
parser.add_argument('--token', help='Auth token (instead of user/pass)')
parser.add_argument('--query', help='Only run the specified tests.')
parser.add_argument('-u', '--username', help='Username for the Plex server.')
parser.add_argument('-p', '--password', help='Password for the Plex server.')
parser.add_argument('-r', '--resource', help='Name of the Plex resource (requires user/pass).')
parser.add_argument('-b', '--baseurl', help='Baseurl needed for auth token authentication')
parser.add_argument('-t', '--token', help='Auth token (instead of user/pass)')
parser.add_argument('-q', '--query', help='Only run the specified tests.')
parser.add_argument('-v', '--verbose', default=False, action='store_true', help='Print verbose logging.')
args = parser.parse_args()
run_tests(__name__, args)

View file

@ -29,10 +29,13 @@ def log(indent, message, color=None):
def fetch_server(args):
if args.resource and args.username and args.password:
log(0, 'Signing in as MyPlex user %s..' % args.username)
user = MyPlexUser.signin(args.username, args.password)
return user.getResource(args.resource).connect(), user
elif args.baseuri and args.token:
return server.PlexServer(args.baseuri, args.token), None
log(0, 'Connecting to Plex server %s..' % args.resource)
return user.resource(args.resource).connect(), user
elif args.baseurl and args.token:
log(0, 'Connecting to Plex server %s..' % args.baseurl)
return server.PlexServer(args.baseurl, args.token), None
return server.PlexServer(), None
@ -52,14 +55,12 @@ def run_tests(module, args):
plex, user = fetch_server(args)
tests = {'passed':0, 'failed':0}
for test in iter_tests(args.query):
startqueries = server.TOTAL_QUERIES
starttime = time.time()
log(0, '%s (%s)' % (test['name'], ','.join(test['tags'])))
try:
test['func'](plex, user)
runtime = time.time() - starttime
queries = server.TOTAL_QUERIES - startqueries
log(2, 'PASS! (runtime: %.3fs; queries: %s)' % (runtime, queries), 'blue')
log(2, 'PASS! (runtime: %.3fs)' % runtime, 'blue')
tests['passed'] += 1
except Exception as err:
errstr = str(err)