Move from Bitbucket

This commit is contained in:
Michael Shepanski 2014-12-28 22:21:58 -05:00
commit 352696147e
22 changed files with 1751 additions and 0 deletions

10
.gitignore vendored Normal file
View file

@ -0,0 +1,10 @@
syntax: glob
*.db
*.log
*.pyc
*.sublime-*
*__pycache__*
dist
build
*.egg-info
.idea/

6
AUTHORS.txt Normal file
View file

@ -0,0 +1,6 @@
Primary Authors:
* Michael Shepanski
Thanks to Contributors:
* Nate Mara (Timeline)
* Goni Zahavy (Sync, Media Parts)

25
LICENSE.txt Normal file
View file

@ -0,0 +1,25 @@
Copyright (c) 2010, Michael Shepanski
All rights reserved.
Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright notice,
this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
* Neither the name conky-pkmeter nor the names of its contributors
may be used to endorse or promote products derived from this software without
specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

2
MANIFEST.in Normal file
View file

@ -0,0 +1,2 @@
include README.md
include requirements.pip

83
README.md Normal file
View file

@ -0,0 +1,83 @@
## PlexAPI ##
Python bindings for the Plex API.
* Navigate local or remote shared libraries.
* Mark shows watched or unwatched.
* Request rescan, analyze, empty trash.
* Play media on connected clients.
* Plex Sync Support.
Planned features:
* Create and maintain playlists.
* List active sessions.
* Play trailers and extras.
* Provide useful utility scripts.
* Better support for Music and Photos?
#### Install ###
pip install plexapi
#### Getting a PlexServer Instance ####
There are two types of authentication. If running the PlexAPI on the same
network as the Plex Server (and you are not using Plex Users), you can
authenticate without a username and password. Getting a PlexServer
instance is as easy as the following:
from plexapi.server import PlexServer
plex = PlexServer() # Defaults to localhost:32400
If you are running on a separate network or using Plex Users you need to log
into MyPlex to get a PlexServer instance. An example of this is below. NOTE:
Servername below is the name of the server (not the hostname and port). If
logged into Plex Web you can see the server name in the top left above your
available libraries.
from plexapi.myplex import MyPlexUser
user = MyPlexUser('<USERNAME>', '<PASSWORD>')
plex = user.getServer('<SERVERNAME>').connect()
#### Usage Examples ####
# Example 1: List all unwatched content in library.
for section in plex.library.sections():
print 'Unwatched content in %s:' % section.title
for video in section.unwatched():
print ' %s' % video.title
# Example 2: Mark all Conan episodes watched.
plex.library.get('Conan (2010)').markWatched()
# Example 3: List all Clients connected to the Server.
for client in plex.clients():
print client.name
# Example 4: Play the Movie Avatar on my iPhone.
avatar = plex.library.section('Movies').get('Avatar')
client = plex.client("Michael's iPhone")
client.playMedia(avatar)
# Example 5: List all content with the word 'Game' in the title.
for video in plex.search('Game'):
print '%s (%s)' % (video.title, video.TYPE)
# Example 6: List all movies directed by the same person as Jurassic Park.
jurassic_park = plex.library.section('Movies').get('Jurassic Park')
director = jurassic_park.directors[0]
for movie in director.related():
print movie.title
# Example 7: List files for the latest episode of Friends.
the_last_one = plex.library.get('Friends').episodes()[-1]
for part in the_last_one.iter_parts():
print part.file
#### FAQs ####
**Q. Why are you using camelCase and not following PEP8 guidelines?**
A. This API reads XML documents provided by MyPlex and the Plex Server.
We decided to conform to their style so that the API variable names directly
match with the provided XML documents.

0
examples/__init__.py Normal file
View file

73
examples/examples.py Normal file
View file

@ -0,0 +1,73 @@
"""
PlexAPI Examples
As of Plex version 0.9.11 I noticed that you must be logged in
to browse even the plex server locatewd at localhost. You can
run this example suite with the following command:
>> python examples.py -u <USERNAME> -p <PASSWORD> -s <SERVERNAME>
"""
import argparse, sys
from os.path import dirname, abspath
sys.path.append(dirname(dirname(abspath(__file__))))
from utils import fetch_server, iter_tests
def example_001_list_all_unwatched_content(plex):
""" Example 1: List all unwatched content in library """
for section in plex.library.sections():
print 'Unwatched content in %s:' % section.title
for video in section.unwatched():
print ' %s' % video.title
def example_002_mark_all_conan_episodes_watched(plex):
""" Example 2: Mark all Conan episodes watched. """
plex.library.get('Conan (2010)').markWatched()
def example_003_list_all_clients(plex):
""" Example 3: List all Clients connected to the Server. """
for client in plex.clients():
print client.name
def example_004_play_avatar_on_iphone(plex):
""" Example 4: Play the Movie Avatar on my iPhone. """
avatar = plex.library.section('Movies').get('Avatar')
client = plex.client("Michael's iPhone")
client.playMedia(avatar)
def example_005_search(plex):
""" Example 5: List all content with the word 'Game' in the title. """
for video in plex.search('Game'):
print '%s (%s)' % (video.title, video.TYPE)
def example_006_follow_the_talent(plex):
""" Example 6: List all movies directed by the same person as Jurassic Park. """
jurassic_park = plex.library.section('Movies').get('Jurassic Park')
director = jurassic_park.directors[0]
for movie in director.related():
print movie.title
def example_007_list_files(plex):
""" Example 7: List files for the latest episode of Friends. """
the_last_one = plex.library.get('Friends').episodes()[-1]
for part in the_last_one.iter_parts():
print part.file
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Run PlexAPI examples.')
parser.add_argument('-s', '--server', help='Name of the Plex server (requires user/pass).')
parser.add_argument('-u', '--username', help='Username for the Plex server.')
parser.add_argument('-p', '--password', help='Password for the Plex server.')
parser.add_argument('-n', '--name', help='Only run tests containing this string. Leave blank to run all examples.')
args = parser.parse_args()
plex = fetch_server(args)
for example in iter_tests(__name__, args):
example(plex)

213
examples/tests.py Normal file
View file

@ -0,0 +1,213 @@
"""
Test Library Functions
As of Plex version 0.9.11 I noticed that you must be logged in
to browse even the plex server locatewd at localhost. You can
run this test suite with the following command:
>> python tests.py -u <USERNAME> -p <PASSWORD> -s <SERVERNAME>
"""
import argparse, sys, time
from os.path import dirname, abspath
sys.path.append(dirname(dirname(abspath(__file__))))
from utils import log, run_tests
SHOW_SECTION = 'TV Shows'
SHOW_TITLE = 'Game of Thrones'
SHOW_SEASON = 'Season 1'
SHOW_EPISODE = 'Winter Is Coming'
MOVIE_SECTION = 'Movies'
MOVIE_TITLE = 'Jurassic Park'
PLEX_CLIENT = "Michael's iPhone"
def test_001_server(plex):
log(2, 'Username: %s' % plex.myPlexUsername)
log(2, 'Platform: %s' % plex.platform)
log(2, 'Version: %s' % plex.version)
assert plex.myPlexUsername is not None, 'Unknown username.'
assert plex.platform is not None, 'Unknown platform.'
assert plex.version is not None, 'Unknown version.'
def test_002_list_sections(plex):
sections = [s.title for s in plex.library.sections()]
log(2, 'Sections: %s' % sections)
assert SHOW_SECTION in sections, '%s not a library section.' % SHOW_SECTION
assert MOVIE_SECTION in sections, '%s not a library section.' % MOVIE_SECTION
plex.library.section(SHOW_SECTION)
plex.library.section(MOVIE_SECTION)
def test_003_search_show(plex):
result_server = plex.search(SHOW_TITLE)
result_library = plex.library.search(SHOW_TITLE)
result_shows = plex.library.section(SHOW_SECTION).search(SHOW_TITLE)
result_movies = plex.library.section(MOVIE_SECTION).search(SHOW_TITLE)
log(2, 'Searching for: %s' % SHOW_TITLE)
log(4, 'Result Server: %s' % result_server)
log(4, 'Result Library: %s' % result_library)
log(4, 'Result Shows: %s' % result_shows)
log(4, 'Result Movies: %s' % result_movies)
assert result_server, 'Show not found.'
assert result_server == result_library == result_shows, 'Show searches not consistent.'
assert not result_movies, 'Movie search returned show title.'
def test_004_search_movie(plex):
result_server = plex.search(MOVIE_TITLE)
result_library = plex.library.search(MOVIE_TITLE)
result_shows = plex.library.section(SHOW_SECTION).search(MOVIE_TITLE)
result_movies = plex.library.section(MOVIE_SECTION).search(MOVIE_TITLE)
log(2, 'Searching for: %s' % MOVIE_TITLE)
log(4, 'Result Server: %s' % result_server)
log(4, 'Result Library: %s' % result_library)
log(4, 'Result Shows: %s' % result_shows)
log(4, 'Result Movies: %s' % result_movies)
assert result_server, 'Movie not found.'
assert result_server == result_library == result_movies, 'Movie searches not consistent.'
assert not result_shows, 'Show search returned show title.'
def test_005_navigate_to_show(plex):
result_library = plex.library.get(SHOW_TITLE)
result_shows = plex.library.section(SHOW_SECTION).get(SHOW_TITLE)
try:
result_movies = plex.library.section(MOVIE_SECTION).get(SHOW_TITLE)
except:
result_movies = None
log(2, 'Navigating to: %s' % SHOW_TITLE)
log(4, 'Result Library: %s' % result_library)
log(4, 'Result Shows: %s' % result_shows)
log(4, 'Result Movies: %s' % result_movies)
assert result_library == result_shows, 'Show navigation not consistent.'
assert not result_movies, 'Movie navigation returned show title.'
def test_006_navigate_to_movie(plex):
result_library = plex.library.get(MOVIE_TITLE)
result_movies = plex.library.section(MOVIE_SECTION).get(MOVIE_TITLE)
try:
result_shows = plex.library.section(SHOW_SECTION).get(MOVIE_TITLE)
except:
result_shows = None
log(2, 'Navigating to: %s' % MOVIE_TITLE)
log(4, 'Result Library: %s' % result_library)
log(4, 'Result Shows: %s' % result_shows)
log(4, 'Result Movies: %s' % result_movies)
assert result_library == result_movies, 'Movie navigation not consistent.'
assert not result_shows, 'Show navigation returned show title.'
def test_007_navigate_around_show(plex):
show = plex.library.get(SHOW_TITLE)
seasons = show.seasons()
season = show.season(SHOW_SEASON)
episodes = show.episodes()
episode = show.episode(SHOW_EPISODE)
log(2, 'Navigating around show: %s' % show)
log(4, 'Seasons: %s...' % seasons[:3])
log(4, 'Season: %s' % season)
log(4, 'Episodes: %s...' % episodes[:3])
log(4, 'Episode: %s' % episode)
assert SHOW_SEASON in [s.title for s in seasons], 'Unable to get season: %s' % SHOW_SEASON
assert SHOW_EPISODE in [e.title for e in episodes], 'Unable to get episode: %s' % SHOW_EPISODE
assert season.show() == show, 'season.show() doesnt match expected show.'
assert episode.show() == show, 'episode.show() doesnt match expected show.'
assert episode.season() == season, 'episode.season() doesnt match expected season.'
def test_008_mark_movie_watched(plex):
movie = plex.library.section(MOVIE_SECTION).get(MOVIE_TITLE)
movie.markUnwatched()
log(2, 'Marking movie watched: %s' % movie)
log(2, 'View count: %s' % movie.viewCount)
movie.markWatched()
log(2, 'View count: %s' % movie.viewCount)
assert movie.viewCount == 1, 'View count 0 after watched.'
movie.markUnwatched()
log(2, 'View count: %s' % movie.viewCount)
assert movie.viewCount == 0, 'View count 1 after unwatched.'
def test_009_refresh(plex):
shows = plex.library.section(MOVIE_SECTION)
shows.refresh()
def test_010_playQueues(plex):
episode = plex.library.get(SHOW_TITLE).get(SHOW_EPISODE)
playqueue = plex.createPlayQueue(episode)
assert len(playqueue.items) == 1, 'No items in play queue.'
assert playqueue.items[0].title == SHOW_EPISODE, 'Wrong show queued.'
assert playqueue.playQueueID, 'Play queue ID not set.'
def test_011_play_media(plex):
# Make sure the client is turned on!
episode = plex.library.get(SHOW_TITLE).get(SHOW_EPISODE)
client = plex.client(PLEX_CLIENT)
client.playMedia(episode); time.sleep(10)
client.pause(); time.sleep(3)
client.stepForward(); time.sleep(3)
client.play(); time.sleep(3)
client.stop(); time.sleep(3)
movie = plex.library.get(MOVIE_TITLE)
movie.play(client); time.sleep(10)
client.stop()
def test_012_myplex_account(plex):
account = plex.account()
print account.__dict__
def test_013_list_media_files(plex):
# Fetch file names from the tv show
episode_files = []
episode = plex.library.get(SHOW_TITLE).episodes()[-1]
log(2, 'Episode Files: %s' % episode)
for media in episode.media:
for part in media.parts:
log(4, part.file)
episode_files.append(part.file)
assert filter(None, episode_files), 'No show files have been listed.'
# Fetch file names from the movie
movie_files = []
movie = plex.library.get(MOVIE_TITLE)
log(2, 'Movie Files: %s' % movie)
for media in movie.media:
for part in media.parts:
log(4, part.file)
movie_files.append(part.file)
assert filter(None, movie_files), 'No movie files have been listed.'
def test_014_list_video_tags(plex):
movie = plex.library.get(MOVIE_TITLE)
log(2, 'Countries: %s' % movie.countries[0:3])
log(2, 'Directors: %s' % movie.directors[0:3])
log(2, 'Genres: %s' % movie.genres[0:3])
log(2, 'Producers: %s' % movie.producers[0:3])
log(2, 'Actors: %s' % movie.actors[0:3])
log(2, 'Writers: %s' % movie.writers[0:3])
assert filter(None, movie.countries), 'No countries listed for movie.'
assert filter(None, movie.directors), 'No directors listed for movie.'
assert filter(None, movie.genres), 'No genres listed for movie.'
assert filter(None, movie.producers), 'No producers listed for movie.'
assert filter(None, movie.actors), 'No actors listed for movie.'
assert filter(None, movie.writers), 'No writers listed for movie.'
log(2, 'List movies with same director: %s' % movie.directors[0])
related = movie.directors[0].related()
log(4, related[0:3])
assert movie in related, 'Movie was not found in related directors search.'
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Run PlexAPI tests.')
parser.add_argument('-s', '--server', help='Name of the Plex server (requires user/pass).')
parser.add_argument('-u', '--username', help='Username for the Plex server.')
parser.add_argument('-p', '--password', help='Password for the Plex server.')
parser.add_argument('-n', '--name', help='Only run tests containing this string. Leave blank to run all tests.')
args = parser.parse_args()
run_tests(__name__, args)

54
examples/utils.py Normal file
View file

@ -0,0 +1,54 @@
"""
Test Library Functions
"""
import inspect, sys
import datetime, time
from plexapi import server
from plexapi.myplex import MyPlexUser
def log(indent, message):
dt = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
print('%s: %s%s' % (dt, ' '*indent, message))
def fetch_server(args):
if args.server:
user = MyPlexUser(args.username, args.password)
return user.getServer(args.server).connect()
return server.PlexServer()
def iter_tests(module, args):
module = sys.modules[module]
for func in sorted(module.__dict__.values()):
if inspect.isfunction(func) and inspect.getmodule(func) == module:
name = func.__name__
if name.startswith('test_') or name.startswith('example_') and (not args.name or args.name in name):
yield func
def run_tests(module, args):
plex = fetch_server(args)
tests = {'passed':0, 'failed':0}
for test in iter_tests(module, args):
startqueries = server.TOTAL_QUERIES
starttime = time.time()
log(0, test.__name__)
try:
test(plex)
tests['passed'] += 1
except Exception, err:
log(2, 'FAIL!: %s' % err)
tests['failed'] += 1
runtime = time.time() - starttime
log(2, 'Runtime: %.3fs' % runtime)
log(2, 'Queries: %s' % (server.TOTAL_QUERIES - startqueries))
log(0, '')
log(0, 'Tests Run: %s' % sum(tests.values()))
log(0, 'Tests Passed: %s' % tests['passed'])
log(0, 'Tests Failed: %s' % tests['failed'])
if not tests['failed']:
log(0, '')
log(0, 'EVERYTHING OK!! :)')
raise SystemExit(tests['failed'])

37
plexapi/__init__.py Normal file
View file

@ -0,0 +1,37 @@
"""
PlexAPI
"""
import logging, os, platform
from logging.handlers import RotatingFileHandler
from uuid import getnode
PROJECT = 'PlexAPI'
VERSION = '0.9.4'
TIMEOUT = 5
# Plex Header Configuation
X_PLEX_PLATFORM = platform.uname()[0] # Platform name, eg iOS, MacOSX, Android, LG, etc
X_PLEX_PLATFORM_VERSION = platform.uname()[2] # Operating system version, eg 4.3.1, 10.6.7, 3.2
X_PLEX_PROVIDES = 'controller' # one or more of [player, controller, server]
X_PLEX_PRODUCT = PROJECT # Plex application name, eg Laika, Plex Media Server, Media Link
X_PLEX_VERSION = VERSION # Plex application version number
X_PLEX_DEVICE = platform.platform() # Device name and model number, eg iPhone3,2, Motorola XOOM, LG5200TV
X_PLEX_IDENTIFIER = str(hex(getnode())) # UUID, serial number, or other number unique per device
BASE_HEADERS = {
'X-Plex-Platform': X_PLEX_PLATFORM,
'X-Plex-Platform-Version': X_PLEX_PLATFORM_VERSION,
'X-Plex-Provides': X_PLEX_PROVIDES,
'X-Plex-Product': X_PLEX_PRODUCT,
'X-Plex-Version': X_PLEX_VERSION,
'X-Plex-Device': X_PLEX_DEVICE,
'X-Plex-Client-Identifier': X_PLEX_IDENTIFIER,
}
# Logging Configuration
log = logging.getLogger('plexapi')
logfile = os.path.join('/tmp', 'plexapi.log')
logformat = logging.Formatter('%(asctime)s %(module)-12s %(levelname)-6s %(message)s')
filehandler = RotatingFileHandler(logfile, 'a', 512000, 3)
filehandler.setFormatter(logformat)
log.addHandler(filehandler)
log.setLevel(logging.INFO)

125
plexapi/client.py Normal file
View file

@ -0,0 +1,125 @@
"""
PlexAPI Client
See: https://code.google.com/p/plex-api/w/list
"""
import requests
from requests.status_codes import _codes as codes
from plexapi import TIMEOUT, log, utils, BASE_HEADERS
from plexapi.exceptions import BadRequest
from xml.etree import ElementTree
SERVER = 'server'
CLIENT = 'client'
class Client(object):
def __init__(self, server, data):
self.server = server
self.name = data.attrib.get('name')
self.host = data.attrib.get('host')
self.address = data.attrib.get('address')
self.port = data.attrib.get('port')
self.machineIdentifier = data.attrib.get('machineIdentifier')
self.version = data.attrib.get('version')
self.protocol = data.attrib.get('protocol')
self.product = data.attrib.get('product')
self.deviceClass = data.attrib.get('deviceClass')
self.protocolVersion = data.attrib.get('protocolVersion')
self.protocolCapabilities = data.attrib.get('protocolCapabilities', '').split(',')
self._sendCommandsTo = SERVER
def sendCommandsTo(self, value):
self._sendCommandsTo = value
def sendCommand(self, command, args=None, sendTo=None):
sendTo = sendTo or self._sendCommandsTo
if sendTo == CLIENT:
return self.sendClientCommand(command, args)
return self.sendServerCommand(command, args)
def sendClientCommand(self, command, args=None):
url = '%s%s' % (self.url(command), utils.joinArgs(args))
log.info('GET %s', url)
response = requests.get(url, timeout=TIMEOUT)
if response.status_code != requests.codes.ok:
codename = codes.get(response.status_code)[0]
raise BadRequest('(%s) %s' % (response.status_code, codename))
data = response.text.encode('utf8')
return ElementTree.fromstring(data) if data else None
def sendServerCommand(self, command, args=None):
path = '/system/players/%s/%s%s' % (self.address, command, utils.joinArgs(args))
self.server.query(path)
def url(self, path):
return 'http://%s:%s/player/%s' % (self.address, self.port, path.lstrip('/'))
# Navigation Commands
def moveUp(self): self.sendCommand('navigation/moveUp')
def moveDown(self): self.sendCommand('navigation/moveDown')
def moveLeft(self): self.sendCommand('navigation/moveLeft')
def moveRight(self): self.sendCommand('navigation/moveRight')
def pageUp(self): self.sendCommand('navigation/pageUp')
def pageDown(self): self.sendCommand('navigation/pageDown')
def nextLetter(self): self.sendCommand('navigation/nextLetter')
def previousLetter(self): self.sendCommand('navigation/previousLetter')
def select(self): self.sendCommand('navigation/select')
def back(self): self.sendCommand('navigation/back')
def contextMenu(self): self.sendCommand('navigation/contextMenu')
def toggleOSD(self): self.sendCommand('navigation/toggleOSD')
# Playback Commands
def play(self): self.sendCommand('playback/play')
def pause(self): self.sendCommand('playback/pause')
def stop(self): self.sendCommand('playback/stop')
def stepForward(self): self.sendCommand('playback/stepForward')
def bigStepForward(self): self.sendCommand('playback/bigStepForward')
def stepBack(self): self.sendCommand('playback/stepBack')
def bigStepBack(self): self.sendCommand('playback/bigStepBack')
def skipNext(self): self.sendCommand('playback/skipNext')
def skipPrevious(self): self.sendCommand('playback/skipPrevious')
def playMedia(self, video, viewOffset=0):
playqueue = self.server.createPlayQueue(video)
self.sendCommand('playback/playMedia', {
'machineIdentifier': self.server.machineIdentifier,
'containerKey': '/playQueues/%s?window=100&own=1' % playqueue.playQueueID,
'key': video.key,
'offset': int(viewOffset),
})
def timeline(self):
"""
Returns an XML ElementTree object corresponding to the timeline for
this client. Holds the information about what media is playing on this
client.
"""
url = self.url('timeline/poll')
params = {
'wait': 1,
'commandID': 4,
}
xml_text = requests.get(url, params=params, headers=BASE_HEADERS).text
return ElementTree.fromstring(xml_text)
def isPlayingMedia(self):
"""
Returns True if any of the media types for this client have the status
of "playing", False otherwise. Also returns True if media is paused.
"""
timeline = self.timeline()
for media_type in timeline:
if media_type.get('state') == 'playing':
return True
return False
# def rewind(self): self.sendCommand('playback/rewind')
# def fastForward(self): self.sendCommand('playback/fastForward')
# def playFile(self): pass
# def screenshot(self): pass
# def sendString(self): pass
# def sendKey(self): pass
# def sendVirtualKey(self): pass

15
plexapi/exceptions.py Normal file
View file

@ -0,0 +1,15 @@
"""
PlexAPI Exceptions
"""
class BadRequest(Exception):
pass
class NotFound(Exception):
pass
class UnknownType(Exception):
pass
class Unsupported(Exception):
pass

200
plexapi/library.py Normal file
View file

@ -0,0 +1,200 @@
"""
PlexLibrary
"""
from plexapi import video, utils
from plexapi.exceptions import NotFound
class Library(object):
def __init__(self, server, data):
self.server = server
self.identifier = data.attrib.get('identifier')
self.mediaTagVersion = data.attrib.get('mediaTagVersion')
self.title1 = data.attrib.get('title1')
self.title2 = data.attrib.get('title2')
def __repr__(self):
return '<Library:%s>' % self.title1.encode('utf8')
def sections(self):
items = []
SECTION_TYPES = {MovieSection.TYPE:MovieSection, ShowSection.TYPE:ShowSection}
path = '/library/sections'
for elem in self.server.query(path):
stype = elem.attrib['type']
if stype in SECTION_TYPES:
cls = SECTION_TYPES[stype]
items.append(cls(self.server, elem, path))
return items
def section(self, title=None):
for item in self.sections():
if item.title == title:
return item
raise NotFound('Invalid library section: %s' % title)
def all(self):
return video.list_items(self.server, '/library/all')
def onDeck(self):
return video.list_items(self.server, '/library/onDeck')
def recentlyAdded(self):
return video.list_items(self.server, '/library/recentlyAdded')
def get(self, title):
return video.find_item(self.server, '/library/all', title)
def search(self, title, filter='all', vtype=None, **tags):
""" Search all available content.
title: Title to search (pass None to search all titles).
filter: One of {'all', 'onDeck', 'recentlyAdded'}.
videotype: One of {'movie', 'show', 'season', 'episode'}.
tags: One of {country, director, genre, producer, actor, writer}.
"""
args = {}
if title: args['title'] = title
if vtype: args['type'] = video.search_type(vtype)
for tag, obj in tags.iteritems():
args[tag] = obj.id
query = '/library/%s%s' % (filter, utils.joinArgs(args))
return video.list_items(self.server, query)
def cleanBundles(self):
self.server.query('/library/clean/bundles')
def emptyTrash(self):
for section in self.sections():
section.emptyTrash()
def optimize(self):
self.server.query('/library/optimize')
def refresh(self):
self.server.query('/library/sections/all/refresh')
class LibrarySection(object):
def __init__(self, server, data, initpath):
self.server = server
self.initpath = initpath
self.type = data.attrib.get('type')
self.key = data.attrib.get('key')
self.title = data.attrib.get('title')
self.scanner = data.attrib.get('scanner')
self.language = data.attrib.get('language')
def __repr__(self):
title = self.title.replace(' ','.')[0:20]
return '<%s:%s>' % (self.__class__.__name__, title.encode('utf8'))
def _primaryList(self, key):
return video.list_items(self.server, '/library/sections/%s/%s' % (self.key, key))
def _secondaryList(self, key, input=None):
choices = list_choices(self.server, '/library/sections/%s/%s' % (self.key, key))
if not input:
return choices.keys()
return video.list_items(self.server, '/library/sections/%s/%s/%s' % (self.key, key, choices[input]))
def all(self):
return self._primaryList('all')
def newest(self):
return self._primaryList('newest')
def onDeck(self):
return self._primaryList('onDeck')
def recentlyAdded(self):
return self._primaryList('recentlyAdded')
def recentlyViewed(self):
return self._primaryList('recentlyViewed')
def unwatched(self):
return self._primaryList('unwatched')
def contentRating(self, input=None):
return self._secondaryList('contentRating', input)
def firstCharacter(self, input=None):
return self._secondaryList('firstCharacter', input)
def genre(self, input=None):
return self._secondaryList('genre', input)
def year(self, input=None):
return self._secondaryList('year', input)
def get(self, title):
path = '/library/sections/%s/all' % self.key
return video.find_item(self.server, path, title)
def search(self, title, filter='all', vtype=None, **tags):
""" Search section content.
title: Title to search (pass None to search all titles).
filter: One of {'all', 'newest', 'onDeck', 'recentlyAdded', 'recentlyViewed', 'unwatched'}.
videotype: One of {'movie', 'show', 'season', 'episode'}.
tags: One of {country, director, genre, producer, actor, writer}.
"""
args = {}
if title: args['title'] = title
if vtype: args['type'] = video.search_type(vtype)
for tag, obj in tags.iteritems():
args[tag] = obj.id
query = '/library/sections/%s/%s%s' % (self.key, filter, utils.joinArgs(args))
return video.list_items(self.server, query)
def analyze(self):
self.server.query('/library/sections/%s/analyze' % self.key)
def emptyTrash(self):
self.server.query('/library/sections/%s/emptyTrash' % self.key)
def refresh(self):
self.server.query('/library/sections/%s/refresh' % self.key)
class MovieSection(LibrarySection):
TYPE = 'movie'
def actor(self, input=None):
return self._secondaryList('actor', input)
def country(self, input=None):
return self._secondaryList('country', input)
def decade(self, input=None):
return self._secondaryList('decade', input)
def director(self, input=None):
return self._secondaryList('director', input)
def rating(self, input=None):
return self._secondaryList('rating', input)
def resolution(self, input=None):
return self._secondaryList('resolution', input)
def search(self, title, filter='all', **tags):
return super(MovieSection, self).search(title, filter=filter, vtype=video.Movie.TYPE, **tags)
class ShowSection(LibrarySection):
TYPE = 'show'
def recentlyViewedShows(self):
return self._primaryList('recentlyViewedShows')
def search(self, title, filter='all', **tags):
return super(ShowSection, self).search(title, filter=filter, vtype=video.Show.TYPE, **tags)
def searchEpisodes(self, title, filter='all', **tags):
return super(ShowSection, self).search(title, filter=filter, vtype=video.Episode.TYPE, **tags)
def list_choices(server, path):
return {c.attrib['title']:c.attrib['key'] for c in server.query(path)}

174
plexapi/media.py Normal file
View file

@ -0,0 +1,174 @@
"""
PlexAPI Media
"""
from plexapi.utils import cast
class Media(object):
TYPE = 'Media'
def __init__(self, server, data, initpath, video):
self.server = server
self.initpath = initpath
self.video = video
self.videoResolution = data.attrib.get('videoResolution')
self.id = cast(int, data.attrib.get('id'))
self.duration = cast(int, data.attrib.get('duration'))
self.bitrate = cast(int, data.attrib.get('bitrate'))
self.width = cast(int, data.attrib.get('width'))
self.height = cast(int, data.attrib.get('height'))
self.aspectRatio = cast(float, data.attrib.get('aspectRatio'))
self.audioChannels = cast(int, data.attrib.get('audioChannels'))
self.audioCodec = data.attrib.get('audioCodec')
self.videoCodec = data.attrib.get('videoCodec')
self.container = data.attrib.get('container')
self.videoFrameRate = data.attrib.get('videoFrameRate')
self.optimizedForStreaming = cast(bool, data.attrib.get('optimizedForStreaming'))
self.optimizedForStreaming = cast(bool, data.attrib.get('has64bitOffsets'))
self.parts = [MediaPart(server, elem, initpath, self) for elem in data]
def __repr__(self):
title = self.video.title.replace(' ','.')[0:20]
return '<%s:%s>' % (self.__class__.__name__, title.encode('utf8'))
class MediaPart(object):
TYPE = 'Part'
def __init__(self, server, data, initpath, media):
self.server = server
self.initpath = initpath
self.media = media
self.id = cast(int, data.attrib.get('id'))
self.key = data.attrib.get('key')
self.duration = cast(int, data.attrib.get('duration'))
self.file = data.attrib.get('file')
self.size = cast(int, data.attrib.get('size'))
self.container = data.attrib.get('container')
self.syncId = cast(int, data.attrib.get('syncId', '-1'))
self.syncItemId = cast(int, data.attrib.get('syncItemId', '-1'))
self.transcodeState = data.attrib.get('transcodeState', '')
self.optimizedForStreaming = cast(bool, data.attrib.get('optimizedForStreaming', '0'))
self.streams = [
MediaPartStream.parse(self.server, elem, self.initpath, self)
for elem in data if elem.tag == MediaPartStream.TYPE
]
def __repr__(self):
return '<%s:%s>' % (self.__class__.__name__, self.id)
def selectedStream(self, stream_type):
streams = filter(lambda x: stream_type == x.type, self.streams)
selected = filter(lambda x: x.selected is True, streams)
if len(selected) == 0:
return None
return selected[0]
class MediaPartStream(object):
TYPE = 'Stream'
def __init__(self, server, data, initpath, part):
self.server = server
self.initpath = initpath
self.part = part
self.id = cast(int, data.attrib.get('id'))
self.type = cast(int, data.attrib.get('streamType'))
self.codec = data.attrib.get('codec')
self.selected = cast(bool, data.attrib.get('selected', '0'))
self.index = cast(int, data.attrib.get('index', '-1'))
@staticmethod
def parse(server, data, initpath, part):
STREAMCLS = {
StreamVideo.TYPE: StreamVideo,
StreamAudio.TYPE: StreamAudio,
StreamSubtitle.TYPE: StreamSubtitle
}
stype = cast(int, data.attrib.get('streamType'))
cls = STREAMCLS.get(stype, MediaPartStream)
# return generic MediaPartStream if type is unknown
return cls(server, data, initpath, part)
def __repr__(self):
return '<%s:%s>' % (self.__class__.__name__, self.id)
class StreamVideo(MediaPartStream):
TYPE = 1
def __init__(self, server, data, initpath, part):
super(StreamVideo, self).__init__(server, data, initpath, part)
self.bitrate = cast(int, data.attrib.get('bitrate'))
self.language = data.attrib.get('langauge')
self.languageCode = data.attrib.get('languageCode')
self.bitDepth = cast(int, data.attrib.get('bitDepth'))
self.cabac = cast(int, data.attrib.get('cabac'))
self.chromaSubsampling = data.attrib.get('chromaSubsampling')
self.codecID = data.attrib.get('codecID')
self.colorSpace = data.attrib.get('colorSpace')
self.duration = cast(int, data.attrib.get('duration'))
self.frameRate = cast(float, data.attrib.get('frameRate'))
self.frameRateMode = data.attrib.get('frameRateMode')
self.hasScallingMatrix = cast(bool, data.attrib.get('hasScallingMatrix'))
self.height = cast(int, data.attrib.get('height'))
self.level = cast(int, data.attrib.get('level'))
self.profile = data.attrib.get('profile')
self.refFrames = cast(int, data.attrib.get('refFrames'))
self.scanType = data.attrib.get('scanType')
self.title = data.attrib.get('title')
self.width = cast(int, data.attrib.get('width'))
class StreamAudio(MediaPartStream):
TYPE = 2
def __init__(self, server, data, initpath, part):
super(StreamAudio, self).__init__(server, data, initpath, part)
self.channels = cast(int, data.attrib.get('channels'))
self.bitrate = cast(int, data.attrib.get('bitrate'))
self.bitDepth = cast(int, data.attrib.get('bitDepth'))
self.bitrateMode = data.attrib.get('bitrateMode')
self.codecID = data.attrib.get('codecID')
self.dialogNorm = cast(int, data.attrib.get('dialogNorm'))
self.duration = cast(int, data.attrib.get('duration'))
self.samplingRate = cast(int, data.attrib.get('samplingRate'))
self.title = data.attrib.get('title')
class StreamSubtitle(MediaPartStream):
TYPE = 3
def __init__(self, server, data, initpath, part):
super(StreamSubtitle, self).__init__(server, data, initpath, part)
self.key = data.attrib.get('key')
self.language = data.attrib.get('langauge')
self.languageCode = data.attrib.get('languageCode')
self.format = data.attrib.get('format')
class VideoTag(object):
TYPE = None
def __init__(self, server, data):
self.server = server
self.id = cast(int, data.attrib.get('id'))
self.tag = data.attrib.get('tag')
self.role = data.attrib.get('role')
def __repr__(self):
tag = self.tag.replace(' ','.')[0:20]
return '<%s:%s:%s>' % (self.__class__.__name__, self.id, tag)
def related(self, vtype=None):
return self.server.library.search(None, **{self.FILTER:self})
class Country(VideoTag): TYPE='Country'; FILTER='country'
class Director(VideoTag): TYPE = 'Director'; FILTER='director'
class Genre(VideoTag): TYPE='Genre'; FILTER='genre'
class Producer(VideoTag): TYPE = 'Producer'; FILTER='producer'
class Actor(VideoTag): TYPE = 'Role'; FILTER='actor'
class Writer(VideoTag): TYPE = 'Writer'; FILTER='writer'

214
plexapi/myplex.py Normal file
View file

@ -0,0 +1,214 @@
"""
PlexAPI MyPlex
"""
import plexapi, requests
from requests.status_codes import _codes as codes
from threading import Thread
from xml.etree import ElementTree
from plexapi import TIMEOUT, log
from plexapi.exceptions import BadRequest, NotFound
from plexapi.utils import cast, toDatetime, Connection
from plexapi.sync import SyncItem
class MyPlexUser:
""" Logs into my.plexapp.com to fetch account and token information. This
useful to get a token if not on the local network.
"""
SIGNIN = 'https://my.plexapp.com/users/sign_in.xml'
def __init__(self, username, password):
data = self._signin(username, password)
self.email = data.attrib.get('email')
self.id = data.attrib.get('id')
self.thumb = data.attrib.get('thumb')
self.username = data.attrib.get('username')
self.title = data.attrib.get('title')
self.cloudSyncDevice = data.attrib.get('cloudSyncDevice')
self.authenticationToken = data.attrib.get('authenticationToken')
self.queueEmail = data.attrib.get('queueEmail')
self.queueUid = data.attrib.get('queueUid')
def _signin(self, username, password):
auth = (username, password)
log.info('POST %s', self.SIGNIN)
response = requests.post(self.SIGNIN, headers=plexapi.BASE_HEADERS, auth=auth, timeout=TIMEOUT)
if response.status_code != requests.codes.created:
codename = codes.get(response.status_code)[0]
raise BadRequest('(%s) %s' % (response.status_code, codename))
data = response.text.encode('utf8')
return ElementTree.fromstring(data)
def servers(self):
return MyPlexServer.fetchServers(self.authenticationToken)
def getServer(self, nameOrSourceTitle):
search = nameOrSourceTitle.lower()
for server in self.servers():
if server.name and search == server.name.lower(): return server
if server.sourceTitle and search == server.sourceTitle.lower(): return server
raise NotFound('Unable to find server: %s' % nameOrSourceTitle)
def devices(self):
return MyPlexDevice.fetchDevices(self.authenticationToken)
def getDevice(self, nameOrClientIdentifier):
search = nameOrClientIdentifier.lower()
for device in self.devices():
device_name = device.name.lower()
device_cid = device.clientIdentifier.lower()
if search in (device_name, device_cid):
return device
raise NotFound('Unable to find device: %s' % nameOrClientIdentifier)
def syncDevices(self):
return filter(lambda x: 'sync-target' in x.provides, self.devices())
class MyPlexAccount:
""" Represents myPlex account if you already have a connection to a server. """
def __init__(self, server, data):
self.authToken = data.attrib.get('authToken')
self.username = data.attrib.get('username')
self.mappingState = data.attrib.get('mappingState')
self.mappingError = data.attrib.get('mappingError')
self.mappingErrorMessage = data.attrib.get('mappingErrorMessage')
self.signInState = data.attrib.get('signInState')
self.publicAddress = data.attrib.get('publicAddress')
self.publicPort = data.attrib.get('publicPort')
self.privateAddress = data.attrib.get('privateAddress')
self.privatePort = data.attrib.get('privatePort')
self.subscriptionFeatures = data.attrib.get('subscriptionFeatures')
self.subscriptionActive = data.attrib.get('subscriptionActive')
self.subscriptionState = data.attrib.get('subscriptionState')
def servers(self):
return MyPlexServer.fetchServers(self.authToken)
def getServer(self, nameOrSourceTitle):
for server in self.servers():
if nameOrSourceTitle.lower() in [server.name.lower(), server.sourceTitle.lower()]:
return server
raise NotFound('Unable to find server: %s' % nameOrSourceTitle)
class MyPlexServer:
SERVERS = 'https://plex.tv/pms/servers.xml?includeLite=1'
def __init__(self, data):
self.accessToken = data.attrib.get('accessToken')
self.name = data.attrib.get('name')
self.address = data.attrib.get('address')
self.port = cast(int, data.attrib.get('port'))
self.version = data.attrib.get('version')
self.scheme = data.attrib.get('scheme')
self.host = data.attrib.get('host')
self.localAddresses = data.attrib.get('localAddresses', '').split(',')
self.machineIdentifier = data.attrib.get('machineIdentifier')
self.createdAt = toDatetime(data.attrib.get('createdAt'))
self.updatedAt = toDatetime(data.attrib.get('updatedAt'))
self.owned = cast(bool, data.attrib.get('owned'))
self.synced = cast(bool, data.attrib.get('synced'))
self.sourceTitle = data.attrib.get('sourceTitle', '')
self.ownerId = cast(int, data.attrib.get('ownerId'))
self.home = data.attrib.get('home')
def __repr__(self):
return '<%s:%s>' % (self.__class__.__name__, self.name.encode('utf8'))
def connect(self):
# Create a list of addresses to try connecting to.
# TODO: setup local addresses before external
devices = MyPlexDevice.fetchDevices(self.accessToken)
devices = filter(lambda x: x.clientIdentifier == self.machineIdentifier, devices)
addresses = []
if len(devices) == 1:
addresses += devices[0].connections
else:
addresses.append(Connection(self.address, self.port))
if self.owned:
for local in self.localAddresses:
addresses.append(Connection(local, self.port))
# Attempt to connect to all known addresses in parellel to save time, but
# only return the first server (in order) that provides a response.
threads = [None] * len(addresses)
results = [None] * len(addresses)
for i in range(len(addresses)):
args = (addresses[i], results, i)
threads[i] = Thread(target=self._connect, args=args)
threads[i].start()
for thread in threads:
thread.join()
results = filter(None, results)
if results: return results[0]
raise NotFound('Unable to connect to server: %s' % self.name)
def _connect(self, address, results, i):
from plexapi.server import PlexServer
try:
results[i] = PlexServer(address.addr, address.port, self.accessToken)
except NotFound:
results[i] = None
@classmethod
def fetchServers(cls, token):
headers = plexapi.BASE_HEADERS
headers['X-Plex-Token'] = token
log.info('GET %s?X-Plex-Token=%s', cls.SERVERS, token)
response = requests.get(cls.SERVERS, headers=headers, timeout=TIMEOUT)
data = ElementTree.fromstring(response.text.encode('utf8'))
return [MyPlexServer(elem) for elem in data]
class MyPlexDevice(object):
DEVICES = 'https://my.plexapp.com/devices.xml'
def __init__(self, data):
self.name = data.attrib.get('name')
self.publicAddress = data.attrib.get('publicAddress')
self.product = data.attrib.get('product')
self.productVersion = data.attrib.get('productVersion')
self.platform = data.attrib.get('platform')
self.platformVersion = data.attrib.get('platformVersion')
self.devices = data.attrib.get('device') # Whats going on here..
self.model = data.attrib.get('model')
self.vendor = data.attrib.get('vendor')
self.provides = data.attrib.get('provides').split(',')
self.clientIdentifier = data.attrib.get('clientIdentifier')
self.version = data.attrib.get('version')
self.id = cast(int, data.attrib.get('id'))
self.token = data.attrib.get('token')
self.createdAt = toDatetime(data.attrib.get('createdAt'))
self.lastSeenAt = toDatetime(data.attrib.get('lastSeenAt'))
self.screenResolution = data.attrib.get('screenResolution')
self.screenDensity = data.attrib.get('screenDensity')
self.connections = [Connection.from_xml(elem) for elem in data.iterfind('Connection')]
self.syncList = [elem.attrib.copy() for elem in data.iterfind('SyncList')]
self._syncItemsUrl = 'https://plex.tv/devices/{0}/sync_items.xml'.format(self.clientIdentifier)
def syncItems(self):
headers = plexapi.BASE_HEADERS
headers['X-Plex-Token'] = self.token
response = requests.get(self._syncItemsUrl, headers=headers, timeout=TIMEOUT)
data = ElementTree.fromstring(response.text.encode('utf8'))
servers = MyPlexServer.fetchServers(self.token)
return [SyncItem(self, elem, servers) for elem in data.find('SyncItems').iterfind('SyncItem')]
def __repr__(self):
return '<{0}:{1}>'.format(self.__class__.__name__, self.name)
@classmethod
def fetchDevices(cls, token):
headers = plexapi.BASE_HEADERS
headers['X-Plex-Token'] = token
response = requests.get(MyPlexDevice.DEVICES, headers=headers, timeout=TIMEOUT)
data = ElementTree.fromstring(response.text.encode('utf8'))
return [MyPlexDevice(elem) for elem in data]
if __name__ == '__main__':
import sys
myplex = MyPlexUser(sys.argv[1], sys.argv[2])
server = myplex.getServer(sys.argv[3]).connect()
print server.library.section("Movies").all()

37
plexapi/playqueue.py Normal file
View file

@ -0,0 +1,37 @@
"""
PlexAPI Play PlayQueues
"""
import plexapi, requests
from plexapi import video
from plexapi import utils
class PlayQueue(object):
def __init__(self, server, data, initpath):
self.server = server
self.initpath = initpath
self.identifier = data.attrib.get('identifier')
self.mediaTagPrefix = data.attrib.get('mediaTagPrefix')
self.mediaTagVersion = data.attrib.get('mediaTagVersion')
self.playQueueID = data.attrib.get('playQueueID')
self.playQueueSelectedItemID = data.attrib.get('playQueueSelectedItemID')
self.playQueueSelectedItemOffset = data.attrib.get('playQueueSelectedItemOffset')
self.playQueueTotalCount = data.attrib.get('playQueueTotalCount')
self.playQueueVersion = data.attrib.get('playQueueVersion')
self.items = [video.build_item(server, elem, initpath) for elem in data]
@classmethod
def create(cls, server, video, shuffle=0, continuous=0):
# NOTE: I have not yet figured out what __GID__ is below or where the proper value
# can be obtained. However, the good news is passing anything in seems to work.
path = 'playQueues%s' % utils.joinArgs({
'uri': 'library://__GID__/item/%s' % video.key,
'key': video.key,
'type': 'video',
'shuffle': shuffle,
'continuous': continuous,
'X-Plex-Client-Identifier': plexapi.X_PLEX_IDENTIFIER,
})
data = server.query(path, method=requests.post)
return cls(server, data, initpath=path)

102
plexapi/server.py Normal file
View file

@ -0,0 +1,102 @@
"""
PlexServer
"""
import requests, urllib
from requests.status_codes import _codes as codes
from plexapi import BASE_HEADERS, TIMEOUT
from plexapi import log, video
from plexapi.client import Client
from plexapi.exceptions import BadRequest, NotFound
from plexapi.library import Library
from plexapi.myplex import MyPlexAccount
from plexapi.playqueue import PlayQueue
from xml.etree import ElementTree
TOTAL_QUERIES = 0
class PlexServer(object):
def __init__(self, address='localhost', port=32400, token=None):
self.address = self._cleanAddress(address)
self.port = port
self.token = token
data = self._connect()
self.friendlyName = data.attrib.get('friendlyName')
self.machineIdentifier = data.attrib.get('machineIdentifier')
self.myPlex = bool(data.attrib.get('myPlex'))
self.myPlexMappingState = data.attrib.get('myPlexMappingState')
self.myPlexSigninState = data.attrib.get('myPlexSigninState')
self.myPlexSubscription = data.attrib.get('myPlexSubscription')
self.myPlexUsername = data.attrib.get('myPlexUsername')
self.platform = data.attrib.get('platform')
self.platformVersion = data.attrib.get('platformVersion')
self.transcoderActiveVideoSessions = int(data.attrib.get('transcoderActiveVideoSessions'))
self.updatedAt = int(data.attrib.get('updatedAt'))
self.version = data.attrib.get('version')
def __repr__(self):
return '<%s:%s:%s>' % (self.__class__.__name__, self.address, self.port)
def _cleanAddress(self, address):
address = address.lower().strip('/')
if address.startswith('http://'):
address = address[8:]
return address
def _connect(self):
try:
return self.query('/')
except Exception, err:
log.error('%s:%s: %s', self.address, self.port, err)
raise NotFound('No server found at: %s:%s' % (self.address, self.port))
@property
def library(self):
return Library(self, self.query('/library/'))
def account(self):
data = self.query('/myplex/account')
return MyPlexAccount(self, data)
def clients(self):
items = []
for elem in self.query('/clients'):
items.append(Client(self, elem))
return items
def client(self, name):
for elem in self.query('/clients'):
if elem.attrib.get('name').lower() == name.lower():
return Client(self, elem)
raise NotFound('Unknown client name: %s' % name)
def createPlayQueue(self, video):
return PlayQueue.create(self, video)
def headers(self):
headers = BASE_HEADERS
if self.token:
headers['X-Plex-Token'] = self.token
return headers
def query(self, path, method=requests.get):
global TOTAL_QUERIES; TOTAL_QUERIES += 1
url = self.url(path)
log.info('%s %s%s', method.__name__.upper(), url, '?X-Plex-Token=%s' % self.token if self.token else '')
response = method(url, headers=self.headers(), timeout=TIMEOUT)
if response.status_code not in [200, 201]:
codename = codes.get(response.status_code)[0]
raise BadRequest('(%s) %s' % (response.status_code, codename))
data = response.text.encode('utf8')
return ElementTree.fromstring(data) if data else None
def search(self, query, videotype=None):
query = urllib.quote(query)
items = video.list_items(self, '/search?query=%s' % query)
if videotype:
return [item for item in items if item.type == videotype]
return items
def url(self, path):
return 'http://%s:%s/%s' % (self.address, self.port, path.lstrip('/'))

43
plexapi/sync.py Normal file
View file

@ -0,0 +1,43 @@
"""
PlexAPI Sync
"""
import requests
from plexapi.exceptions import NotFound
from plexapi.video import list_items
from plexapi.utils import cast
class SyncItem(object):
def __init__(self, device, data, servers=None):
self.device = device
self.servers = servers
self.id = cast(int, data.attrib.get('id'))
self.version = cast(int, data.attrib.get('version'))
self.rootTitle = data.attrib.get('rootTitle')
self.title = data.attrib.get('title')
self.metadataType = data.attrib.get('metadataType')
self.machineIdentifier = data.find('Server').get('machineIdentifier')
self.status = data.find('Status').attrib.copy()
self.MediaSettings = data.find('MediaSettings').attrib.copy()
self.policy = data.find('Policy').attrib.copy()
self.location = data.find('Location').attrib.copy()
def __repr__(self):
return '<{0}:{1}>'.format(self.__class__.__name__, self.id)
def server(self):
server = filter(lambda x: x.machineIdentifier == self.machineIdentifier, self.servers)
if 0 == len(server):
raise NotFound('Unable to find server with uuid %s' % self.machineIdentifier)
return server[0]
def getMedia(self):
server = self.server().connect()
items = list_items(server, '/sync/items/{0}'.format(self.id))
return items
def markAsDone(self, sync_id):
server = self.server().connect()
uri = '/sync/{0}/{1}/files/{2}/downloaded'.format(self.device.uuid, server.machineIdentifier, sync_id)
server.query(uri, method=requests.put)

93
plexapi/utils.py Normal file
View file

@ -0,0 +1,93 @@
"""
PlexAPI Utils
"""
import urllib
from datetime import datetime
NA = '__NA__' # Value not available
class PlexPartialObject(object):
""" Not all objects in the Plex listings return the complete list of
elements for the object. This object will allow you to assume each
object is complete, and if the specified value you request is None
it will fetch the full object automatically and update itself.
"""
def __init__(self, server, data, initpath):
self.server = server
self.initpath = initpath
self._loadData(data)
def __getattr__(self, attr):
if self.isPartialObject():
self.reload()
return self.__dict__.get(attr)
def __setattr__(self, attr, value):
if value != NA:
super(PlexPartialObject, self).__setattr__(attr, value)
def _loadData(self, data):
raise Exception('Abstract method not implemented.')
def isFullObject(self):
return self.initpath == self.key
def isPartialObject(self):
return self.initpath != self.key
def reload(self):
data = self.server.query(self.key)
self.initpath = self.key
self._loadData(data[0])
class Connection(object):
def __init__(self, addr, port):
self.addr = addr
self.port = int(port)
@classmethod
def from_xml(cls, data):
uri = data.attrib.get('uri')
addr, port = [elem.strip('/') for elem in uri.split(':')[1:]]
return Connection(addr, port)
def __repr__(self):
return '<Connection:{0}:{1}>'.format(self.addr, self.port)
def cast(func, value):
if value not in [None, NA]:
if func == bool:
value = int(value)
value = func(value)
return value
def joinArgs(args):
if not args: return ''
arglist = []
for key in sorted(args, key=lambda x:x.lower()):
value = str(args[key])
arglist.append('%s=%s' % (key, urllib.quote(value)))
return '?%s' % '&'.join(arglist)
def toDatetime(value, format=None):
if value and value != NA:
if format: value = datetime.strptime(value, format)
else: value = datetime.fromtimestamp(int(value))
return value
def lazyproperty(func):
""" Decorator: Memoize method result. """
attr = '_lazy_%s' % func.__name__
@property
def wrapper(self):
if not hasattr(self, attr):
setattr(self, attr, func(self))
return getattr(self, attr)
return wrapper

216
plexapi/video.py Normal file
View file

@ -0,0 +1,216 @@
"""
PlexVideo
"""
from plexapi.media import Media, Country, Director, Genre, Producer, Actor, Writer
from plexapi.exceptions import NotFound, UnknownType
from plexapi.utils import PlexPartialObject, NA
from plexapi.utils import cast, toDatetime
class Video(PlexPartialObject):
TYPE = None
def __eq__(self, other):
return self.type == other.type and self.key == other.key
def __repr__(self):
title = self.title.replace(' ','.')[0:20]
return '<%s:%s>' % (self.__class__.__name__, title.encode('utf8'))
def _loadData(self, data):
self.type = data.attrib.get('type', NA)
self.key = data.attrib.get('key', NA)
self.ratingKey = data.attrib.get('ratingKey', NA)
self.title = data.attrib.get('title', NA)
self.summary = data.attrib.get('summary', NA)
self.art = data.attrib.get('art', NA)
self.thumb = data.attrib.get('thumb', NA)
self.addedAt = toDatetime(data.attrib.get('addedAt', NA))
self.updatedAt = toDatetime(data.attrib.get('updatedAt', NA))
self.lastViewedAt = toDatetime(data.attrib.get('lastViewedAt', NA))
self.index = data.attrib.get('index', NA)
self.parentIndex = data.attrib.get('parentIndex', NA)
if self.isFullObject():
# These are auto-populated when requested
self.media = [Media(self.server, elem, self.initpath, self) for elem in data if elem.tag == Media.TYPE]
self.countries = [Country(self.server, elem) for elem in data if elem.tag == Country.TYPE]
self.directors = [Director(self.server, elem) for elem in data if elem.tag == Director.TYPE]
self.genres = [Genre(self.server, elem) for elem in data if elem.tag == Genre.TYPE]
self.producers = [Producer(self.server, elem) for elem in data if elem.tag == Producer.TYPE]
self.actors = [Actor(self.server, elem) for elem in data if elem.tag == Actor.TYPE]
self.writers = [Writer(self.server, elem) for elem in data if elem.tag == Writer.TYPE]
def iter_parts(self):
for media in self.media:
for part in media.parts:
yield part
def analyze(self):
self.server.query('/%s/analyze' % self.key)
def markWatched(self):
path = '/:/scrobble?key=%s&identifier=com.plexapp.plugins.library' % self.ratingKey
self.server.query(path)
self.reload()
def markUnwatched(self):
path = '/:/unscrobble?key=%s&identifier=com.plexapp.plugins.library' % self.ratingKey
self.server.query(path)
self.reload()
def play(self, client):
client.playMedia(self)
def refresh(self):
self.server.query('/%s/refresh' % self.key)
class Movie(Video):
TYPE = 'movie'
def _loadData(self, data):
super(Movie, self)._loadData(data)
self.studio = data.attrib.get('studio', NA)
self.contentRating = data.attrib.get('contentRating', NA)
self.rating = data.attrib.get('rating', NA)
self.viewCount = cast(int, data.attrib.get('viewCount', 0))
self.viewOffset = cast(int, data.attrib.get('viewOffset', 0))
self.year = cast(int, data.attrib.get('year', NA))
self.tagline = data.attrib.get('tagline', NA)
self.duration = cast(int, data.attrib.get('duration', NA))
self.originallyAvailableAt = toDatetime(data.attrib.get('originallyAvailableAt', NA), '%Y-%m-%d')
self.primaryExtraKey = data.attrib.get('primaryExtraKey', NA)
class Show(Video):
TYPE = 'show'
def _loadData(self, data):
super(Show, self)._loadData(data)
self.studio = data.attrib.get('studio', NA)
self.contentRating = data.attrib.get('contentRating', NA)
self.rating = data.attrib.get('rating', NA)
self.year = cast(int, data.attrib.get('year', NA))
self.banner = data.attrib.get('banner', NA)
self.theme = data.attrib.get('theme', NA)
self.duration = cast(int, data.attrib.get('duration', NA))
self.originallyAvailableAt = toDatetime(data.attrib.get('originallyAvailableAt', NA), '%Y-%m-%d')
self.leafCount = cast(int, data.attrib.get('leafCount', NA))
self.viewedLeafCount = cast(int, data.attrib.get('viewedLeafCount', NA))
self.childCount = cast(int, data.attrib.get('childCount', NA))
def seasons(self):
path = '/library/metadata/%s/children' % self.ratingKey
return list_items(self.server, path, Season.TYPE)
def season(self, title):
path = '/library/metadata/%s/children' % self.ratingKey
return find_item(self.server, path, title)
def episodes(self):
leavesKey = '/library/metadata/%s/allLeaves' % self.ratingKey
return list_items(self.server, leavesKey)
def episode(self, title):
path = '/library/metadata/%s/allLeaves' % self.ratingKey
return find_item(self.server, path, title)
def get(self, title):
return self.episode(title)
class Season(Video):
TYPE = 'season'
def _loadData(self, data):
super(Season, self)._loadData(data)
self.librarySectionID = data.attrib.get('librarySectionID', NA)
self.librarySectionTitle = data.attrib.get('librarySectionTitle', NA)
self.parentRatingKey = data.attrib.get('parentRatingKey', NA)
self.parentKey = data.attrib.get('parentKey', NA)
self.parentTitle = data.attrib.get('parentTitle', NA)
self.parentSummary = data.attrib.get('parentSummary', NA)
self.index = data.attrib.get('index', NA)
self.parentIndex = data.attrib.get('parentIndex', NA)
self.parentThumb = data.attrib.get('parentThumb', NA)
self.parentTheme = data.attrib.get('parentTheme', NA)
self.leafCount = cast(int, data.attrib.get('leafCount', NA))
self.viewedLeafCount = cast(int, data.attrib.get('viewedLeafCount', NA))
def episodes(self):
childrenKey = '/library/metadata/%s/children' % self.ratingKey
return list_items(self.server, childrenKey)
def episode(self, title):
path = '/library/metadata/%s/children' % self.ratingKey
return find_item(self.server, path, title)
def get(self, title):
return self.episode(title)
def show(self):
return list_items(self.server, self.parentKey)[0]
class Episode(Video):
TYPE = 'episode'
def _loadData(self, data):
super(Episode, self)._loadData(data)
self.librarySectionID = data.attrib.get('librarySectionID', NA)
self.librarySectionTitle = data.attrib.get('librarySectionTitle', NA)
self.grandparentKey = data.attrib.get('grandparentKey', NA)
self.grandparentTitle = data.attrib.get('grandparentTitle', NA)
self.grandparentThumb = data.attrib.get('grandparentThumb', NA)
self.parentKey = data.attrib.get('parentKey', NA)
self.parentIndex = data.attrib.get('parentIndex', NA)
self.parentThumb = data.attrib.get('parentThumb', NA)
self.contentRating = data.attrib.get('contentRating', NA)
self.index = data.attrib.get('index', NA)
self.rating = data.attrib.get('rating', NA)
self.viewCount = cast(int, data.attrib.get('viewCount', 0))
self.viewOffset = cast(int, data.attrib.get('viewOffset', 0))
self.year = cast(int, data.attrib.get('year', NA))
self.duration = cast(int, data.attrib.get('duration', NA))
self.originallyAvailableAt = toDatetime(data.attrib.get('originallyAvailableAt', NA), '%Y-%m-%d')
def season(self):
return list_items(self.server, self.parentKey)[0]
def show(self):
return list_items(self.server, self.grandparentKey)[0]
def build_item(server, elem, initpath):
VIDEOCLS = {Movie.TYPE:Movie, Show.TYPE:Show, Season.TYPE:Season, Episode.TYPE:Episode}
vtype = elem.attrib.get('type')
if vtype in VIDEOCLS:
cls = VIDEOCLS[vtype]
return cls(server, elem, initpath)
raise UnknownType('Unknown video type: %s' % vtype)
def find_item(server, path, title):
for elem in server.query(path):
if elem.attrib.get('title').lower() == title.lower():
return build_item(server, elem, path)
raise NotFound('Unable to find title: %s' % title)
def list_items(server, path, videotype=None):
items = []
for elem in server.query(path):
if not videotype or elem.attrib.get('type') == videotype:
try:
items.append(build_item(server, elem, path))
except UnknownType:
pass
return items
def search_type(videotype):
if videotype == Movie.TYPE: return 1
elif videotype == Show.TYPE: return 2
elif videotype == Season.TYPE: return 3
elif videotype == Episode.TYPE: return 4
raise NotFound('Unknown videotype: %s' % videotype)

4
requirements.pip Normal file
View file

@ -0,0 +1,4 @@
#---------------------------------------------------------
# PlexAPI Requirements
#---------------------------------------------------------
requests

25
setup.py Normal file
View file

@ -0,0 +1,25 @@
#!/usr/bin/python
"""
Install PlexAPI
"""
from distutils.core import setup
from setuptools import find_packages
# Fetch the current version
with open('plexapi/__init__.py') as handle:
for line in handle.readlines():
if line.startswith('VERSION'):
VERSION = line.split('=')[1].strip(" '\n")
setup(
name='PlexAPI',
version=VERSION,
description='Python bindings for the Plex API.',
author='Michael Shepanski',
author_email='mjs7231@gmail.com',
url='http://bitbucket.org/mjs7231/plexapi',
packages=find_packages(),
install_requires=['requests'],
long_description=open('README.md').read(),
keywords=['plex', 'api'],
)