# -*- coding: utf-8 -*- import logging import os import re import requests import time import zipfile from datetime import datetime from getpass import getpass from threading import Thread from tqdm import tqdm from plexapi import compat from plexapi.exceptions import NotFound # Search Types - Plex uses these to filter specific media types when searching. # Library Types - Populated at runtime SEARCHTYPES = {'movie': 1, 'show': 2, 'season': 3, 'episode': 4, 'artist': 8, 'album': 9, 'track': 10, 'photo': 14} PLEXOBJECTS = {} class SecretsFilter(logging.Filter): """ Logging filter to hide secrets. """ def __init__(self, secrets=None): self.secrets = secrets or set() def add_secret(self, secret): if secret is not None: self.secrets.add(secret) return secret def filter(self, record): cleanargs = list(record.args) for i in range(len(cleanargs)): if isinstance(cleanargs[i], compat.string_type): for secret in self.secrets: cleanargs[i] = cleanargs[i].replace(secret, '') record.args = tuple(cleanargs) return True def registerPlexObject(cls): """ Registry of library types we may come across when parsing XML. This allows us to define a few helper functions to dynamically convery the XML into objects. See buildItem() below for an example. """ etype = getattr(cls, 'STREAMTYPE', cls.TYPE) ehash = '%s.%s' % (cls.TAG, etype) if etype else cls.TAG if ehash in PLEXOBJECTS: raise Exception('Ambiguous PlexObject definition %s(tag=%s, type=%s) with %s' % (cls.__name__, cls.TAG, etype, PLEXOBJECTS[ehash].__name__)) PLEXOBJECTS[ehash] = cls return cls def cast(func, value): """ Cast the specified value to the specified type (returned by func). Currently this only support int, float, bool. Should be extended if needed. Parameters: func (func): Calback function to used cast to type (int, bool, float). value (any): value to be cast and returned. """ if value is not None: if func == bool: return bool(int(value)) elif func in (int, float): try: return func(value) except ValueError: return float('nan') return func(value) return value def getattributeOrNone(obj, self, attr): """ Returns result from __getattribute__ or None if not found. """ try: return super(obj, self).__getattribute__(attr) except AttributeError: return None def joinArgs(args): """ Returns a query string (uses for HTTP URLs) where only the value is URL encoded. Example return value: '?genre=action&type=1337'. Parameters: args (dict): Arguments to include in query string. """ if not args: return '' arglist = [] for key in sorted(args, key=lambda x: x.lower()): value = compat.ustr(args[key]) arglist.append('%s=%s' % (key, compat.quote(value))) return '?%s' % '&'.join(arglist) def lowerFirst(s): return s[0].lower() + s[1:] def rget(obj, attrstr, default=None, delim='.'): # pragma: no cover """ Returns the value at the specified attrstr location within a nexted tree of dicts, lists, tuples, functions, classes, etc. The lookup is done recursivley for each key in attrstr (split by by the delimiter) This function is heavily influenced by the lookups used in Django templates. Parameters: obj (any): Object to start the lookup in (dict, obj, list, tuple, etc). attrstr (str): String to lookup (ex: 'foo.bar.baz.value') default (any): Default value to return if not found. delim (str): Delimiter separating keys in attrstr. """ try: parts = attrstr.split(delim, 1) attr = parts[0] attrstr = parts[1] if len(parts) == 2 else None if isinstance(obj, dict): value = obj[attr] elif isinstance(obj, list): value = obj[int(attr)] elif isinstance(obj, tuple): value = obj[int(attr)] elif isinstance(obj, object): value = getattr(obj, attr) if attrstr: return rget(value, attrstr, default, delim) return value except: return default def searchType(libtype): """ Returns the integer value of the library string type. Parameters: libtype (str): LibType to lookup (movie, show, season, episode, artist, album, track) Raises: NotFound: Unknown libtype """ libtype = compat.ustr(libtype) if libtype in [compat.ustr(v) for v in SEARCHTYPES.values()]: return libtype if SEARCHTYPES.get(libtype) is not None: return SEARCHTYPES[libtype] raise NotFound('Unknown libtype: %s' % libtype) def threaded(callback, listargs): """ Returns the result of for each set of \*args in listargs. Each call to ', 'url': 'http://'}, {'': {filepath, url}}, ... """ info = {} for media in server.sessions(): url = None for part in media.iterParts(): if media.thumb: url = media.thumb if part.indexes: # always use bif images if available. url = '/library/parts/%s/indexes/%s/%s' % (part.id, part.indexes.lower(), media.viewOffset) if url: if filename is None: prettyname = media._prettyfilename() filename = 'session_transcode_%s_%s_%s' % (media.usernames[0], prettyname, int(time.time())) url = server.transcodeImage(url, height, width, opacity, saturation) filepath = download(url, filename=filename) info['username'] = {'filepath': filepath, 'url': url} return info def download(url, filename=None, savepath=None, session=None, chunksize=4024, unpack=False, mocked=False, showstatus=False): """ Helper to download a thumb, videofile or other media item. Returns the local path to the downloaded file. Parameters: url (str): URL where the content be reached. filename (str): Filename of the downloaded file, default None. savepath (str): Defaults to current working dir. chunksize (int): What chunksize read/write at the time. mocked (bool): Helper to do evertything except write the file. unpack (bool): Unpack the zip file. Example: >>> download(a_episode.getStreamURL(), a_episode.location) /path/to/file """ from plexapi import log # fetch the data to be saved session = session or requests.Session() response = session.get(url, stream=True) # make sure the savepath directory exists savepath = savepath or os.getcwd() compat.makedirs(savepath, exist_ok=True) # try getting filename from header if not specified in arguments (used for logs, db) if not filename and response.headers.get('Content-Disposition'): filename = re.findall(r'filename=\"(.+)\"', response.headers.get('Content-Disposition')) filename = filename[0] if filename[0] else None filename = os.path.basename(filename) fullpath = os.path.join(savepath, filename) # append file.ext from content-type if not already there extension = os.path.splitext(fullpath)[-1] if not extension: contenttype = response.headers.get('content-type') if contenttype and 'image' in contenttype: fullpath += contenttype.split('/')[1] # check this is a mocked download (testing) if mocked: log.debug('Mocked download %s', fullpath) return fullpath # save the file to disk log.info('Downloading: %s', fullpath) if showstatus: bar = tqdm(unit='B', unit_scale=True) with open(fullpath, 'wb') as handle: for chunk in response.iter_content(chunk_size=chunksize): handle.write(chunk) if showstatus: bar.update(len(chunk)) # check we want to unzip the contents if fullpath.endswith('zip') and unpack: with zipfile.ZipFile(fullpath, 'r') as handle: handle.extractall(savepath) # finished; return fillpath return fullpath def tag_helper(tag, items, locked=True, remove=False): """ Simple tag helper for editing a object. """ if not isinstance(items, list): items = [items] data = {} if not remove: for i, item in enumerate(items): tagname = '%s[%s].tag.tag' % (tag, i) data[tagname] = item if remove: tagname = '%s[].tag.tag-' % tag data[tagname] = ','.join(items) data['%s.locked' % tag] = 1 if locked else 0 return data def getMyPlexAccount(opts=None): """ Helper function tries to get a MyPlex Account instance by checking the the following locations for a username and password. This is useful to create user-friendly command line tools. 1. command-line options (opts). 2. environment variables and config.ini 3. Prompt on the command line. """ from plexapi import CONFIG from plexapi.myplex import MyPlexAccount # 1. Check command-line options if opts and opts.username and opts.password: print('Authenticating with Plex.tv as %s..' % opts.username) return MyPlexAccount(opts.username, opts.password) # 2. Check Plexconfig (environment variables and config.ini) config_username = CONFIG.get('auth.myplex_username') config_password = CONFIG.get('auth.myplex_password') if config_username and config_password: print('Authenticating with Plex.tv as %s..' % config_username) return MyPlexAccount(config_username, config_password) # 3. Prompt for username and password on the command line username = input('What is your plex.tv username: ') password = getpass('What is your plex.tv password: ') print('Authenticating with Plex.tv as %s..' % username) return MyPlexAccount(username, password) def choose(msg, items, attr): """ Command line helper to display a list of choices, asking the user to choose one of the options. """ # Return the first item if there is only one choice if len(items) == 1: return items[0] # Print all choices to the command line print() for index, i in enumerate(items): name = attr(i) if callable(attr) else getattr(i, attr) print(' %s: %s' % (index, name)) print() # Request choice from the user while True: try: number = int(input('%s: ' % msg)) return items[number] except (ValueError, IndexError): pass