python-plexapi/plexapi/utils.py

397 lines
14 KiB
Python
Raw Normal View History

# -*- coding: utf-8 -*-
2017-08-13 05:58:08 +00:00
import logging
import os
import re
import time
import zipfile
from datetime import datetime
from getpass import getpass
2020-04-15 22:30:00 +00:00
from threading import Event, Thread
import requests
2019-08-22 12:25:38 +00:00
from plexapi import compat
2017-02-27 05:50:48 +00:00
from plexapi.exceptions import NotFound
2020-04-15 22:30:00 +00:00
from tqdm import tqdm
2019-08-22 12:25:38 +00:00
# Logger registered under the name 'plexapi'; used by the helpers in this module.
log = logging.getLogger('plexapi')
2019-07-26 11:16:55 +00:00
# Search Types - Plex uses these to filter specific media types when searching.
2017-02-04 19:18:10 +00:00
# Library Types - Populated at runtime
Improvements in tests process (#297) * lets begin * skip plexpass tests if there is not plexpass on account * test new myplex attrubutes * bootstrap: proper photos organisation * fix rest of photos tests * fix myplex new attributes test * fix music bootstrap by setting agent to lastfm * fix sync tests * increase bootstrap timeout * remove timeout from .travis.yml * do not create playlist-style photoalbums in plex-bootstraptest.py * allow negative filtering in LibrarySection.search() * fix sync tests once again * use sendCrashReports in test_settings * fix test_settings * fix test_video * do not accept eula in bootstrap * fix PlexServer.isLatest() * add test against old version of PlexServer * fix MyPlexAccount.OutOut * add flag for one-time testing in Travis * fix test_library onDeck tests * fix more tests * use tqdm in plex-bootstraptest for media scanning progress * create sections one-by-one * update docs on AlertListener for timeline entries * fix plex-bootstraptest for server version 1.3.2 * display skip/xpass/xfail reasons * fix tests on 1.3 * wait for music to be fully processed in plex-bootstraptest * fix misplaced TEST_ACCOUNT_ONCE * fix test_myplex_users, not sure if in proper-way * add pytest-rerunfailures; mark test_myplex_optout as flaky * fix comment * Revert "add pytest-rerunfailures; mark test_myplex_optout as flaky" This reverts commit 580e4c95a758c92329d757eb2f3fc3bf44b26f09. 
* restart plex container on failure * add conftest.wait_until() and used where some retries are required * add more wait_until() usage in test_sync * fix managed user search * fix updating managed users in myplex * allow to add new servers to existent users * add new server to a shared user while bootstrapping * add some docs on testing process * perform few attemps when unable to get the claim token * unlock websocket-client in requirements_dev * fix docblock in tools/plex-teardowntest * do not hardcode mediapart size in test_video * remove cache:pip from travis * Revert "unlock websocket-client in requirements_dev" This reverts commit 0d536bd06dbdc4a4b869a1686f8cd008898859fe. * remove debug from server.py * improve webhook tests * fix type() check to isinstance() * remove excessive `else` branch due to Hellowlol advice * add `unknown` as allowed `myPlexMappingState` in test_server
2018-09-14 18:03:23 +00:00
# Maps each library type name to the integer id used when searching.
SEARCHTYPES = {
    'movie': 1,
    'show': 2,
    'season': 3,
    'episode': 4,
    'trailer': 5,
    'comic': 6,
    'person': 7,
    'artist': 8,
    'album': 9,
    'track': 10,
    'picture': 11,
    'clip': 12,
    'photo': 13,
    'photoalbum': 14,
    'playlist': 15,
    'playlistFolder': 16,
    'collection': 18,
    'userPlaylistItem': 1001,
}
# Registry of classes keyed by '<TAG>.<type>' (or '<TAG>' alone); filled at
# runtime by registerPlexObject().
PLEXOBJECTS = {}
2016-12-16 00:17:02 +00:00
class SecretsFilter(logging.Filter):
    """ Logging filter that hides registered secrets in log record arguments.

        Every string argument of a record has each known secret replaced by
        the literal text '<hidden>'. The record itself is never dropped.

        Parameters:
            secrets (set): Optional initial set of secret strings to hide.
    """

    def __init__(self, secrets=None):
        logging.Filter.__init__(self)
        self.secrets = secrets or set()

    def add_secret(self, secret):
        """ Registers a secret to hide and returns it unchanged.

            None and empty strings are ignored. Replacing an empty string
            would insert '<hidden>' between every character of every message.
        """
        if secret is not None and secret != '':
            self.secrets.add(secret)
        return secret

    def _clean(self, value):
        """ Returns value with all secrets masked if it is a string, else value as-is. """
        if isinstance(value, compat.string_type):
            for secret in self.secrets:
                if secret:  # guard against an empty secret passed to __init__
                    value = value.replace(secret, '<hidden>')
        return value

    def filter(self, record):
        """ Masks secrets in record.args and always returns True (keep the record). """
        args = record.args
        if isinstance(args, dict):
            # logging allows a single dict for '%(name)s' style messages; keep
            # it a mapping and only clean its values.
            record.args = dict((key, self._clean(val)) for key, val in args.items())
        elif args:
            record.args = tuple(self._clean(arg) for arg in args)
        return True
2014-12-29 03:21:58 +00:00
def registerPlexObject(cls):
    """ Adds cls to the PLEXOBJECTS registry and returns it unchanged, so it
        can be applied as a class decorator. The registry key is
        '<TAG>.<type>', where type is cls.STREAMTYPE if defined, otherwise
        cls.TYPE. When that type is empty the key is just '<TAG>'.

        Parameters:
            cls (class): Class exposing TAG and TYPE (and optionally STREAMTYPE).

        Raises:
            Exception: Another class is already registered under the same key.
    """
    etype = getattr(cls, 'STREAMTYPE', cls.TYPE)
    if etype:
        ehash = '%s.%s' % (cls.TAG, etype)
    else:
        ehash = cls.TAG
    try:
        clash = PLEXOBJECTS[ehash]
    except KeyError:
        PLEXOBJECTS[ehash] = cls
        return cls
    raise Exception('Ambiguous PlexObject definition %s(tag=%s, type=%s) with %s' %
                    (cls.__name__, cls.TAG, etype, clash.__name__))
2017-01-09 14:21:54 +00:00
2014-12-29 03:21:58 +00:00
def cast(func, value):
    """ Converts value with the given type function. None is passed through
        untouched.

        Parameters:
            func (func): Callback used to cast the value (bool, int, float, str, ...).
            value (any): Value to be cast and returned.

        Returns:
            The cast value. For bool only 1/True/"1"/"true" and
            0/False/"0"/"false" are accepted. For int and float a value that
            raises ValueError yields float('nan').

        Raises:
            ValueError: func is bool and value is not one of the accepted values.
    """
    if value is None:
        return value
    if func == bool:
        truthy = (1, True, "1", "true")
        falsy = (0, False, "0", "false")
        if value in truthy:
            return True
        if value in falsy:
            return False
        raise ValueError(value)
    if func in (int, float):
        try:
            return func(value)
        except ValueError:
            return float('nan')
    return func(value)
def joinArgs(args):
    """ Builds a query string (for HTTP URLs) from a dict. Keys are emitted
        as-is in case-insensitive sorted order; only the values are URL encoded.
        Example return value: '?genre=action&type=1337'.

        Parameters:
            args (dict): Arguments to include in the query string.

        Returns:
            str: The query string including the leading '?', or '' when args is empty.
    """
    if not args:
        return ''
    pairs = (
        '%s=%s' % (name, compat.quote(compat.ustr(args[name])))
        for name in sorted(args, key=lambda x: x.lower())
    )
    return '?' + '&'.join(pairs)
def lowerFirst(s):
    """ Returns s with its first character lower-cased and the rest unchanged.
        An empty string is returned as-is.

        Parameters:
            s (str): String to convert.
    """
    # Slicing (instead of s[0]) avoids an IndexError on the empty string.
    return s[:1].lower() + s[1:]
def rget(obj, attrstr, default=None, delim='.'):  # pragma: no cover
    """ Returns the value at the specified attrstr location within a nested tree of
        dicts, lists, tuples, functions, classes, etc. Each key in attrstr (split
        by the delimiter) is looked up in turn: dicts by key, lists and tuples by
        integer index, anything else by attribute name. This function is heavily
        influenced by the lookups used in Django templates.

        Parameters:
            obj (any): Object to start the lookup in (dict, obj, list, tuple, etc).
            attrstr (str): String to lookup (ex: 'foo.bar.baz.value')
            default (any): Default value to return if not found.
            delim (str): Delimiter separating keys in attrstr.

        Returns:
            The value found, or default if any step of the lookup raises an
            Exception. KeyboardInterrupt and SystemExit are not swallowed.
    """
    try:
        value = obj
        while True:
            parts = attrstr.split(delim, 1)
            attr = parts[0]
            if isinstance(value, dict):
                value = value[attr]
            elif isinstance(value, (list, tuple)):
                value = value[int(attr)]
            else:
                value = getattr(value, attr)
            # Stop when nothing is left after the delimiter (a trailing
            # delimiter therefore ends the lookup as well).
            if len(parts) < 2 or not parts[1]:
                return value
            attrstr = parts[1]
    except Exception:
        return default
2016-12-16 00:17:02 +00:00
def searchType(libtype):
    """ Resolves a library type to its search type id using SEARCHTYPES.

        Parameters:
            libtype (str): LibType to lookup (movie, show, season, episode, artist, album, track,
                collection)

        Returns:
            The integer id from SEARCHTYPES when libtype is a known name. When
            libtype already is one of the known ids, it is returned in its
            string form.

        Raises:
            :class:`plexapi.exceptions.NotFound`: Unknown libtype
    """
    libtype = compat.ustr(libtype)
    known_ids = [compat.ustr(v) for v in SEARCHTYPES.values()]
    if libtype in known_ids:
        return libtype
    type_id = SEARCHTYPES.get(libtype)
    if type_id is None:
        raise NotFound('Unknown libtype: %s' % libtype)
    return type_id
def threaded(callback, listargs):
    r""" Returns the result of <callback> for each set of \*args in listargs. Each call
        to <callback> is called concurrently in their own separate threads.

        The callback is invoked as
        ``callback(*args, results, index, job_is_done_event=event)`` and is expected
        to store its result in ``results[index]``. Waiting stops as soon as all
        threads have finished or any callback sets ``job_is_done_event``.

        Parameters:
            callback (func): Callback function to apply to each set of \*args.
            listargs (list): List of lists; \*args to pass each thread. The
                given lists are not modified.

        Returns:
            list: The entries of results that are not None, in listargs order.
    """
    threads, results = [], []
    job_is_done_event = Event()
    for args in listargs:
        # Build a fresh argument list so the caller's lists are left untouched
        # (this also allows tuples to be passed).
        callargs = list(args) + [results, len(results)]
        results.append(None)
        thread = Thread(target=callback, args=callargs,
                        kwargs=dict(job_is_done_event=job_is_done_event))
        thread.daemon = True
        thread.start()
        threads.append(thread)
    while not job_is_done_event.is_set():
        if not any(t.is_alive() for t in threads):
            break
        time.sleep(0.05)
    return [r for r in results if r is not None]
2014-12-29 03:21:58 +00:00
def toDatetime(value, format=None):
    """ Converts the specified value to a datetime object.

        Parameters:
            value (str): Value to return as a datetime. Falsy values (None, '',
                0) are returned unchanged.
            format (str): Format to pass to strptime (optional). Without a
                format the value is treated as a unix timestamp.

        Returns:
            datetime: The parsed value, or None when parsing with format fails.
    """
    if not value:
        return value
    if format:
        try:
            return datetime.strptime(value, format)
        except ValueError:
            log.info('Failed to parse %s to datetime, defaulting to None', value)
            return None
    # https://bugs.python.org/issue30684
    # And platform support for before epoch seems to be flaky.
    # TODO check for others errors too.
    timestamp = int(value)
    if timestamp <= 0:
        timestamp = 86400
    return datetime.fromtimestamp(timestamp)
2017-01-09 14:21:54 +00:00
def toList(value, itemcast=None, delim=','):
    """ Splits a delimited string into a list, dropping empty items.

        Parameters:
            value (str): Delimited string to convert to a list (None gives []).
            itemcast (func): Function to cast each list item to (default str).
            delim (str): String delimiter (optional; default ',').
    """
    convert = itemcast or str
    pieces = (value or '').split(delim)
    return [convert(piece) for piece in pieces if piece != '']
2017-10-26 17:51:46 +00:00
def downloadSessionImages(server, filename=None, height=150, width=150,
                          opacity=100, saturation=100):  # pragma: no cover
    """ Helper to download a bif image or thumb.url from plex.server.sessions.

        Parameters:
            server: Server object providing sessions() and transcodeImage().
            filename (str): Filename to save to. Defaults to None, in which case
                a name is generated per session from the first username, the
                media's pretty filename and the current time.
            height (int): Height of the image.
            width (int): Width of the image.
            opacity (int): Opacity of the resulting image (possibly deprecated).
            saturation (int): Saturating of the resulting image.

        Returns:
            dict: One entry per session that had an image, keyed by the
            session's first username:
            {'<username>': {'filepath': '<filepath>', 'url': 'http://<url>'}, ...}
    """
    info = {}
    # download() requires a token argument.
    # NOTE(review): assumes the server keeps its auth token in `_token`; falls
    # back to None if that attribute does not exist -- confirm against the
    # server class.
    token = getattr(server, '_token', None)
    for media in server.sessions():
        url = None
        for part in media.iterParts():
            if media.thumb:
                url = media.thumb
            if part.indexes:  # always use bif images if available.
                url = '/library/parts/%s/indexes/%s/%s' % (part.id, part.indexes.lower(), media.viewOffset)
        if url:
            username = media.usernames[0]
            # Use a per-session name so that a generated filename is not
            # reused (and the file overwritten) by the following sessions.
            savename = filename
            if savename is None:
                prettyname = media._prettyfilename()
                savename = 'session_transcode_%s_%s_%s' % (username, prettyname, int(time.time()))
            url = server.transcodeImage(url, height, width, opacity, saturation)
            filepath = download(url, token, filename=savename)
            info[username] = {'filepath': filepath, 'url': url}
    return info
def download(url, token, filename=None, savepath=None, session=None, chunksize=4024,
             unpack=False, mocked=False, showstatus=False):
    """ Helper to download a thumb, videofile or other media item. Returns the local
        path to the downloaded file.

        Parameters:
            url (str): URL where the content be reached.
            token (str): Plex auth token to include in headers.
            filename (str): Filename of the downloaded file, default None. When
                omitted it is taken from the Content-Disposition response header.
            savepath (str): Defaults to current working dir.
            session (requests.Session): Session to use for the request; a new
                one is created when omitted.
            chunksize (int): What chunksize read/write at the time.
            mocked (bool): Helper to do everything except write the file.
            unpack (bool): Unpack the zip file.
            showstatus(bool): Display a progressbar.

        Raises:
            ValueError: No filename was given and none could be read from the
                response headers.

        Example:
            >>> download(url, token, filename='episode.mp4', savepath='/path/to')
            /path/to/episode.mp4
    """
    # fetch the data to be saved
    session = session or requests.Session()
    headers = {'X-Plex-Token': token}
    response = session.get(url, headers=headers, stream=True)
    try:
        # make sure the savepath directory exists
        savepath = savepath or os.getcwd()
        compat.makedirs(savepath, exist_ok=True)
        # try getting filename from header if not specified in arguments (used for logs, db)
        if not filename:
            disposition = response.headers.get('Content-Disposition')
            if disposition:
                matches = re.findall(r'filename=\"(.+)\"', disposition)
                filename = matches[0] if matches else None
        if not filename:
            # The url is left out of the message on purpose; it may carry a token.
            raise ValueError('Unable to determine a filename for the download; '
                             'please specify filename.')
        filename = os.path.basename(filename)
        fullpath = os.path.join(savepath, filename)
        # append .ext from content-type if not already there
        extension = os.path.splitext(fullpath)[-1]
        if not extension:
            contenttype = response.headers.get('content-type')
            if contenttype and 'image' in contenttype:
                # 'image/jpeg; charset=...' -> 'jpeg'
                subtype = contenttype.partition('/')[2].split(';')[0].strip()
                if subtype:
                    fullpath += '.' + subtype
        # check this is a mocked download (testing)
        if mocked:
            log.debug('Mocked download %s', fullpath)
            return fullpath
        # save the file to disk
        log.info('Downloading: %s', fullpath)
        bar = None
        if showstatus:  # pragma: no cover
            total = int(response.headers.get('content-length', 0))
            bar = tqdm(unit='B', unit_scale=True, total=total, desc=filename)
        try:
            with open(fullpath, 'wb') as handle:
                for chunk in response.iter_content(chunk_size=chunksize):
                    handle.write(chunk)
                    if bar is not None:
                        bar.update(len(chunk))
        finally:
            if bar is not None:  # pragma: no cover
                bar.close()
    finally:
        # stream=True keeps the connection open until the body is consumed or
        # the response is closed; always release it.
        response.close()
    # check we want to unzip the contents
    if fullpath.endswith('zip') and unpack:
        with zipfile.ZipFile(fullpath, 'r') as handle:
            handle.extractall(savepath)
    return fullpath
2017-07-16 20:46:03 +00:00
def tag_helper(tag, items, locked=True, remove=False):
    """ Builds the parameter dict used to add or remove tags when editing an object.

        Parameters:
            tag (str): Name of the tag field (used as the key prefix).
            items (list or any): Tag value(s); a non-list value is wrapped in a list.
            locked (bool): Sets '<tag>.locked' to 1 when True, else 0.
            remove (bool): When True, emit a single '<tag>[].tag.tag-' entry with
                the comma-joined items instead of one indexed entry per item.

        Returns:
            dict: The generated parameters.
    """
    if not isinstance(items, list):
        items = [items]
    if remove:
        data = {'%s[].tag.tag-' % tag: ','.join(items)}
    else:
        data = dict(
            ('%s[%s].tag.tag' % (tag, index), item)
            for index, item in enumerate(items)
        )
    data['%s.locked' % tag] = 1 if locked else 0
    return data
2017-10-25 22:01:42 +00:00
def getMyPlexAccount(opts=None):  # pragma: no cover
    """ Helper function that tries to get a MyPlex Account instance by checking
        the following locations for a username and password, in this order.
        This is useful to create user-friendly command line tools.

        1. command-line options (opts).
        2. environment variables and config.ini
        3. Prompt on the command line.

        Parameters:
            opts: Optional object with username and password attributes.
    """
    from plexapi import CONFIG
    from plexapi.myplex import MyPlexAccount
    if opts and opts.username and opts.password:
        # 1. Command-line options
        username, password = opts.username, opts.password
    else:
        # 2. Plexconfig (environment variables and config.ini)
        config_username = CONFIG.get('auth.myplex_username')
        config_password = CONFIG.get('auth.myplex_password')
        if config_username and config_password:
            username, password = config_username, config_password
        else:
            # 3. Prompt for username and password on the command line
            username = input('What is your plex.tv username: ')
            password = getpass('What is your plex.tv password: ')
    print('Authenticating with Plex.tv as %s..' % username)
    return MyPlexAccount(username, password)
2017-10-25 16:34:59 +00:00
def choose(msg, items, attr):  # pragma: no cover
    """ Command line helper to display a list of choices, asking the
        user to choose one of the options.

        Parameters:
            msg (str): Prompt shown to the user.
            items (list): Choices to pick from. A single-item list is answered
                without prompting.
            attr (str or func): Attribute name, or callable taking an item, used
                to produce the label printed for each choice.

        Returns:
            The chosen item for a plain index. If the input contains ':' or '-'
            it is parsed as start:stop:step and the resulting slice of items is
            returned. Invalid input prompts again.
    """
    # Return the first item if there is only one choice
    if len(items) == 1:
        return items[0]
    # Print all choices to the command line
    print()
    for index, item in enumerate(items):
        name = attr(item) if callable(attr) else getattr(item, attr)
        print(' %s: %s' % (index, name))
    print()
    # Request choice from the user
    while True:
        try:
            inp = input('%s: ' % msg)
            if ':' in inp or '-' in inp:
                bounds = [int(part.strip()) if part.strip() else None
                          for part in inp.split(':')]
                if len(bounds) > 3:
                    # slice() takes at most three fields; treat extra ones as
                    # invalid input instead of crashing with a TypeError.
                    raise ValueError('too many slice fields')
                return items[slice(*bounds)]
            return items[int(inp)]
        except (ValueError, IndexError):
            pass
2020-04-13 02:35:22 +00:00
def getAgentIdentifier(section, agent):
    """ Returns the full identifier of the agent in section.agents() whose
        identifier, shortIdentifier or name equals the given value.

        Parameters:
            section: Object whose agents() returns the available agents.
            agent (str): Full identifier, short identifier or name to look up.

        Raises:
            :class:`plexapi.exceptions.NotFound`: No agent matches; the message
                lists every identifier and name that was checked.
    """
    seen = []
    for candidate in section.agents():
        names = [candidate.identifier, candidate.shortIdentifier, candidate.name]
        if agent in names:
            return candidate.identifier
        seen.extend(names)
    raise NotFound('Couldnt find "%s" in agents list (%s)' %
                   (agent, ', '.join(seen)))