python-plexapi/plexapi/utils.py

255 lines
8.8 KiB
Python
Raw Normal View History

# -*- coding: utf-8 -*-
import logging, os, requests
2014-12-29 03:21:58 +00:00
from datetime import datetime
from threading import Thread
from plexapi.compat import quote, string_type
from plexapi.exceptions import NotFound
# Search Types - Plex uses these to filter specific media types when searching.
2017-02-04 19:18:10 +00:00
# Library Types - Populated at runtime
2017-02-02 03:53:05 +00:00
# Numeric type id for each media type name; Plex uses these ids to filter
# specific media types when searching.
SEARCHTYPES = {
    'movie': 1,
    'show': 2,
    'season': 3,
    'episode': 4,
    'artist': 8,
    'album': 9,
    'track': 10,
    'photo': 14,
}

# Registry of library classes, populated at runtime by registerPlexObject().
PLEXOBJECTS = {}
2016-12-16 00:17:02 +00:00
class SecretsFilter(logging.Filter):
    """ Logging filter that hides registered secrets in log record arguments.

        Only string values found in ``record.args`` are scrubbed; the message
        template (``record.msg``) itself is left untouched.

        Parameters:
            secrets (set): Initial collection of secret strings to hide (optional).
    """

    def __init__(self, secrets=None):
        # Initialise the base Filter so its own attributes exist as well.
        logging.Filter.__init__(self)
        self.secrets = secrets or set()

    def add_secret(self, secret):
        """ Register a secret to hide and return it unchanged.

            None and empty strings are not registered: an empty pattern would
            match between every character of every string argument.
        """
        if secret is not None and secret != '':
            self.secrets.add(secret)
        return secret

    def _scrub(self, value):
        """ Return value with every registered secret replaced by '<hidden>'.
            Non-string values are returned unchanged.
        """
        if isinstance(value, string_type):
            for secret in self.secrets:
                value = value.replace(secret, '<hidden>')
        return value

    def filter(self, record):
        """ Scrub the record's args in place. Always returns True, so the
            record is never dropped.
        """
        args = record.args
        if not args:
            # Nothing to scrub; args may be an empty tuple or None.
            return True
        if hasattr(args, 'keys'):
            # Mapping-style args ('%(name)s' formatting) must remain a mapping,
            # otherwise formatting the message later fails.
            record.args = dict((key, self._scrub(args[key])) for key in args)
        else:
            record.args = tuple(self._scrub(arg) for arg in args)
        return True
2014-12-29 03:21:58 +00:00
def registerPlexObject(cls):
    """ Add cls to the PLEXOBJECTS registry of library types we may come across
        when parsing XML, so the XML can be converted into objects dynamically.

        The registry key is '<TAG>.<type>' where type is the class's STREAMTYPE
        if it defines one, otherwise its TYPE. When that type is empty the key
        is just the TAG.

        Parameters:
            cls (class): Class to register; must define TAG and TYPE.

        Returns:
            The class itself, so this can be applied as a class decorator.

        Raises:
            Exception: Another class is already registered under the same key.
    """
    fallback = cls.TYPE
    etype = getattr(cls, 'STREAMTYPE', fallback)
    if etype:
        ehash = '%s.%s' % (cls.TAG, etype)
    else:
        ehash = cls.TAG
    if ehash in PLEXOBJECTS:
        details = (cls.__name__, cls.TAG, etype, PLEXOBJECTS[ehash].__name__)
        raise Exception('Ambiguous PlexObject definition %s(tag=%s, type=%s) with %s' % details)
    PLEXOBJECTS[ehash] = cls
    return cls
2017-01-09 14:21:54 +00:00
2014-12-29 03:21:58 +00:00
def cast(func, value):
    """ Cast the specified value to the specified type (returned by func). Currently this
        only supports int, float and bool. Should be extended if needed.

        Parameters:
            func (func): Callback function used to cast to type (int, bool, float).
            value (any): Value to be cast and returned.

        Returns:
            None when value is None, otherwise the cast value. For int and float,
            a value that cannot be parsed yields float('nan') instead of raising.

        Raises:
            ValueError: func is bool and value is neither integer-like nor the
                text 'true' / 'false'.
    """
    if value is None:
        return value
    if func == bool:
        try:
            # Usual case: integer-like flags such as '1' / '0'.
            return bool(int(value))
        except ValueError:
            # Also accept textual booleans rather than failing on them.
            text = value.strip().lower() if hasattr(value, 'strip') else None
            if text == 'true':
                return True
            if text == 'false':
                return False
            raise
    if func in (int, float):
        try:
            return func(value)
        except ValueError:
            return float('nan')
    return func(value)
def firstAttr(elem, *attrs):
    """ Return a short, cleaned string for the first attribute in attrs whose
        value on elem is not None, or None if there is no such attribute.

        The value is converted to a string, spaces are replaced with dashes and
        the result is cut to 20 characters. For the 'key' attribute the
        '/library/metadata/' and '/children' parts are removed first.

        Parameters:
            elem (object): Object whose instance attributes are inspected.
            attrs (str): Attribute names to try, in order.
    """
    for name in attrs:
        raw = elem.__dict__.get(name)
        if raw is None:
            continue
        text = str(raw).replace(' ', '-')
        if name == 'key':
            text = text.replace('/library/metadata/', '').replace('/children', '')
        return text[:20]
    return None
2017-02-08 05:36:22 +00:00
def getattributeOrNone(obj, self, attr):
    """ Look up attr on self through super(obj, self), returning None instead
        of raising when the attribute does not exist.

        Parameters:
            obj (class): Class handed to super() as the starting point.
            self (object): Instance the attribute is read from.
            attr (str): Name of the attribute to fetch.
    """
    try:
        found = super(obj, self).__getattribute__(attr)
    except AttributeError:
        found = None
    return found
def joinArgs(args):
    """ Build a query string (used for HTTP URLs) from args. Keys are emitted
        as-is, sorted case-insensitively; only the values are URL encoded.
        Example return value: '?genre=action&type=1337'.

        Parameters:
            args (dict): Arguments to include in query string.

        Returns:
            The query string including the leading '?', or '' when args is empty.
    """
    if not args:
        return ''
    ordered = sorted(args, key=lambda name: name.lower())
    pairs = ['%s=%s' % (name, quote(str(args[name]))) for name in ordered]
    return '?' + '&'.join(pairs)
def rget(obj, attrstr, default=None, delim='.'):  # pragma: no cover
    """ Returns the value at the specified attrstr location within a nested tree of
        dicts, lists, tuples, functions, classes, etc. The lookup is done recursively
        for each key in attrstr (split by the delimiter). This function is heavily
        influenced by the lookups used in Django templates.

        Parameters:
            obj (any): Object to start the lookup in (dict, obj, list, tuple, etc).
            attrstr (str): String to lookup (ex: 'foo.bar.baz.value')
            default (any): Default value to return if not found.
            delim (str): Delimiter separating keys in attrstr.

        Returns:
            The value found, or default when any step of the lookup fails.
    """
    try:
        attr, _, remainder = attrstr.partition(delim)
        if isinstance(obj, dict):
            value = obj[attr]
        elif isinstance(obj, (list, tuple)):
            # Sequences are addressed by integer position.
            value = obj[int(attr)]
        else:
            value = getattr(obj, attr)
        if remainder:
            return rget(value, remainder, default, delim)
        return value
    except Exception:
        # Any lookup failure yields the default. Catching Exception rather than
        # everything lets KeyboardInterrupt and SystemExit propagate.
        return default
2016-12-16 00:17:02 +00:00
def searchType(libtype):
    """ Returns the search type id for the specified library type.

        Parameters:
            libtype (str): Library type to lookup (movie, show, season, episode,
                artist, album, track, photo) or one of their numeric ids.

        Returns:
            The integer id from SEARCHTYPES when libtype is a type name. When
            libtype already is one of the known ids, it is returned unchanged
            in its string form.

        Raises:
            NotFound: Unknown libtype
    """
    name = str(libtype)
    known_ids = set(str(type_id) for type_id in SEARCHTYPES.values())
    if name in known_ids:
        return name
    type_id = SEARCHTYPES.get(name)
    if type_id is None:
        raise NotFound('Unknown libtype: %s' % name)
    return type_id
def threaded(callback, listargs):
    """ Runs <callback> once for each set of args in listargs, each call in its own
        thread, and returns the results list once every thread has finished.

        Every call receives two extra trailing positional arguments: the shared
        results list and the index reserved for that call. The callback is expected
        to store its result at results[index]; slots it does not fill stay None.

        Parameters:
            callback (func): Callback function to apply to each set of args.
            listargs (list): List of lists (or tuples); args to pass each thread.
    """
    listargs = list(listargs)
    # Reserve every slot up front so each thread can write to its own index.
    results = [None] * len(listargs)
    threads = []
    for index, args in enumerate(listargs):
        # Build a fresh argument list: extending args in place would modify the
        # caller's data and would not work for tuples.
        thread = Thread(target=callback, args=list(args) + [results, index])
        thread.start()
        threads.append(thread)
    for thread in threads:
        thread.join()
    return results
2014-12-29 03:21:58 +00:00
def toDatetime(value, format=None):
    """ Returns a datetime object from the specified value.

        Empty values (None, '', 0) are returned unchanged.

        Parameters:
            value (str): value to return as a datetime
            format (str): Format to pass strptime (optional). When omitted, value
                is converted to int and treated as a timestamp.
    """
    if not value:
        return value
    if format:
        return datetime.strptime(value, format)
    return datetime.fromtimestamp(int(value))
2017-01-09 14:21:54 +00:00
def toList(value, itemcast=None, delim=','):
    """ Split a delimited string into a list, skipping empty items.

        Parameters:
            value (str): comma delimited string to convert to list.
            itemcast (func): Function to cast each list item to (default str).
            delim (str): string delimiter (optional; default ',').

        Returns:
            List of cast items; an empty list when value is empty or None.
    """
    convert = itemcast if itemcast else str
    pieces = (value if value else '').split(delim)
    items = []
    for piece in pieces:
        if piece != '':
            items.append(convert(piece))
    return items
2017-01-09 14:21:54 +00:00
def download(url, filename=None, savepath=None, session=None, chunksize=4024, mocked=False):
    """ Helper to download a thumb, videofile or other media item. Returns the local
        path to the downloaded file.

        Parameters:
            url (str): URL where the content can be reached.
            filename (str): Filename of the downloaded file. Only its base name is
                used. Defaults to the last path segment of the URL.
            savepath (str): Directory to save into; created if missing. Defaults to
                the current working dir.
            session (requests.Session): Session used for the request (optional; a
                new one is created when omitted).
            chunksize (int): Number of bytes to read/write at a time.
            mocked (bool): Helper to do everything except write the file.

        Raises:
            ValueError: filename was not given and none can be derived from url.
            Exception: Any error while requesting or writing the file is logged
                and re-raised.

        Example:
            >>> download(a_episode.getStreamURL(), a_episode.location)
            /path/to/file
    """
    # NOTE(review): the HTTP status is not checked, so an error response body is
    # saved like any other content - confirm whether that is intended.
    from plexapi import log
    session = session or requests.Session()
    if savepath is None:
        savepath = os.getcwd()
    else:
        # Make sure the user supplied path exists
        try:
            os.makedirs(savepath)
        except OSError:
            if not os.path.isdir(savepath):  # pragma: no cover
                raise
    if filename is None:
        # Derive the name from the URL path, ignoring query string and fragment.
        filename = url.split('#', 1)[0].split('?', 1)[0].rstrip('/')
        if not os.path.basename(filename):
            raise ValueError('Unable to determine a filename from the download url')
    filename = os.path.basename(filename)
    fullpath = os.path.join(savepath, filename)
    response = None
    try:
        response = session.get(url, stream=True)
        # Images don't have an extension, so we try to guess it from the
        # content-type when the filename has none.
        if not os.path.splitext(fullpath)[-1]:
            ctype = response.headers.get('content-type')
            if ctype:
                # Drop parameters such as '; charset=...' before reading the subtype.
                mediatype = ctype.split(';', 1)[0].strip()
                if 'image' in mediatype and '/' in mediatype:
                    fullpath = '%s.%s' % (fullpath, mediatype.split('/', 1)[1])
        if mocked:
            log.debug('Mocked download %s', fullpath)
            return fullpath
        with open(fullpath, 'wb') as handle:
            for chunk in response.iter_content(chunk_size=chunksize):
                if chunk:
                    handle.write(chunk)
        return fullpath
    except Exception as err:  # pragma: no cover
        log.error('Error downloading file: %s', err)
        raise
    finally:
        # A streamed response keeps its connection until it is closed; this also
        # covers the mocked path, which never reads the body.
        if response is not None:
            response.close()