Update PlexAPI to f-strings (#1000)

* Update plexapi to f-strings

* Update tests to f-strings
This commit is contained in:
JonnyWong16 2022-08-27 22:56:01 -07:00 committed by GitHub
parent fbc124aa57
commit 020a32f2b8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
30 changed files with 365 additions and 418 deletions

View file

@ -120,7 +120,7 @@ class Audio(PlexPartialObject, PlayedUnplayedMixin):
section = self._server.library.sectionByID(self.librarySectionID) section = self._server.library.sectionByID(self.librarySectionID)
sync_item.location = 'library://%s/item/%s' % (section.uuid, quote_plus(self.key)) sync_item.location = f'library://{section.uuid}/item/{quote_plus(self.key)}'
sync_item.policy = Policy.create(limit) sync_item.policy = Policy.create(limit)
sync_item.mediaSettings = MediaSettings.createMusic(bitrate) sync_item.mediaSettings = MediaSettings.createMusic(bitrate)
@ -235,7 +235,7 @@ class Artist(
def station(self): def station(self):
""" Returns a :class:`~plexapi.playlist.Playlist` artist radio station or `None`. """ """ Returns a :class:`~plexapi.playlist.Playlist` artist radio station or `None`. """
key = '%s?includeStations=1' % self.key key = f'{self.key}?includeStations=1'
return next(iter(self.fetchItems(key, cls=Playlist, rtag="Stations")), None) return next(iter(self.fetchItems(key, cls=Playlist, rtag="Stations")), None)
@ -356,7 +356,7 @@ class Album(
def _defaultSyncTitle(self): def _defaultSyncTitle(self):
""" Returns str, default title for a new syncItem. """ """ Returns str, default title for a new syncItem. """
return '%s - %s' % (self.parentTitle, self.title) return f'{self.parentTitle} - {self.title}'
@utils.registerPlexObject @utils.registerPlexObject
@ -435,8 +435,7 @@ class Track(
def _prettyfilename(self): def _prettyfilename(self):
""" Returns a filename for use in download. """ """ Returns a filename for use in download. """
return '%s - %s - %s - %s' % ( return f'{self.grandparentTitle} - {self.parentTitle} - {str(self.trackNumber).zfill(2)} - {self.title}'
self.grandparentTitle, self.parentTitle, str(self.trackNumber).zfill(2), self.title)
def album(self): def album(self):
""" Return the track's :class:`~plexapi.audio.Album`. """ """ Return the track's :class:`~plexapi.audio.Album`. """
@ -463,7 +462,7 @@ class Track(
def _defaultSyncTitle(self): def _defaultSyncTitle(self):
""" Returns str, default title for a new syncItem. """ """ Returns str, default title for a new syncItem. """
return '%s - %s - %s' % (self.grandparentTitle, self.parentTitle, self.title) return f'{self.grandparentTitle} - {self.parentTitle} - {self.title}'
def _getWebURL(self, base=None): def _getWebURL(self, base=None):
""" Get the Plex Web URL with the correct parameters. """ """ Get the Plex Web URL with the correct parameters. """

View file

@ -59,7 +59,7 @@ class PlexObject:
def __repr__(self): def __repr__(self):
uid = self._clean(self.firstAttr('_baseurl', 'ratingKey', 'id', 'key', 'playQueueID', 'uri')) uid = self._clean(self.firstAttr('_baseurl', 'ratingKey', 'id', 'key', 'playQueueID', 'uri'))
name = self._clean(self.firstAttr('title', 'name', 'username', 'product', 'tag', 'value')) name = self._clean(self.firstAttr('title', 'name', 'username', 'product', 'tag', 'value'))
return '<%s>' % ':'.join([p for p in [self.__class__.__name__, uid, name] if p]) return f"<{':'.join([p for p in [self.__class__.__name__, uid, name] if p])}>"
def __setattr__(self, attr, value): def __setattr__(self, attr, value):
overwriteNone = self.__dict__.get('_overwriteNone') overwriteNone = self.__dict__.get('_overwriteNone')
@ -84,14 +84,14 @@ class PlexObject:
return cls(self._server, elem, initpath, parent=self) return cls(self._server, elem, initpath, parent=self)
# cls is not specified, try looking it up in PLEXOBJECTS # cls is not specified, try looking it up in PLEXOBJECTS
etype = elem.attrib.get('streamType', elem.attrib.get('tagType', elem.attrib.get('type'))) etype = elem.attrib.get('streamType', elem.attrib.get('tagType', elem.attrib.get('type')))
ehash = '%s.%s' % (elem.tag, etype) if etype else elem.tag ehash = f'{elem.tag}.{etype}' if etype else elem.tag
if initpath == '/status/sessions': if initpath == '/status/sessions':
ehash = '%s.%s' % (ehash, 'session') ehash = f"{ehash}.{'session'}"
ecls = utils.PLEXOBJECTS.get(ehash, utils.PLEXOBJECTS.get(elem.tag)) ecls = utils.PLEXOBJECTS.get(ehash, utils.PLEXOBJECTS.get(elem.tag))
# log.debug('Building %s as %s', elem.tag, ecls.__name__) # log.debug('Building %s as %s', elem.tag, ecls.__name__)
if ecls is not None: if ecls is not None:
return ecls(self._server, elem, initpath) return ecls(self._server, elem, initpath)
raise UnknownType("Unknown library type <%s type='%s'../>" % (elem.tag, etype)) raise UnknownType(f"Unknown library type <{elem.tag} type='{etype}'../>")
def _buildItemOrNone(self, elem, cls=None, initpath=None): def _buildItemOrNone(self, elem, cls=None, initpath=None):
""" Calls :func:`~plexapi.base.PlexObject._buildItem` but returns """ Calls :func:`~plexapi.base.PlexObject._buildItem` but returns
@ -167,7 +167,7 @@ class PlexObject:
if ekey is None: if ekey is None:
raise BadRequest('ekey was not provided') raise BadRequest('ekey was not provided')
if isinstance(ekey, int): if isinstance(ekey, int):
ekey = '/library/metadata/%s' % ekey ekey = f'/library/metadata/{ekey}'
data = self._server.query(ekey) data = self._server.query(ekey)
item = self.findItem(data, cls, ekey, **kwargs) item = self.findItem(data, cls, ekey, **kwargs)
@ -179,7 +179,7 @@ class PlexObject:
return item return item
clsname = cls.__name__ if cls else 'None' clsname = cls.__name__ if cls else 'None'
raise NotFound('Unable to find elem: cls=%s, attrs=%s' % (clsname, kwargs)) raise NotFound(f'Unable to find elem: cls={clsname}, attrs={kwargs}')
def fetchItems(self, ekey, cls=None, container_start=None, container_size=None, **kwargs): def fetchItems(self, ekey, cls=None, container_start=None, container_size=None, **kwargs):
""" Load the specified key to find and build all items with the specified tag """ Load the specified key to find and build all items with the specified tag
@ -328,7 +328,7 @@ class PlexObject:
if rtag: if rtag:
data = next(utils.iterXMLBFS(data, rtag), []) data = next(utils.iterXMLBFS(data, rtag), [])
for elem in data: for elem in data:
kwargs['%s__exists' % attr] = True kwargs[f'{attr}__exists'] = True
if self._checkAttrs(elem, **kwargs): if self._checkAttrs(elem, **kwargs):
results.append(elem.attrib.get(attr)) results.append(elem.attrib.get(attr))
return results return results
@ -399,7 +399,7 @@ class PlexObject:
def _getAttrOperator(self, attr): def _getAttrOperator(self, attr):
for op, operator in OPERATORS.items(): for op, operator in OPERATORS.items():
if attr.endswith('__%s' % op): if attr.endswith(f'__{op}'):
attr = attr.rsplit('__', 1)[0] attr = attr.rsplit('__', 1)[0]
return attr, op, operator return attr, op, operator
# default to exact match # default to exact match
@ -496,7 +496,7 @@ class PlexPartialObject(PlexObject):
# Log the reload. # Log the reload.
clsname = self.__class__.__name__ clsname = self.__class__.__name__
title = self.__dict__.get('title', self.__dict__.get('name')) title = self.__dict__.get('title', self.__dict__.get('name'))
objname = "%s '%s'" % (clsname, title) if title else clsname objname = f"{clsname} '{title}'" if title else clsname
log.debug("Reloading %s for attr '%s'", objname, attr) log.debug("Reloading %s for attr '%s'", objname, attr)
# Reload and return the value # Reload and return the value
self._reload(_overwriteNone=False) self._reload(_overwriteNone=False)
@ -521,7 +521,7 @@ class PlexPartialObject(PlexObject):
* Generate intro video markers: Detects show intros, exposing the * Generate intro video markers: Detects show intros, exposing the
'Skip Intro' button in clients. 'Skip Intro' button in clients.
""" """
key = '/%s/analyze' % self.key.lstrip('/') key = f"/{self.key.lstrip('/')}/analyze"
self._server.query(key, method=self._server._session.put) self._server.query(key, method=self._server._session.put)
def isFullObject(self): def isFullObject(self):
@ -547,8 +547,7 @@ class PlexPartialObject(PlexObject):
if 'type' not in kwargs: if 'type' not in kwargs:
kwargs['type'] = utils.searchType(self._searchType) kwargs['type'] = utils.searchType(self._searchType)
part = '/library/sections/%s/all%s' % (self.librarySectionID, part = f'/library/sections/{self.librarySectionID}/all{utils.joinArgs(kwargs)}'
utils.joinArgs(kwargs))
self._server.query(part, method=self._server._session.put) self._server.query(part, method=self._server._session.put)
return self return self
@ -627,7 +626,7 @@ class PlexPartialObject(PlexObject):
the refresh process is interrupted (the Server is turned off, internet the refresh process is interrupted (the Server is turned off, internet
connection dies, etc). connection dies, etc).
""" """
key = '%s/refresh' % self.key key = f'{self.key}/refresh'
self._server.query(key, method=self._server._session.put) self._server.query(key, method=self._server._session.put)
def section(self): def section(self):
@ -700,7 +699,7 @@ class Playable:
:exc:`~plexapi.exceptions.Unsupported`: When the item doesn't support fetching a stream URL. :exc:`~plexapi.exceptions.Unsupported`: When the item doesn't support fetching a stream URL.
""" """
if self.TYPE not in ('movie', 'episode', 'track', 'clip'): if self.TYPE not in ('movie', 'episode', 'track', 'clip'):
raise Unsupported('Fetching stream URL for %s is unsupported.' % self.TYPE) raise Unsupported(f'Fetching stream URL for {self.TYPE} is unsupported.')
mvb = params.get('maxVideoBitrate') mvb = params.get('maxVideoBitrate')
vr = params.get('videoResolution', '') vr = params.get('videoResolution', '')
params = { params = {
@ -718,8 +717,10 @@ class Playable:
streamtype = 'audio' if self.TYPE in ('track', 'album') else 'video' streamtype = 'audio' if self.TYPE in ('track', 'album') else 'video'
# sort the keys since the randomness fucks with my tests.. # sort the keys since the randomness fucks with my tests..
sorted_params = sorted(params.items(), key=lambda val: val[0]) sorted_params = sorted(params.items(), key=lambda val: val[0])
return self._server.url('/%s/:/transcode/universal/start.m3u8?%s' % return self._server.url(
(streamtype, urlencode(sorted_params)), includeToken=True) f'/{streamtype}/:/transcode/universal/start.m3u8?{urlencode(sorted_params)}',
includeToken=True
)
def iterParts(self): def iterParts(self):
""" Iterates over the parts of this media item. """ """ Iterates over the parts of this media item. """
@ -759,7 +760,7 @@ class Playable:
for part in parts: for part in parts:
if not keep_original_name: if not keep_original_name:
filename = utils.cleanFilename('%s.%s' % (self._prettyfilename(), part.container)) filename = utils.cleanFilename(f'{self._prettyfilename()}.{part.container}')
else: else:
filename = part.file filename = part.file
@ -767,7 +768,7 @@ class Playable:
# So this seems to be a a lot slower but allows transcode. # So this seems to be a a lot slower but allows transcode.
download_url = self.getStreamURL(**kwargs) download_url = self.getStreamURL(**kwargs)
else: else:
download_url = self._server.url('%s?download=1' % part.key) download_url = self._server.url(f'{part.key}?download=1')
filepath = utils.download( filepath = utils.download(
download_url, download_url,
@ -794,8 +795,7 @@ class Playable:
time (int): milliseconds watched time (int): milliseconds watched
state (string): state of the video, default 'stopped' state (string): state of the video, default 'stopped'
""" """
key = '/:/progress?key=%s&identifier=com.plexapp.plugins.library&time=%d&state=%s' % (self.ratingKey, key = f'/:/progress?key={self.ratingKey}&identifier=com.plexapp.plugins.library&time={time}&state={state}'
time, state)
self._server.query(key) self._server.query(key)
self._reload(_overwriteNone=False) self._reload(_overwriteNone=False)

View file

@ -94,7 +94,7 @@ class PlexClient(PlexObject):
self._initpath = self.key self._initpath = self.key
data = self.query(self.key, timeout=timeout) data = self.query(self.key, timeout=timeout)
if not data: if not data:
raise NotFound("Client not found at %s" % self._baseurl) raise NotFound(f"Client not found at {self._baseurl}")
if self._clientIdentifier: if self._clientIdentifier:
client = next( client = next(
( (
@ -106,8 +106,7 @@ class PlexClient(PlexObject):
) )
if client is None: if client is None:
raise NotFound( raise NotFound(
"Client with identifier %s not found at %s" f"Client with identifier {self._clientIdentifier} not found at {self._baseurl}"
% (self._clientIdentifier, self._baseurl)
) )
else: else:
client = data[0] client = data[0]
@ -186,7 +185,7 @@ class PlexClient(PlexObject):
if response.status_code not in (200, 201, 204): if response.status_code not in (200, 201, 204):
codename = codes.get(response.status_code)[0] codename = codes.get(response.status_code)[0]
errtext = response.text.replace('\n', ' ') errtext = response.text.replace('\n', ' ')
message = '(%s) %s; %s %s' % (response.status_code, codename, response.url, errtext) message = f'({response.status_code}) {codename}; {response.url} {errtext}'
if response.status_code == 401: if response.status_code == 401:
raise Unauthorized(message) raise Unauthorized(message)
elif response.status_code == 404: elif response.status_code == 404:
@ -213,8 +212,7 @@ class PlexClient(PlexObject):
controller = command.split('/')[0] controller = command.split('/')[0]
headers = {'X-Plex-Target-Client-Identifier': self.machineIdentifier} headers = {'X-Plex-Target-Client-Identifier': self.machineIdentifier}
if controller not in self.protocolCapabilities: if controller not in self.protocolCapabilities:
log.debug("Client %s doesn't support %s controller." log.debug(f"Client {self.title} doesn't support {controller} controller. What your trying might not work")
"What your trying might not work" % (self.title, controller))
proxy = self._proxyThroughServer if proxy is None else proxy proxy = self._proxyThroughServer if proxy is None else proxy
query = self._server.query if proxy else self.query query = self._server.query if proxy else self.query
@ -228,7 +226,7 @@ class PlexClient(PlexObject):
self.sendCommand(ClientTimeline.key, wait=0) self.sendCommand(ClientTimeline.key, wait=0)
params['commandID'] = self._nextCommandId() params['commandID'] = self._nextCommandId()
key = '/player/%s%s' % (command, utils.joinArgs(params)) key = f'/player/{command}{utils.joinArgs(params)}'
try: try:
return query(key, headers=headers) return query(key, headers=headers)
@ -253,8 +251,8 @@ class PlexClient(PlexObject):
raise BadRequest('PlexClient object missing baseurl.') raise BadRequest('PlexClient object missing baseurl.')
if self._token and (includeToken or self._showSecrets): if self._token and (includeToken or self._showSecrets):
delim = '&' if '?' in key else '?' delim = '&' if '?' in key else '?'
return '%s%s%sX-Plex-Token=%s' % (self._baseurl, key, delim, self._token) return f'{self._baseurl}{key}{delim}X-Plex-Token={self._token}'
return '%s%s' % (self._baseurl, key) return f'{self._baseurl}{key}'
# --------------------- # ---------------------
# Navigation Commands # Navigation Commands
@ -517,7 +515,7 @@ class PlexClient(PlexObject):
'offset': offset, 'offset': offset,
'key': media.key or playqueue.selectedItem.key, 'key': media.key or playqueue.selectedItem.key,
'type': mediatype, 'type': mediatype,
'containerKey': '/playQueues/%s?window=100&own=1' % playqueue.playQueueID, 'containerKey': f'/playQueues/{playqueue.playQueueID}?window=100&own=1',
**params, **params,
} }
token = media._server.createToken() token = media._server.createToken()

View file

@ -184,12 +184,12 @@ class Collection(
for item in self.items(): for item in self.items():
if item.title.lower() == title.lower(): if item.title.lower() == title.lower():
return item return item
raise NotFound('Item with title "%s" not found in the collection' % title) raise NotFound(f'Item with title "{title}" not found in the collection')
def items(self): def items(self):
""" Returns a list of all items in the collection. """ """ Returns a list of all items in the collection. """
if self._items is None: if self._items is None:
key = '%s/children' % self.key key = f'{self.key}/children'
items = self.fetchItems(key) items = self.fetchItems(key)
self._items = items self._items = items
return self._items return self._items
@ -233,7 +233,7 @@ class Collection(
} }
key = user_dict.get(user) key = user_dict.get(user)
if key is None: if key is None:
raise BadRequest('Unknown collection filtering user: %s. Options %s' % (user, list(user_dict))) raise BadRequest(f'Unknown collection filtering user: {user}. Options {list(user_dict)}')
return self.editAdvanced(collectionFilterBasedOnUser=key) return self.editAdvanced(collectionFilterBasedOnUser=key)
def modeUpdate(self, mode=None): def modeUpdate(self, mode=None):
@ -260,7 +260,7 @@ class Collection(
} }
key = mode_dict.get(mode) key = mode_dict.get(mode)
if key is None: if key is None:
raise BadRequest('Unknown collection mode: %s. Options %s' % (mode, list(mode_dict))) raise BadRequest(f'Unknown collection mode: {mode}. Options {list(mode_dict)}')
return self.editAdvanced(collectionMode=key) return self.editAdvanced(collectionMode=key)
def sortUpdate(self, sort=None): def sortUpdate(self, sort=None):
@ -288,7 +288,7 @@ class Collection(
} }
key = sort_dict.get(sort) key = sort_dict.get(sort)
if key is None: if key is None:
raise BadRequest('Unknown sort dir: %s. Options: %s' % (sort, list(sort_dict))) raise BadRequest(f'Unknown sort dir: {sort}. Options: {list(sort_dict)}')
return self.editAdvanced(collectionSort=key) return self.editAdvanced(collectionSort=key)
def addItems(self, items): def addItems(self, items):
@ -310,16 +310,14 @@ class Collection(
ratingKeys = [] ratingKeys = []
for item in items: for item in items:
if item.type != self.subtype: # pragma: no cover if item.type != self.subtype: # pragma: no cover
raise BadRequest('Can not mix media types when building a collection: %s and %s' % raise BadRequest(f'Can not mix media types when building a collection: {self.subtype} and {item.type}')
(self.subtype, item.type))
ratingKeys.append(str(item.ratingKey)) ratingKeys.append(str(item.ratingKey))
ratingKeys = ','.join(ratingKeys) ratingKeys = ','.join(ratingKeys)
uri = '%s/library/metadata/%s' % (self._server._uriRoot(), ratingKeys) uri = f'{self._server._uriRoot()}/library/metadata/{ratingKeys}'
key = '%s/items%s' % (self.key, utils.joinArgs({ args = {'uri': uri}
'uri': uri key = f"{self.key}/items{utils.joinArgs(args)}"
}))
self._server.query(key, method=self._server._session.put) self._server.query(key, method=self._server._session.put)
return self return self
@ -340,7 +338,7 @@ class Collection(
items = [items] items = [items]
for item in items: for item in items:
key = '%s/items/%s' % (self.key, item.ratingKey) key = f'{self.key}/items/{item.ratingKey}'
self._server.query(key, method=self._server._session.delete) self._server.query(key, method=self._server._session.delete)
return self return self
@ -359,10 +357,10 @@ class Collection(
if self.smart: if self.smart:
raise BadRequest('Cannot move items in a smart collection.') raise BadRequest('Cannot move items in a smart collection.')
key = '%s/items/%s/move' % (self.key, item.ratingKey) key = f'{self.key}/items/{item.ratingKey}/move'
if after: if after:
key += '?after=%s' % after.ratingKey key += f'?after={after.ratingKey}'
self._server.query(key, method=self._server._session.put) self._server.query(key, method=self._server._session.put)
return self return self
@ -391,11 +389,10 @@ class Collection(
section = self.section() section = self.section()
searchKey = section._buildSearchKey( searchKey = section._buildSearchKey(
sort=sort, libtype=libtype, limit=limit, filters=filters, **kwargs) sort=sort, libtype=libtype, limit=limit, filters=filters, **kwargs)
uri = '%s%s' % (self._server._uriRoot(), searchKey) uri = f'{self._server._uriRoot()}{searchKey}'
key = '%s/items%s' % (self.key, utils.joinArgs({ args = {'uri': uri}
'uri': uri key = f"{self.key}/items{utils.joinArgs(args)}"
}))
self._server.query(key, method=self._server._session.put) self._server.query(key, method=self._server._session.put)
return self return self
@ -454,15 +451,10 @@ class Collection(
ratingKeys.append(str(item.ratingKey)) ratingKeys.append(str(item.ratingKey))
ratingKeys = ','.join(ratingKeys) ratingKeys = ','.join(ratingKeys)
uri = '%s/library/metadata/%s' % (server._uriRoot(), ratingKeys) uri = f'{server._uriRoot()}/library/metadata/{ratingKeys}'
key = '/library/collections%s' % utils.joinArgs({ args = {'uri': uri, 'type': utils.searchType(itemType), 'title': title, 'smart': 0, 'sectionId': section.key}
'uri': uri, key = f"/library/collections{utils.joinArgs(args)}"
'type': utils.searchType(itemType),
'title': title,
'smart': 0,
'sectionId': section.key
})
data = server.query(key, method=server._session.post)[0] data = server.query(key, method=server._session.post)[0]
return cls(server, data, initpath=key) return cls(server, data, initpath=key)
@ -476,15 +468,10 @@ class Collection(
searchKey = section._buildSearchKey( searchKey = section._buildSearchKey(
sort=sort, libtype=libtype, limit=limit, filters=filters, **kwargs) sort=sort, libtype=libtype, limit=limit, filters=filters, **kwargs)
uri = '%s%s' % (server._uriRoot(), searchKey) uri = f'{server._uriRoot()}{searchKey}'
key = '/library/collections%s' % utils.joinArgs({ args = {'uri': uri, 'type': utils.searchType(libtype), 'title': title, 'smart': 1, 'sectionId': section.key}
'uri': uri, key = f"/library/collections{utils.joinArgs(args)}"
'type': utils.searchType(libtype),
'title': title,
'smart': 1,
'sectionId': section.key
})
data = server.query(key, method=server._session.post)[0] data = server.query(key, method=server._session.post)[0]
return cls(server, data, initpath=key) return cls(server, data, initpath=key)

View file

@ -29,7 +29,7 @@ class PlexConfig(ConfigParser):
""" """
try: try:
# First: check environment variable is set # First: check environment variable is set
envkey = 'PLEXAPI_%s' % key.upper().replace('.', '_') envkey = f"PLEXAPI_{key.upper().replace('.', '_')}"
value = os.environ.get(envkey) value = os.environ.get(envkey)
if value is None: if value is None:
# Second: check the config file has attr # Second: check the config file has attr

View file

@ -65,7 +65,7 @@ class Library(PlexObject):
try: try:
return self._sectionsByTitle[normalized_title] return self._sectionsByTitle[normalized_title]
except KeyError: except KeyError:
raise NotFound('Invalid library section: %s' % title) from None raise NotFound(f'Invalid library section: {title}') from None
def sectionByID(self, sectionID): def sectionByID(self, sectionID):
""" Returns the :class:`~plexapi.library.LibrarySection` that matches the specified sectionID. """ Returns the :class:`~plexapi.library.LibrarySection` that matches the specified sectionID.
@ -81,7 +81,7 @@ class Library(PlexObject):
try: try:
return self._sectionsByID[sectionID] return self._sectionsByID[sectionID]
except KeyError: except KeyError:
raise NotFound('Invalid library sectionID: %s' % sectionID) from None raise NotFound(f'Invalid library sectionID: {sectionID}') from None
def hubs(self, sectionID=None, identifier=None, **kwargs): def hubs(self, sectionID=None, identifier=None, **kwargs):
""" Returns a list of :class:`~plexapi.library.Hub` across all library sections. """ Returns a list of :class:`~plexapi.library.Hub` across all library sections.
@ -102,7 +102,7 @@ class Library(PlexObject):
if not isinstance(identifier, list): if not isinstance(identifier, list):
identifier = [identifier] identifier = [identifier]
kwargs['identifier'] = ",".join(identifier) kwargs['identifier'] = ",".join(identifier)
key = '/hubs%s' % utils.joinArgs(kwargs) key = f'/hubs{utils.joinArgs(kwargs)}'
return self.fetchItems(key) return self.fetchItems(key)
def all(self, **kwargs): def all(self, **kwargs):
@ -139,7 +139,7 @@ class Library(PlexObject):
args['type'] = utils.searchType(libtype) args['type'] = utils.searchType(libtype)
for attr, value in kwargs.items(): for attr, value in kwargs.items():
args[attr] = value args[attr] = value
key = '/library/all%s' % utils.joinArgs(args) key = f'/library/all{utils.joinArgs(args)}'
return self.fetchItems(key) return self.fetchItems(key)
def cleanBundles(self): def cleanBundles(self):
@ -343,11 +343,11 @@ class Library(PlexObject):
locations = [] locations = []
for path in location: for path in location:
if not self._server.isBrowsable(path): if not self._server.isBrowsable(path):
raise BadRequest('Path: %s does not exist.' % path) raise BadRequest(f'Path: {path} does not exist.')
locations.append(('location', path)) locations.append(('location', path))
part = '/library/sections?name=%s&type=%s&agent=%s&scanner=%s&language=%s&%s' % ( part = (f'/library/sections?name={quote_plus(name)}&type={type}&agent={agent}'
quote_plus(name), type, agent, quote_plus(scanner), language, urlencode(locations, doseq=True)) # noqa E126 f'&scanner={quote_plus(scanner)}&language={language}&{urlencode(locations, doseq=True)}')
if kwargs: if kwargs:
part += urlencode(kwargs) part += urlencode(kwargs)
return self._server.query(part, method=self._server._session.post) return self._server.query(part, method=self._server._session.post)
@ -512,16 +512,16 @@ class LibrarySection(PlexObject):
args['clusterZoomLevel'] = 1 args['clusterZoomLevel'] = 1
else: else:
args['type'] = utils.searchType(libtype) args['type'] = utils.searchType(libtype)
part = '/library/sections/%s/all%s' % (self.key, utils.joinArgs(args)) part = f'/library/sections/{self.key}/all{utils.joinArgs(args)}'
data = self._server.query(part) data = self._server.query(part)
return utils.cast(int, data.attrib.get("totalSize")) return utils.cast(int, data.attrib.get("totalSize"))
def delete(self): def delete(self):
""" Delete a library section. """ """ Delete a library section. """
try: try:
return self._server.query('/library/sections/%s' % self.key, method=self._server._session.delete) return self._server.query(f'/library/sections/{self.key}', method=self._server._session.delete)
except BadRequest: # pragma: no cover except BadRequest: # pragma: no cover
msg = 'Failed to delete library %s' % self.key msg = f'Failed to delete library {self.key}'
msg += 'You may need to allow this permission in your Plex settings.' msg += 'You may need to allow this permission in your Plex settings.'
log.error(msg) log.error(msg)
raise raise
@ -549,12 +549,12 @@ class LibrarySection(PlexObject):
kwargs['location'] = [kwargs['location']] kwargs['location'] = [kwargs['location']]
for path in kwargs.pop('location'): for path in kwargs.pop('location'):
if not self._server.isBrowsable(path): if not self._server.isBrowsable(path):
raise BadRequest('Path: %s does not exist.' % path) raise BadRequest(f'Path: {path} does not exist.')
locations.append(('location', path)) locations.append(('location', path))
params = list(kwargs.items()) + locations params = list(kwargs.items()) + locations
part = '/library/sections/%s?agent=%s&%s' % (self.key, agent, urlencode(params, doseq=True)) part = f'/library/sections/{self.key}?agent={agent}&{urlencode(params, doseq=True)}'
self._server.query(part, method=self._server._session.put) self._server.query(part, method=self._server._session.put)
return self return self
@ -576,7 +576,7 @@ class LibrarySection(PlexObject):
location = [location] location = [location]
for path in location: for path in location:
if not self._server.isBrowsable(path): if not self._server.isBrowsable(path):
raise BadRequest('Path: %s does not exist.' % path) raise BadRequest(f'Path: {path} does not exist.')
locations.append(path) locations.append(path)
return self.edit(location=locations) return self.edit(location=locations)
@ -600,7 +600,7 @@ class LibrarySection(PlexObject):
if path in locations: if path in locations:
locations.remove(path) locations.remove(path)
else: else:
raise BadRequest('Path: %s does not exist in the library.' % location) raise BadRequest(f'Path: {location} does not exist in the library.')
if len(locations) == 0: if len(locations) == 0:
raise BadRequest('You are unable to remove all locations from a library.') raise BadRequest('You are unable to remove all locations from a library.')
return self.edit(location=locations) return self.edit(location=locations)
@ -662,7 +662,7 @@ class LibrarySection(PlexObject):
match = dummy.matches(agent=self.agent, title=guid.replace('://', '-')) match = dummy.matches(agent=self.agent, title=guid.replace('://', '-'))
return self.search(guid=match[0].guid)[0] return self.search(guid=match[0].guid)[0]
except IndexError: except IndexError:
raise NotFound("Guid '%s' is not found in the library" % guid) from None raise NotFound(f"Guid '{guid}' is not found in the library") from None
def all(self, libtype=None, **kwargs): def all(self, libtype=None, **kwargs):
""" Returns a list of all items from this library section. """ Returns a list of all items from this library section.
@ -674,7 +674,7 @@ class LibrarySection(PlexObject):
def folders(self): def folders(self):
""" Returns a list of available :class:`~plexapi.library.Folder` for this library section. """ Returns a list of available :class:`~plexapi.library.Folder` for this library section.
""" """
key = '/library/sections/%s/folder' % self.key key = f'/library/sections/{self.key}/folder'
return self.fetchItems(key, Folder) return self.fetchItems(key, Folder)
def managedHubs(self): def managedHubs(self):
@ -692,7 +692,7 @@ class LibrarySection(PlexObject):
def hubs(self): def hubs(self):
""" Returns a list of available :class:`~plexapi.library.Hub` for this library section. """ Returns a list of available :class:`~plexapi.library.Hub` for this library section.
""" """
key = '/hubs/sections/%s?includeStations=1' % self.key key = f'/hubs/sections/{self.key}?includeStations=1'
return self.fetchItems(key) return self.fetchItems(key)
def agents(self): def agents(self):
@ -702,7 +702,7 @@ class LibrarySection(PlexObject):
def settings(self): def settings(self):
""" Returns a list of all library settings. """ """ Returns a list of all library settings. """
key = '/library/sections/%s/prefs' % self.key key = f'/library/sections/{self.key}/prefs'
data = self._server.query(key) data = self._server.query(key)
return self.findItems(data, cls=Setting) return self.findItems(data, cls=Setting)
@ -722,11 +722,11 @@ class LibrarySection(PlexObject):
try: try:
enums = idEnums[settingID] enums = idEnums[settingID]
except KeyError: except KeyError:
raise NotFound('%s not found in %s' % (value, list(idEnums.keys()))) raise NotFound(f'{value} not found in {list(idEnums.keys())}')
if value in enums: if value in enums:
data[key % settingID] = value data[key % settingID] = value
else: else:
raise NotFound('%s not found in %s' % (value, enums)) raise NotFound(f'{value} not found in {enums}')
return self.edit(**data) return self.edit(**data)
@ -747,9 +747,9 @@ class LibrarySection(PlexObject):
libtype = libtype or self.TYPE libtype = libtype or self.TYPE
args = { args = {
'type': utils.searchType(libtype), 'type': utils.searchType(libtype),
'%s.locked' % field: int(locked) f'{field}.locked': int(locked)
} }
key = '/library/sections/%s/all%s' % (self.key, utils.joinArgs(args)) key = f'/library/sections/{self.key}/all{utils.joinArgs(args)}'
self._server.query(key, method=self._server._session.put) self._server.query(key, method=self._server._session.put)
return self return self
@ -775,13 +775,13 @@ class LibrarySection(PlexObject):
def timeline(self): def timeline(self):
""" Returns a timeline query for this library section. """ """ Returns a timeline query for this library section. """
key = '/library/sections/%s/timeline' % self.key key = f'/library/sections/{self.key}/timeline'
data = self._server.query(key) data = self._server.query(key)
return LibraryTimeline(self, data) return LibraryTimeline(self, data)
def onDeck(self): def onDeck(self):
""" Returns a list of media items on deck from this library section. """ """ Returns a list of media items on deck from this library section. """
key = '/library/sections/%s/onDeck' % self.key key = f'/library/sections/{self.key}/onDeck'
return self.fetchItems(key) return self.fetchItems(key)
def recentlyAdded(self, maxresults=50, libtype=None): def recentlyAdded(self, maxresults=50, libtype=None):
@ -796,20 +796,20 @@ class LibrarySection(PlexObject):
return self.search(sort='addedAt:desc', maxresults=maxresults, libtype=libtype) return self.search(sort='addedAt:desc', maxresults=maxresults, libtype=libtype)
def firstCharacter(self): def firstCharacter(self):
key = '/library/sections/%s/firstCharacter' % self.key key = f'/library/sections/{self.key}/firstCharacter'
return self.fetchItems(key, cls=FirstCharacter) return self.fetchItems(key, cls=FirstCharacter)
def analyze(self): def analyze(self):
""" Run an analysis on all of the items in this library section. See """ Run an analysis on all of the items in this library section. See
See :func:`~plexapi.base.PlexPartialObject.analyze` for more details. See :func:`~plexapi.base.PlexPartialObject.analyze` for more details.
""" """
key = '/library/sections/%s/analyze' % self.key key = f'/library/sections/{self.key}/analyze'
self._server.query(key, method=self._server._session.put) self._server.query(key, method=self._server._session.put)
return self return self
def emptyTrash(self): def emptyTrash(self):
""" If a section has items in the Trash, use this option to empty the Trash. """ """ If a section has items in the Trash, use this option to empty the Trash. """
key = '/library/sections/%s/emptyTrash' % self.key key = f'/library/sections/{self.key}/emptyTrash'
self._server.query(key, method=self._server._session.put) self._server.query(key, method=self._server._session.put)
return self return self
@ -819,15 +819,15 @@ class LibrarySection(PlexObject):
Parameters: Parameters:
path (str, optional): Full path to folder to scan. path (str, optional): Full path to folder to scan.
""" """
key = '/library/sections/%s/refresh' % self.key key = f'/library/sections/{self.key}/refresh'
if path is not None: if path is not None:
key += '?path=%s' % quote_plus(path) key += f'?path={quote_plus(path)}'
self._server.query(key) self._server.query(key)
return self return self
def cancelUpdate(self): def cancelUpdate(self):
""" Cancel update of this Library Section. """ """ Cancel update of this Library Section. """
key = '/library/sections/%s/refresh' % self.key key = f'/library/sections/{self.key}/refresh'
self._server.query(key, method=self._server._session.delete) self._server.query(key, method=self._server._session.delete)
return self return self
@ -835,7 +835,7 @@ class LibrarySection(PlexObject):
""" Forces a download of fresh media information from the internet. """ Forces a download of fresh media information from the internet.
This can take a long time. Any locked fields are not modified. This can take a long time. Any locked fields are not modified.
""" """
key = '/library/sections/%s/refresh?force=1' % self.key key = f'/library/sections/{self.key}/refresh?force=1'
self._server.query(key) self._server.query(key)
return self return self
@ -843,7 +843,7 @@ class LibrarySection(PlexObject):
""" Delete the preview thumbnails for items in this library. This cannot """ Delete the preview thumbnails for items in this library. This cannot
be undone. Recreating media preview files can take hours or even days. be undone. Recreating media preview files can take hours or even days.
""" """
key = '/library/sections/%s/indexes' % self.key key = f'/library/sections/{self.key}/indexes'
self._server.query(key, method=self._server._session.delete) self._server.query(key, method=self._server._session.delete)
return self return self
@ -885,9 +885,8 @@ class LibrarySection(PlexObject):
return next(f for f in self.filterTypes() if f.type == libtype) return next(f for f in self.filterTypes() if f.type == libtype)
except StopIteration: except StopIteration:
availableLibtypes = [f.type for f in self.filterTypes()] availableLibtypes = [f.type for f in self.filterTypes()]
raise NotFound('Unknown libtype "%s" for this library. ' raise NotFound(f'Unknown libtype "{libtype}" for this library. '
'Available libtypes: %s' f'Available libtypes: {availableLibtypes}') from None
% (libtype, availableLibtypes)) from None
def fieldTypes(self): def fieldTypes(self):
""" Returns a list of available :class:`~plexapi.library.FilteringFieldType` for this library section. """ """ Returns a list of available :class:`~plexapi.library.FilteringFieldType` for this library section. """
@ -909,9 +908,8 @@ class LibrarySection(PlexObject):
return next(f for f in self.fieldTypes() if f.type == fieldType) return next(f for f in self.fieldTypes() if f.type == fieldType)
except StopIteration: except StopIteration:
availableFieldTypes = [f.type for f in self.fieldTypes()] availableFieldTypes = [f.type for f in self.fieldTypes()]
raise NotFound('Unknown field type "%s" for this library. ' raise NotFound(f'Unknown field type "{fieldType}" for this library. '
'Available field types: %s' f'Available field types: {availableFieldTypes}') from None
% (fieldType, availableFieldTypes)) from None
def listFilters(self, libtype=None): def listFilters(self, libtype=None):
""" Returns a list of available :class:`~plexapi.library.FilteringFilter` for a specified libtype. """ Returns a list of available :class:`~plexapi.library.FilteringFilter` for a specified libtype.
@ -1019,16 +1017,15 @@ class LibrarySection(PlexObject):
if isinstance(field, str): if isinstance(field, str):
match = re.match(r'(?:([a-zA-Z]*)\.)?([a-zA-Z]+)', field) match = re.match(r'(?:([a-zA-Z]*)\.)?([a-zA-Z]+)', field)
if not match: if not match:
raise BadRequest('Invalid filter field: %s' % field) raise BadRequest(f'Invalid filter field: {field}')
_libtype, field = match.groups() _libtype, field = match.groups()
libtype = _libtype or libtype or self.TYPE libtype = _libtype or libtype or self.TYPE
try: try:
field = next(f for f in self.listFilters(libtype) if f.filter == field) field = next(f for f in self.listFilters(libtype) if f.filter == field)
except StopIteration: except StopIteration:
availableFilters = [f.filter for f in self.listFilters(libtype)] availableFilters = [f.filter for f in self.listFilters(libtype)]
raise NotFound('Unknown filter field "%s" for libtype "%s". ' raise NotFound(f'Unknown filter field "{field}" for libtype "{libtype}". '
'Available filters: %s' f'Available filters: {availableFilters}') from None
% (field, libtype, availableFilters)) from None
data = self._server.query(field.key) data = self._server.query(field.key)
return self.findItems(data, FilterChoice) return self.findItems(data, FilterChoice)
@ -1039,7 +1036,7 @@ class LibrarySection(PlexObject):
""" """
match = re.match(r'(?:([a-zA-Z]*)\.)?([a-zA-Z]+)([!<>=&]*)', field) match = re.match(r'(?:([a-zA-Z]*)\.)?([a-zA-Z]+)([!<>=&]*)', field)
if not match: if not match:
raise BadRequest('Invalid filter field: %s' % field) raise BadRequest(f'Invalid filter field: {field}')
_libtype, field, operator = match.groups() _libtype, field, operator = match.groups()
libtype = _libtype or libtype or self.TYPE libtype = _libtype or libtype or self.TYPE
@ -1053,9 +1050,8 @@ class LibrarySection(PlexObject):
break break
else: else:
availableFields = [f.key for f in self.listFields(libtype)] availableFields = [f.key for f in self.listFields(libtype)]
raise NotFound('Unknown filter field "%s" for libtype "%s". ' raise NotFound(f'Unknown filter field "{field}" for libtype "{libtype}". '
'Available filter fields: %s' f'Available filter fields: {availableFields}') from None
% (field, libtype, availableFields)) from None
field = filterField.key field = filterField.key
operator = self._validateFieldOperator(filterField, operator) operator = self._validateFieldOperator(filterField, operator)
@ -1086,9 +1082,8 @@ class LibrarySection(PlexObject):
next(o for o in fieldType.operators if o.key == operator) next(o for o in fieldType.operators if o.key == operator)
except StopIteration: except StopIteration:
availableOperators = [o.key for o in self.listOperators(filterField.type)] availableOperators = [o.key for o in self.listOperators(filterField.type)]
raise NotFound('Unknown operator "%s" for filter field "%s". ' raise NotFound(f'Unknown operator "{operator}" for filter field "{filterField.key}". '
'Available operators: %s' f'Available operators: {availableOperators}') from None
% (operator, filterField.key, availableOperators)) from None
return '&=' if and_operator else operator return '&=' if and_operator else operator
@ -1116,8 +1111,8 @@ class LibrarySection(PlexObject):
value = self._validateFieldValueTag(value, filterField, libtype) value = self._validateFieldValueTag(value, filterField, libtype)
results.append(str(value)) results.append(str(value))
except (ValueError, AttributeError): except (ValueError, AttributeError):
raise BadRequest('Invalid value "%s" for filter field "%s", value should be type %s' raise BadRequest(f'Invalid value "{value}" for filter field "{filterField.key}", '
% (value, filterField.key, fieldType.type)) from None f'value should be type {fieldType.type}') from None
return results return results
@ -1170,11 +1165,11 @@ class LibrarySection(PlexObject):
Returns the validated sort field string. Returns the validated sort field string.
""" """
if isinstance(sort, FilteringSort): if isinstance(sort, FilteringSort):
return '%s.%s:%s' % (libtype or self.TYPE, sort.key, sort.defaultDirection) return f'{libtype or self.TYPE}.{sort.key}:{sort.defaultDirection}'
match = re.match(r'(?:([a-zA-Z]*)\.)?([a-zA-Z]+):?([a-zA-Z]*)', sort.strip()) match = re.match(r'(?:([a-zA-Z]*)\.)?([a-zA-Z]+):?([a-zA-Z]*)', sort.strip())
if not match: if not match:
raise BadRequest('Invalid filter sort: %s' % sort) raise BadRequest(f'Invalid filter sort: {sort}')
_libtype, sortField, sortDir = match.groups() _libtype, sortField, sortDir = match.groups()
libtype = _libtype or libtype or self.TYPE libtype = _libtype or libtype or self.TYPE
@ -1182,19 +1177,16 @@ class LibrarySection(PlexObject):
filterSort = next(f for f in self.listSorts(libtype) if f.key == sortField) filterSort = next(f for f in self.listSorts(libtype) if f.key == sortField)
except StopIteration: except StopIteration:
availableSorts = [f.key for f in self.listSorts(libtype)] availableSorts = [f.key for f in self.listSorts(libtype)]
raise NotFound('Unknown sort field "%s" for libtype "%s". ' raise NotFound(f'Unknown sort field "{sortField}" for libtype "{libtype}". '
'Available sort fields: %s' f'Available sort fields: {availableSorts}') from None
% (sortField, libtype, availableSorts)) from None
sortField = libtype + '.' + filterSort.key sortField = libtype + '.' + filterSort.key
availableDirections = ['', 'asc', 'desc', 'nullsLast'] availableDirections = ['', 'asc', 'desc', 'nullsLast']
if sortDir not in availableDirections: if sortDir not in availableDirections:
raise NotFound('Unknown sort direction "%s". ' raise NotFound(f'Unknown sort direction "{sortDir}". Available sort directions: {availableDirections}')
'Available sort directions: %s'
% (sortDir, availableDirections))
return '%s:%s' % (sortField, sortDir) if sortDir else sortField return f'{sortField}:{sortDir}' if sortDir else sortField
def _validateAdvancedSearch(self, filters, libtype): def _validateAdvancedSearch(self, filters, libtype):
""" Validates an advanced search filter dictionary. """ Validates an advanced search filter dictionary.
@ -1216,7 +1208,7 @@ class LibrarySection(PlexObject):
for value in values: for value in values:
validatedFilters.extend(self._validateAdvancedSearch(value, libtype)) validatedFilters.extend(self._validateAdvancedSearch(value, libtype))
validatedFilters.append('%s=1' % field.lower()) validatedFilters.append(f'{field.lower()}=1')
del validatedFilters[-1] del validatedFilters[-1]
validatedFilters.append('pop=1') validatedFilters.append('pop=1')
@ -1255,7 +1247,7 @@ class LibrarySection(PlexObject):
joined_args = utils.joinArgs(args).lstrip('?') joined_args = utils.joinArgs(args).lstrip('?')
joined_filter_args = '&'.join(filter_args) if filter_args else '' joined_filter_args = '&'.join(filter_args) if filter_args else ''
params = '&'.join([joined_args, joined_filter_args]).strip('&') params = '&'.join([joined_args, joined_filter_args]).strip('&')
key = '/library/sections/%s/all?%s' % (self.key, params) key = f'/library/sections/{self.key}/all?{params}'
if returnKwargs: if returnKwargs:
return key, kwargs return key, kwargs
@ -1631,7 +1623,7 @@ class LibrarySection(PlexObject):
key = self._buildSearchKey(title=title, sort=sort, libtype=libtype, **kwargs) key = self._buildSearchKey(title=title, sort=sort, libtype=libtype, **kwargs)
sync_item.location = 'library://%s/directory/%s' % (self.uuid, quote_plus(key)) sync_item.location = f'library://{self.uuid}/directory/{quote_plus(key)}'
sync_item.policy = policy sync_item.policy = policy
sync_item.mediaSettings = mediaSettings sync_item.mediaSettings = mediaSettings
@ -1666,7 +1658,7 @@ class LibrarySection(PlexObject):
try: try:
return self.collections(title=title, title__iexact=title)[0] return self.collections(title=title, title__iexact=title)[0]
except IndexError: except IndexError:
raise NotFound('Unable to find collection with title "%s".' % title) from None raise NotFound(f'Unable to find collection with title "{title}".') from None
def collections(self, **kwargs): def collections(self, **kwargs):
""" Returns a list of collections from this library section. """ Returns a list of collections from this library section.
@ -1695,7 +1687,7 @@ class LibrarySection(PlexObject):
try: try:
return self.playlists(title=title, title__iexact=title)[0] return self.playlists(title=title, title__iexact=title)[0]
except IndexError: except IndexError:
raise NotFound('Unable to find playlist with title "%s".' % title) from None raise NotFound(f'Unable to find playlist with title "{title}".') from None
def playlists(self, sort=None, **kwargs): def playlists(self, sort=None, **kwargs):
""" Returns a list of playlists from this library section. """ """ Returns a list of playlists from this library section. """
@ -1886,7 +1878,7 @@ class MusicSection(LibrarySection):
def albums(self): def albums(self):
""" Returns a list of :class:`~plexapi.audio.Album` objects in this section. """ """ Returns a list of :class:`~plexapi.audio.Album` objects in this section. """
key = '/library/sections/%s/albums' % self.key key = f'/library/sections/{self.key}/albums'
return self.fetchItems(key) return self.fetchItems(key)
def stations(self): def stations(self):
@ -2195,7 +2187,7 @@ class LibraryMediaTag(PlexObject):
def items(self, *args, **kwargs): def items(self, *args, **kwargs):
""" Return the list of items within this tag. """ """ Return the list of items within this tag. """
if not self.key: if not self.key:
raise BadRequest('Key is not defined for this tag: %s' % self.tag) raise BadRequest(f'Key is not defined for this tag: {self.tag}')
return self.fetchItems(self.key) return self.fetchItems(self.key)
@ -2568,7 +2560,7 @@ class FilteringType(PlexObject):
def __repr__(self): def __repr__(self):
_type = self._clean(self.firstAttr('type')) _type = self._clean(self.firstAttr('type'))
return '<%s>' % ':'.join([p for p in [self.__class__.__name__, _type] if p]) return f"<{':'.join([p for p in [self.__class__.__name__, _type] if p])}>"
def _loadData(self, data): def _loadData(self, data):
self._data = data self._data = data
@ -2619,12 +2611,13 @@ class FilteringType(PlexObject):
manualFilters = [] manualFilters = []
for filterTag, filterType, filterTitle in additionalFilters: for filterTag, filterType, filterTitle in additionalFilters:
filterKey = '/library/sections/%s/%s?type=%s' % ( filterKey = f'/library/sections/{self._librarySectionID}/{filterTag}?type={utils.searchType(self.type)}'
self._librarySectionID, filterTag, utils.searchType(self.type)
)
filterXML = ( filterXML = (
'<Filter filter="%s" filterType="%s" key="%s" title="%s" type="filter" />' f'<Filter filter="{filterTag}" '
% (filterTag, filterType, filterKey, filterTitle) f'filterType="{filterType}" '
f'key="{filterKey}" '
f'title="{filterTitle}" '
f'type="filter" />'
) )
manualFilters.append(self._manuallyLoadXML(filterXML, FilteringFilter)) manualFilters.append(self._manuallyLoadXML(filterXML, FilteringFilter))
@ -2638,7 +2631,7 @@ class FilteringType(PlexObject):
additionalSorts = [ additionalSorts = [
('guid', 'asc', 'Guid'), ('guid', 'asc', 'Guid'),
('id', 'asc', 'Rating Key'), ('id', 'asc', 'Rating Key'),
('index', 'asc', '%s Number' % self.type.capitalize()), ('index', 'asc', f'{self.type.capitalize()} Number'),
('summary', 'asc', 'Summary'), ('summary', 'asc', 'Summary'),
('tagline', 'asc', 'Tagline'), ('tagline', 'asc', 'Tagline'),
('updatedAt', 'asc', 'Date Updated') ('updatedAt', 'asc', 'Date Updated')
@ -2665,8 +2658,10 @@ class FilteringType(PlexObject):
manualSorts = [] manualSorts = []
for sortField, sortDir, sortTitle in additionalSorts: for sortField, sortDir, sortTitle in additionalSorts:
sortXML = ( sortXML = (
'<Sort defaultDirection="%s" descKey="%s:desc" key="%s" title="%s" />' f'<Sort defaultDirection="{sortDir}" '
% (sortDir, sortField, sortField, sortTitle) f'descKey="{sortField}:desc" '
f'key="{sortField}" '
f'title="{sortTitle}" />'
) )
manualSorts.append(self._manuallyLoadXML(sortXML, FilteringSort)) manualSorts.append(self._manuallyLoadXML(sortXML, FilteringSort))
@ -2680,8 +2675,8 @@ class FilteringType(PlexObject):
additionalFields = [ additionalFields = [
('guid', 'string', 'Guid'), ('guid', 'string', 'Guid'),
('id', 'integer', 'Rating Key'), ('id', 'integer', 'Rating Key'),
('index', 'integer', '%s Number' % self.type.capitalize()), ('index', 'integer', f'{self.type.capitalize()} Number'),
('lastRatedAt', 'date', '%s Last Rated' % self.type.capitalize()), ('lastRatedAt', 'date', f'{self.type.capitalize()} Last Rated'),
('updatedAt', 'date', 'Date Updated') ('updatedAt', 'date', 'Date Updated')
] ]
@ -2734,8 +2729,9 @@ class FilteringType(PlexObject):
manualFields = [] manualFields = []
for field, fieldType, fieldTitle in additionalFields: for field, fieldType, fieldTitle in additionalFields:
fieldXML = ( fieldXML = (
'<Field key="%s%s" title="%s" type="%s"/>' f'<Field key="{prefix}{field}" '
% (prefix, field, fieldTitle, fieldType) f'title="{fieldTitle}" '
f'type="{fieldType}"/>'
) )
manualFields.append(self._manuallyLoadXML(fieldXML, FilteringField)) manualFields.append(self._manuallyLoadXML(fieldXML, FilteringField))
@ -2826,7 +2822,7 @@ class FilteringFieldType(PlexObject):
def __repr__(self): def __repr__(self):
_type = self._clean(self.firstAttr('type')) _type = self._clean(self.firstAttr('type'))
return '<%s>' % ':'.join([p for p in [self.__class__.__name__, _type] if p]) return f"<{':'.join([p for p in [self.__class__.__name__, _type] if p])}>"
def _loadData(self, data): def _loadData(self, data):
""" Load attribute values from Plex XML response. """ """ Load attribute values from Plex XML response. """

View file

@ -91,12 +91,11 @@ class Media(PlexObject):
return self.proxyType == utils.SEARCHTYPES['optimizedVersion'] return self.proxyType == utils.SEARCHTYPES['optimizedVersion']
def delete(self): def delete(self):
part = '%s/media/%s' % (self._parentKey, self.id) part = f'{self._parentKey}/media/{self.id}'
try: try:
return self._server.query(part, method=self._server._session.delete) return self._server.query(part, method=self._server._session.delete)
except BadRequest: except BadRequest:
log.error("Failed to delete %s. This could be because you haven't allowed " log.error(f"Failed to delete {part}. This could be because you haven't allowed items to be deleted")
"items to be deleted" % part)
raise raise
@ -192,9 +191,9 @@ class MediaPart(PlexObject):
stream (:class:`~plexapi.media.AudioStream`): AudioStream to set as default stream (:class:`~plexapi.media.AudioStream`): AudioStream to set as default
""" """
if isinstance(stream, AudioStream): if isinstance(stream, AudioStream):
key = "/library/parts/%d?audioStreamID=%d&allParts=1" % (self.id, stream.id) key = f"/library/parts/{self.id}?audioStreamID={stream.id}&allParts=1"
else: else:
key = "/library/parts/%d?audioStreamID=%d&allParts=1" % (self.id, stream) key = f"/library/parts/{self.id}?audioStreamID={stream}&allParts=1"
self._server.query(key, method=self._server._session.put) self._server.query(key, method=self._server._session.put)
return self return self
@ -205,15 +204,15 @@ class MediaPart(PlexObject):
stream (:class:`~plexapi.media.SubtitleStream`): SubtitleStream to set as default. stream (:class:`~plexapi.media.SubtitleStream`): SubtitleStream to set as default.
""" """
if isinstance(stream, SubtitleStream): if isinstance(stream, SubtitleStream):
key = "/library/parts/%d?subtitleStreamID=%d&allParts=1" % (self.id, stream.id) key = f"/library/parts/{self.id}?subtitleStreamID={stream.id}&allParts=1"
else: else:
key = "/library/parts/%d?subtitleStreamID=%d&allParts=1" % (self.id, stream) key = f"/library/parts/{self.id}?subtitleStreamID={stream}&allParts=1"
self._server.query(key, method=self._server._session.put) self._server.query(key, method=self._server._session.put)
return self return self
def resetDefaultSubtitleStream(self): def resetDefaultSubtitleStream(self):
""" Set default subtitle of this MediaPart to 'none'. """ """ Set default subtitle of this MediaPart to 'none'. """
key = "/library/parts/%d?subtitleStreamID=0&allParts=1" % (self.id) key = f"/library/parts/{self.id}?subtitleStreamID=0&allParts=1"
self._server.query(key, method=self._server._session.put) self._server.query(key, method=self._server._session.put)
return self return self
@ -581,22 +580,22 @@ class Optimized(PlexObject):
""" Returns a list of all :class:`~plexapi.media.Video` objects """ Returns a list of all :class:`~plexapi.media.Video` objects
in this optimized item. in this optimized item.
""" """
key = '%s/%s/items' % (self._initpath, self.id) key = f'{self._initpath}/{self.id}/items'
return self.fetchItems(key) return self.fetchItems(key)
def remove(self): def remove(self):
""" Remove an Optimized item""" """ Remove an Optimized item"""
key = '%s/%s' % (self._initpath, self.id) key = f'{self._initpath}/{self.id}'
self._server.query(key, method=self._server._session.delete) self._server.query(key, method=self._server._session.delete)
def rename(self, title): def rename(self, title):
""" Rename an Optimized item""" """ Rename an Optimized item"""
key = '%s/%s?Item[title]=%s' % (self._initpath, self.id, title) key = f'{self._initpath}/{self.id}?Item[title]={title}'
self._server.query(key, method=self._server._session.put) self._server.query(key, method=self._server._session.put)
def reprocess(self, ratingKey): def reprocess(self, ratingKey):
""" Reprocess a removed Conversion item that is still a listed Optimize item""" """ Reprocess a removed Conversion item that is still a listed Optimize item"""
key = '%s/%s/%s/enable' % (self._initpath, self.id, ratingKey) key = f'{self._initpath}/{self.id}/{ratingKey}/enable'
self._server.query(key, method=self._server._session.put) self._server.query(key, method=self._server._session.put)
@ -642,7 +641,7 @@ class Conversion(PlexObject):
def remove(self): def remove(self):
""" Remove Conversion from queue """ """ Remove Conversion from queue """
key = '/playlists/%s/items/%s/%s/disable' % (self.playlistID, self.generatorID, self.ratingKey) key = f'/playlists/{self.playlistID}/items/{self.generatorID}/{self.ratingKey}/disable'
self._server.query(key, method=self._server._session.put) self._server.query(key, method=self._server._session.put)
def move(self, after): def move(self, after):
@ -657,7 +656,7 @@ class Conversion(PlexObject):
conversions[3].move(conversions[1].playQueueItemID) conversions[3].move(conversions[1].playQueueItemID)
""" """
key = '%s/items/%s/move?after=%s' % (self._initpath, self.playQueueItemID, after) key = f'{self._initpath}/items/{self.playQueueItemID}/move?after={after}'
self._server.query(key, method=self._server._session.put) self._server.query(key, method=self._server._session.put)
@ -697,14 +696,12 @@ class MediaTag(PlexObject):
self._parentType = parent.TYPE self._parentType = parent.TYPE
if self._librarySectionKey and self.filter: if self._librarySectionKey and self.filter:
self.key = '%s/all?%s&type=%s' % ( self.key = f'{self._librarySectionKey}/all?{self.filter}&type={utils.searchType(self._parentType)}'
self._librarySectionKey, self.filter, utils.searchType(self._parentType))
def items(self): def items(self):
""" Return the list of items within this tag. """ """ Return the list of items within this tag. """
if not self.key: if not self.key:
raise BadRequest('Key is not defined for this tag: %s. ' raise BadRequest(f'Key is not defined for this tag: {self.tag}. Reload the parent object.')
'Reload the parent object.' % self.tag)
return self.fetchItems(self.key) return self.fetchItems(self.key)
@ -722,7 +719,7 @@ class Collection(MediaTag):
def collection(self): def collection(self):
""" Return the :class:`~plexapi.collection.Collection` object for this collection tag. """ Return the :class:`~plexapi.collection.Collection` object for this collection tag.
""" """
key = '%s/collections' % self._librarySectionKey key = f'{self._librarySectionKey}/collections'
return self.fetchItem(key, etag='Directory', index=self.id) return self.fetchItem(key, etag='Directory', index=self.id)
@ -953,7 +950,7 @@ class BaseResource(PlexObject):
def select(self): def select(self):
key = self._initpath[:-1] key = self._initpath[:-1]
data = '%s?url=%s' % (key, quote_plus(self.ratingKey)) data = f'{key}?url={quote_plus(self.ratingKey)}'
try: try:
self._server.query(data, method=self._server._session.put) self._server.query(data, method=self._server._session.put)
except xml.etree.ElementTree.ParseError: except xml.etree.ElementTree.ParseError:
@ -1013,8 +1010,8 @@ class Marker(PlexObject):
name = self._clean(self.firstAttr('type')) name = self._clean(self.firstAttr('type'))
start = utils.millisecondToHumanstr(self._clean(self.firstAttr('start'))) start = utils.millisecondToHumanstr(self._clean(self.firstAttr('start')))
end = utils.millisecondToHumanstr(self._clean(self.firstAttr('end'))) end = utils.millisecondToHumanstr(self._clean(self.firstAttr('end')))
offsets = '%s-%s' % (start, end) offsets = f'{start}-{end}'
return '<%s>' % ':'.join([self.__class__.__name__, name, offsets]) return f"<{':'.join([self.__class__.__name__, name, offsets])}>"
def _loadData(self, data): def _loadData(self, data):
self._data = data self._data = data
@ -1051,7 +1048,7 @@ class SearchResult(PlexObject):
def __repr__(self): def __repr__(self):
name = self._clean(self.firstAttr('name')) name = self._clean(self.firstAttr('name'))
score = self._clean(self.firstAttr('score')) score = self._clean(self.firstAttr('score'))
return '<%s>' % ':'.join([p for p in [self.__class__.__name__, name, score] if p]) return f"<{':'.join([p for p in [self.__class__.__name__, name, score] if p])}>"
def _loadData(self, data): def _loadData(self, data):
self._data = data self._data = data
@ -1073,7 +1070,7 @@ class Agent(PlexObject):
def __repr__(self): def __repr__(self):
uid = self._clean(self.firstAttr('shortIdentifier')) uid = self._clean(self.firstAttr('shortIdentifier'))
return '<%s>' % ':'.join([p for p in [self.__class__.__name__, uid] if p]) return f"<{':'.join([p for p in [self.__class__.__name__, uid] if p])}>"
def _loadData(self, data): def _loadData(self, data):
self._data = data self._data = data
@ -1097,7 +1094,7 @@ class Agent(PlexObject):
return self.languageCodes return self.languageCodes
def settings(self): def settings(self):
key = '/:/plugins/%s/prefs' % self.identifier key = f'/:/plugins/{self.identifier}/prefs'
data = self._server.query(key) data = self._server.query(key)
return self.findItems(data, cls=settings.Setting) return self.findItems(data, cls=settings.Setting)
@ -1116,7 +1113,7 @@ class AgentMediaType(Agent):
def __repr__(self): def __repr__(self):
uid = self._clean(self.firstAttr('name')) uid = self._clean(self.firstAttr('name'))
return '<%s>' % ':'.join([p for p in [self.__class__.__name__, uid] if p]) return f"<{':'.join([p for p in [self.__class__.__name__, uid] if p])}>"
def _loadData(self, data): def _loadData(self, data):
self.languageCodes = self.listAttrs(data, 'code', etag='Language') self.languageCodes = self.listAttrs(data, 'code', etag='Language')

View file

@ -27,26 +27,25 @@ class AdvancedSettingsMixin:
return next(p for p in prefs if p.id == pref) return next(p for p in prefs if p.id == pref)
except StopIteration: except StopIteration:
availablePrefs = [p.id for p in prefs] availablePrefs = [p.id for p in prefs]
raise NotFound('Unknown preference "%s" for %s. ' raise NotFound(f'Unknown preference "{pref}" for {self.TYPE}. '
'Available preferences: %s' f'Available preferences: {availablePrefs}') from None
% (pref, self.TYPE, availablePrefs)) from None
def editAdvanced(self, **kwargs): def editAdvanced(self, **kwargs):
""" Edit a Plex object's advanced settings. """ """ Edit a Plex object's advanced settings. """
data = {} data = {}
key = '%s/prefs?' % self.key key = f'{self.key}/prefs?'
preferences = {pref.id: pref for pref in self.preferences() if pref.enumValues} preferences = {pref.id: pref for pref in self.preferences() if pref.enumValues}
for settingID, value in kwargs.items(): for settingID, value in kwargs.items():
try: try:
pref = preferences[settingID] pref = preferences[settingID]
except KeyError: except KeyError:
raise NotFound('%s not found in %s' % (value, list(preferences.keys()))) raise NotFound(f'{value} not found in {list(preferences.keys())}')
enumValues = pref.enumValues enumValues = pref.enumValues
if enumValues.get(value, enumValues.get(str(value))): if enumValues.get(value, enumValues.get(str(value))):
data[settingID] = value data[settingID] = value
else: else:
raise NotFound('%s not found in %s' % (value, list(enumValues))) raise NotFound(f'{value} not found in {list(enumValues)}')
url = key + urlencode(data) url = key + urlencode(data)
self._server.query(url, method=self._server._session.put) self._server.query(url, method=self._server._session.put)
return self return self
@ -54,7 +53,7 @@ class AdvancedSettingsMixin:
def defaultAdvanced(self): def defaultAdvanced(self):
""" Edit all of a Plex object's advanced settings to default. """ """ Edit all of a Plex object's advanced settings to default. """
data = {} data = {}
key = '%s/prefs?' % self.key key = f'{self.key}/prefs?'
for preference in self.preferences(): for preference in self.preferences():
data[preference.id] = preference.default data[preference.id] = preference.default
url = key + urlencode(data) url = key + urlencode(data)
@ -140,7 +139,7 @@ class SplitMergeMixin:
if not isinstance(ratingKeys, list): if not isinstance(ratingKeys, list):
ratingKeys = str(ratingKeys).split(',') ratingKeys = str(ratingKeys).split(',')
key = '%s/merge?ids=%s' % (self.key, ','.join([str(r) for r in ratingKeys])) key = f"{self.key}/merge?ids={','.join([str(r) for r in ratingKeys])}"
self._server.query(key, method=self._server._session.put) self._server.query(key, method=self._server._session.put)
return self return self
@ -226,7 +225,7 @@ class UnmatchMatchMixin:
if autoMatch: if autoMatch:
searchResult = autoMatch[0] searchResult = autoMatch[0]
else: else:
raise NotFound('No matches found using this agent: (%s:%s)' % (agent, autoMatch)) raise NotFound(f'No matches found using this agent: ({agent}:{autoMatch})')
elif not searchResult: elif not searchResult:
raise NotFound('fixMatch() requires either auto=True or ' raise NotFound('fixMatch() requires either auto=True or '
'searchResult=:class:`~plexapi.media.SearchResult`.') 'searchResult=:class:`~plexapi.media.SearchResult`.')
@ -315,7 +314,7 @@ class RatingMixin:
rating = -1 rating = -1
elif not isinstance(rating, (int, float)) or rating < 0 or rating > 10: elif not isinstance(rating, (int, float)) or rating < 0 or rating > 10:
raise BadRequest('Rating must be between 0 to 10.') raise BadRequest('Rating must be between 0 to 10.')
key = '/:/rate?key=%s&identifier=com.plexapp.plugins.library&rating=%s' % (self.ratingKey, rating) key = f'/:/rate?key={self.ratingKey}&identifier=com.plexapp.plugins.library&rating={rating}'
self._server.query(key, method=self._server._session.put) self._server.query(key, method=self._server._session.put)
return self return self

View file

@ -153,7 +153,7 @@ class MyPlexAccount(PlexObject):
for device in self.devices(): for device in self.devices():
if (name and device.name.lower() == name.lower() or device.clientIdentifier == clientId): if (name and device.name.lower() == name.lower() or device.clientIdentifier == clientId):
return device return device
raise NotFound('Unable to find device %s' % name) raise NotFound(f'Unable to find device {name}')
def devices(self): def devices(self):
""" Returns a list of all :class:`~plexapi.myplex.MyPlexDevice` objects connected to the server. """ """ Returns a list of all :class:`~plexapi.myplex.MyPlexDevice` objects connected to the server. """
@ -177,7 +177,7 @@ class MyPlexAccount(PlexObject):
if response.status_code not in (200, 201, 204): # pragma: no cover if response.status_code not in (200, 201, 204): # pragma: no cover
codename = codes.get(response.status_code)[0] codename = codes.get(response.status_code)[0]
errtext = response.text.replace('\n', ' ') errtext = response.text.replace('\n', ' ')
message = '(%s) %s; %s %s' % (response.status_code, codename, response.url, errtext) message = f'({response.status_code}) {codename}; {response.url} {errtext}'
if response.status_code == 401: if response.status_code == 401:
raise Unauthorized(message) raise Unauthorized(message)
elif response.status_code == 404: elif response.status_code == 404:
@ -198,7 +198,7 @@ class MyPlexAccount(PlexObject):
for resource in self.resources(): for resource in self.resources():
if resource.name.lower() == name.lower(): if resource.name.lower() == name.lower():
return resource return resource
raise NotFound('Unable to find resource %s' % name) raise NotFound(f'Unable to find resource {name}')
def resources(self): def resources(self):
""" Returns a list of all :class:`~plexapi.myplex.MyPlexResource` objects connected to the server. """ """ Returns a list of all :class:`~plexapi.myplex.MyPlexResource` objects connected to the server. """
@ -424,7 +424,7 @@ class MyPlexAccount(PlexObject):
'home': int(invite.home), 'home': int(invite.home),
'server': int(invite.server) 'server': int(invite.server)
} }
url = MyPlexInvite.REQUESTS + '/%s' % invite.id + utils.joinArgs(params) url = MyPlexInvite.REQUESTS + f'/{invite.id}' + utils.joinArgs(params)
return self.query(url, self._session.put) return self.query(url, self._session.put)
def cancelInvite(self, user): def cancelInvite(self, user):
@ -440,7 +440,7 @@ class MyPlexAccount(PlexObject):
'home': int(invite.home), 'home': int(invite.home),
'server': int(invite.server) 'server': int(invite.server)
} }
url = MyPlexInvite.REQUESTED + '/%s' % invite.id + utils.joinArgs(params) url = MyPlexInvite.REQUESTED + f'/{invite.id}' + utils.joinArgs(params)
return self.query(url, self._session.delete) return self.query(url, self._session.delete)
def updateFriend(self, user, server, sections=None, removeSections=False, allowSync=None, allowCameraUpload=None, def updateFriend(self, user, server, sections=None, removeSections=False, allowSync=None, allowCameraUpload=None,
@ -528,7 +528,7 @@ class MyPlexAccount(PlexObject):
(user.username.lower(), user.email.lower(), str(user.id))): (user.username.lower(), user.email.lower(), str(user.id))):
return user return user
raise NotFound('Unable to find user %s' % username) raise NotFound(f'Unable to find user {username}')
def users(self): def users(self):
""" Returns a list of all :class:`~plexapi.myplex.MyPlexUser` objects connected to your account. """ Returns a list of all :class:`~plexapi.myplex.MyPlexUser` objects connected to your account.
@ -551,7 +551,7 @@ class MyPlexAccount(PlexObject):
(invite.username.lower(), invite.email.lower(), str(invite.id))): (invite.username.lower(), invite.email.lower(), str(invite.id))):
return invite return invite
raise NotFound('Unable to find invite %s' % username) raise NotFound(f'Unable to find invite {username}')
def pendingInvites(self, includeSent=True, includeReceived=True): def pendingInvites(self, includeSent=True, includeReceived=True):
""" Returns a list of all :class:`~plexapi.myplex.MyPlexInvite` objects connected to your account. """ Returns a list of all :class:`~plexapi.myplex.MyPlexInvite` objects connected to your account.
@ -576,7 +576,7 @@ class MyPlexAccount(PlexObject):
# Get a list of all section ids for looking up each section. # Get a list of all section ids for looking up each section.
allSectionIds = {} allSectionIds = {}
machineIdentifier = server.machineIdentifier if isinstance(server, PlexServer) else server machineIdentifier = server.machineIdentifier if isinstance(server, PlexServer) else server
url = self.PLEXSERVERS.replace('{machineId}', machineIdentifier) url = self.PLEXSERVERS.format(machineId=machineIdentifier)
data = self.query(url, self._session.get) data = self.query(url, self._session.get)
for elem in data[0]: for elem in data[0]:
_id = utils.cast(int, elem.attrib.get('id')) _id = utils.cast(int, elem.attrib.get('id'))
@ -599,7 +599,7 @@ class MyPlexAccount(PlexObject):
for key, vals in filterDict.items(): for key, vals in filterDict.items():
if key not in ('contentRating', 'label', 'contentRating!', 'label!'): if key not in ('contentRating', 'label', 'contentRating!', 'label!'):
raise BadRequest('Unknown filter key: %s', key) raise BadRequest('Unknown filter key: %s', key)
values.append('%s=%s' % (key, '%2C'.join(vals))) values.append(f"{key}={'%2C'.join(vals)}")
return '|'.join(values) return '|'.join(values)
def addWebhook(self, url): def addWebhook(self, url):
@ -610,12 +610,12 @@ class MyPlexAccount(PlexObject):
def deleteWebhook(self, url): def deleteWebhook(self, url):
urls = copy.copy(self._webhooks) urls = copy.copy(self._webhooks)
if url not in urls: if url not in urls:
raise BadRequest('Webhook does not exist: %s' % url) raise BadRequest(f'Webhook does not exist: {url}')
urls.remove(url) urls.remove(url)
return self.setWebhooks(urls) return self.setWebhooks(urls)
def setWebhooks(self, urls): def setWebhooks(self, urls):
log.info('Setting webhooks: %s' % urls) log.info(f'Setting webhooks: {urls}')
data = {'urls[]': urls} if len(urls) else {'urls': ''} data = {'urls[]': urls} if len(urls) else {'urls': ''}
data = self.query(self.WEBHOOKS, self._session.post, data=data) data = self.query(self.WEBHOOKS, self._session.post, data=data)
self._webhooks = self.listAttrs(data, 'url', etag='webhook') self._webhooks = self.listAttrs(data, 'url', etag='webhook')
@ -725,7 +725,7 @@ class MyPlexAccount(PlexObject):
if response.status_code not in (200, 201, 204): # pragma: no cover if response.status_code not in (200, 201, 204): # pragma: no cover
codename = codes.get(response.status_code)[0] codename = codes.get(response.status_code)[0]
errtext = response.text.replace('\n', ' ') errtext = response.text.replace('\n', ' ')
raise BadRequest('(%s) %s %s; %s' % (response.status_code, codename, response.url, errtext)) raise BadRequest(f'({response.status_code}) {codename} {response.url}; {errtext}')
return response.json()['token'] return response.json()['token']
def history(self, maxresults=9999999, mindate=None): def history(self, maxresults=9999999, mindate=None):
@ -839,7 +839,7 @@ class MyPlexAccount(PlexObject):
for item in items: for item in items:
if self.onWatchlist(item): if self.onWatchlist(item):
raise BadRequest('"%s" is already on the watchlist' % item.title) raise BadRequest(f'"{item.title}" is already on the watchlist')
ratingKey = item.guid.rsplit('/', 1)[-1] ratingKey = item.guid.rsplit('/', 1)[-1]
self.query(f'{self.METADATA}/actions/addToWatchlist?ratingKey={ratingKey}', method=self._session.put) self.query(f'{self.METADATA}/actions/addToWatchlist?ratingKey={ratingKey}', method=self._session.put)
return self return self
@ -860,7 +860,7 @@ class MyPlexAccount(PlexObject):
for item in items: for item in items:
if not self.onWatchlist(item): if not self.onWatchlist(item):
raise BadRequest('"%s" is not on the watchlist' % item.title) raise BadRequest(f'"{item.title}" is not on the watchlist')
ratingKey = item.guid.rsplit('/', 1)[-1] ratingKey = item.guid.rsplit('/', 1)[-1]
self.query(f'{self.METADATA}/actions/removeFromWatchlist?ratingKey={ratingKey}', method=self._session.put) self.query(f'{self.METADATA}/actions/removeFromWatchlist?ratingKey={ratingKey}', method=self._session.put)
return self return self
@ -1037,7 +1037,7 @@ class MyPlexUser(PlexObject):
if utils.cast(int, item.attrib.get('userID')) == self.id: if utils.cast(int, item.attrib.get('userID')) == self.id:
return item.attrib.get('accessToken') return item.attrib.get('accessToken')
except Exception: except Exception:
log.exception('Failed to get access token for %s' % self.title) log.exception(f'Failed to get access token for {self.title}')
def server(self, name): def server(self, name):
""" Returns the :class:`~plexapi.myplex.MyPlexServerShare` that matches the name specified. """ Returns the :class:`~plexapi.myplex.MyPlexServerShare` that matches the name specified.
@ -1049,7 +1049,7 @@ class MyPlexUser(PlexObject):
if name.lower() == server.name.lower(): if name.lower() == server.name.lower():
return server return server
raise NotFound('Unable to find server %s' % name) raise NotFound(f'Unable to find server {name}')
def history(self, maxresults=9999999, mindate=None): def history(self, maxresults=9999999, mindate=None):
""" Get all Play History for a user in all shared servers. """ Get all Play History for a user in all shared servers.
@ -1177,7 +1177,7 @@ class MyPlexServerShare(PlexObject):
if name.lower() == section.title.lower(): if name.lower() == section.title.lower():
return section return section
raise NotFound('Unable to find section %s' % name) raise NotFound(f'Unable to find section {name}')
def sections(self): def sections(self):
""" Returns a list of all :class:`~plexapi.myplex.Section` objects shared with this user. """ Returns a list of all :class:`~plexapi.myplex.Section` objects shared with this user.
@ -1342,7 +1342,7 @@ class ResourceConnection(PlexObject):
self.port = utils.cast(int, data.attrib.get('port')) self.port = utils.cast(int, data.attrib.get('port'))
self.uri = data.attrib.get('uri') self.uri = data.attrib.get('uri')
self.local = utils.cast(bool, data.attrib.get('local')) self.local = utils.cast(bool, data.attrib.get('local'))
self.httpuri = 'http://%s:%s' % (self.address, self.port) self.httpuri = f'http://{self.address}:{self.port}'
self.relay = utils.cast(bool, data.attrib.get('relay')) self.relay = utils.cast(bool, data.attrib.get('relay'))
@ -1416,7 +1416,7 @@ class MyPlexDevice(PlexObject):
def delete(self): def delete(self):
""" Remove this device from your account. """ """ Remove this device from your account. """
key = 'https://plex.tv/devices/%s.xml' % self.id key = f'https://plex.tv/devices/{self.id}.xml'
self._server.query(key, self._server._session.delete) self._server.query(key, self._server._session.delete)
def syncItems(self): def syncItems(self):
@ -1654,7 +1654,7 @@ class MyPlexPinLogin:
if not response.ok: # pragma: no cover if not response.ok: # pragma: no cover
codename = codes.get(response.status_code)[0] codename = codes.get(response.status_code)[0]
errtext = response.text.replace('\n', ' ') errtext = response.text.replace('\n', ' ')
raise BadRequest('(%s) %s %s; %s' % (response.status_code, codename, response.url, errtext)) raise BadRequest(f'({response.status_code}) {codename} {response.url}; {errtext}')
data = response.text.encode('utf8') data = response.text.encode('utf8')
return ElementTree.fromstring(data) if data.strip() else None return ElementTree.fromstring(data) if data.strip() else None
@ -1698,7 +1698,7 @@ def _chooseConnection(ctype, name, results):
if results: if results:
log.debug('Connecting to %s: %s?X-Plex-Token=%s', ctype, results[0]._baseurl, results[0]._token) log.debug('Connecting to %s: %s?X-Plex-Token=%s', ctype, results[0]._baseurl, results[0]._token)
return results[0] return results[0]
raise NotFound('Unable to connect to %s: %s' % (ctype.lower(), name)) raise NotFound(f'Unable to connect to {ctype.lower()}: {name}')
class AccountOptOut(PlexObject): class AccountOptOut(PlexObject):
@ -1727,8 +1727,8 @@ class AccountOptOut(PlexObject):
:exc:`~plexapi.exceptions.NotFound`: ``option`` str not found in CHOICES. :exc:`~plexapi.exceptions.NotFound`: ``option`` str not found in CHOICES.
""" """
if option not in self.CHOICES: if option not in self.CHOICES:
raise NotFound('%s not found in available choices: %s' % (option, self.CHOICES)) raise NotFound(f'{option} not found in available choices: {self.CHOICES}')
url = self._server.OPTOUTS % {'userUUID': self._server.uuid} url = self._server.OPTOUTS.format(userUUID=self._server.uuid)
params = {'key': self.key, 'value': option} params = {'key': self.key, 'value': option}
self._server.query(url, method=self._server._session.post, params=params) self._server.query(url, method=self._server._session.post, params=params)
self.value = option # assume query successful and set the value to option self.value = option # assume query successful and set the value to option
@ -1748,7 +1748,7 @@ class AccountOptOut(PlexObject):
:exc:`~plexapi.exceptions.BadRequest`: When trying to opt out music. :exc:`~plexapi.exceptions.BadRequest`: When trying to opt out music.
""" """
if self.key == 'tv.plex.provider.music': if self.key == 'tv.plex.provider.music':
raise BadRequest('%s does not have the option to opt out managed users.' % self.key) raise BadRequest(f'{self.key} does not have the option to opt out managed users.')
self._updateOptOut('opt_out_managed') self._updateOptOut('opt_out_managed')

View file

@ -226,7 +226,7 @@ class Photo(
def _prettyfilename(self): def _prettyfilename(self):
""" Returns a filename for use in download. """ """ Returns a filename for use in download. """
if self.parentTitle: if self.parentTitle:
return '%s - %s' % (self.parentTitle, self.title) return f'{self.parentTitle} - {self.title}'
return self.title return self.title
def photoalbum(self): def photoalbum(self):
@ -282,7 +282,7 @@ class Photo(
section = self.section() section = self.section()
sync_item.location = 'library://%s/item/%s' % (section.uuid, quote_plus(self.key)) sync_item.location = f'library://{section.uuid}/item/{quote_plus(self.key)}'
sync_item.policy = Policy.create(limit) sync_item.policy = Policy.create(limit)
sync_item.mediaSettings = MediaSettings.createPhoto(resolution) sync_item.mediaSettings = MediaSettings.createPhoto(resolution)

View file

@ -127,7 +127,7 @@ class Playlist(
for _item in self.items(): for _item in self.items():
if _item.ratingKey == item.ratingKey: if _item.ratingKey == item.ratingKey:
return _item.playlistItemID return _item.playlistItemID
raise NotFound('Item with title "%s" not found in the playlist' % item.title) raise NotFound(f'Item with title "{item.title}" not found in the playlist')
def filters(self): def filters(self):
""" Returns the search filter dict for smart playlist. """ Returns the search filter dict for smart playlist.
@ -177,14 +177,14 @@ class Playlist(
for item in self.items(): for item in self.items():
if item.title.lower() == title.lower(): if item.title.lower() == title.lower():
return item return item
raise NotFound('Item with title "%s" not found in the playlist' % title) raise NotFound(f'Item with title "{title}" not found in the playlist')
def items(self): def items(self):
""" Returns a list of all items in the playlist. """ """ Returns a list of all items in the playlist. """
if self.radio: if self.radio:
return [] return []
if self._items is None: if self._items is None:
key = '%s/items' % self.key key = f'{self.key}/items'
items = self.fetchItems(key) items = self.fetchItems(key)
self._items = items self._items = items
return self._items return self._items
@ -212,16 +212,15 @@ class Playlist(
ratingKeys = [] ratingKeys = []
for item in items: for item in items:
if item.listType != self.playlistType: # pragma: no cover if item.listType != self.playlistType: # pragma: no cover
raise BadRequest('Can not mix media types when building a playlist: %s and %s' % raise BadRequest(f'Can not mix media types when building a playlist: '
(self.playlistType, item.listType)) f'{self.playlistType} and {item.listType}')
ratingKeys.append(str(item.ratingKey)) ratingKeys.append(str(item.ratingKey))
ratingKeys = ','.join(ratingKeys) ratingKeys = ','.join(ratingKeys)
uri = '%s/library/metadata/%s' % (self._server._uriRoot(), ratingKeys) uri = f'{self._server._uriRoot()}/library/metadata/{ratingKeys}'
key = '%s/items%s' % (self.key, utils.joinArgs({ args = {'uri': uri}
'uri': uri key = f"{self.key}/items{utils.joinArgs(args)}"
}))
self._server.query(key, method=self._server._session.put) self._server.query(key, method=self._server._session.put)
return self return self
@ -248,7 +247,7 @@ class Playlist(
for item in items: for item in items:
playlistItemID = self._getPlaylistItemID(item) playlistItemID = self._getPlaylistItemID(item)
key = '%s/items/%s' % (self.key, playlistItemID) key = f'{self.key}/items/{playlistItemID}'
self._server.query(key, method=self._server._session.delete) self._server.query(key, method=self._server._session.delete)
return self return self
@ -269,11 +268,11 @@ class Playlist(
raise BadRequest('Cannot move items in a smart playlist.') raise BadRequest('Cannot move items in a smart playlist.')
playlistItemID = self._getPlaylistItemID(item) playlistItemID = self._getPlaylistItemID(item)
key = '%s/items/%s/move' % (self.key, playlistItemID) key = f'{self.key}/items/{playlistItemID}/move'
if after: if after:
afterPlaylistItemID = self._getPlaylistItemID(after) afterPlaylistItemID = self._getPlaylistItemID(after)
key += '?after=%s' % afterPlaylistItemID key += f'?after={afterPlaylistItemID}'
self._server.query(key, method=self._server._session.put) self._server.query(key, method=self._server._session.put)
return self return self
@ -300,17 +299,16 @@ class Playlist(
section = self.section() section = self.section()
searchKey = section._buildSearchKey( searchKey = section._buildSearchKey(
sort=sort, libtype=section.METADATA_TYPE, limit=limit, filters=filters, **kwargs) sort=sort, libtype=section.METADATA_TYPE, limit=limit, filters=filters, **kwargs)
uri = '%s%s' % (self._server._uriRoot(), searchKey) uri = f'{self._server._uriRoot()}{searchKey}'
key = '%s/items%s' % (self.key, utils.joinArgs({ args = {'uri': uri}
'uri': uri key = f"{self.key}/items{utils.joinArgs(args)}"
}))
self._server.query(key, method=self._server._session.put) self._server.query(key, method=self._server._session.put)
return self return self
def _edit(self, **kwargs): def _edit(self, **kwargs):
""" Actually edit the playlist. """ """ Actually edit the playlist. """
key = '%s%s' % (self.key, utils.joinArgs(kwargs)) key = f'{self.key}{utils.joinArgs(kwargs)}'
self._server.query(key, method=self._server._session.put) self._server.query(key, method=self._server._session.put)
return self return self
@ -353,14 +351,10 @@ class Playlist(
ratingKeys.append(str(item.ratingKey)) ratingKeys.append(str(item.ratingKey))
ratingKeys = ','.join(ratingKeys) ratingKeys = ','.join(ratingKeys)
uri = '%s/library/metadata/%s' % (server._uriRoot(), ratingKeys) uri = f'{server._uriRoot()}/library/metadata/{ratingKeys}'
key = '/playlists%s' % utils.joinArgs({ args = {'uri': uri, 'type': listType, 'title': title, 'smart': 0}
'uri': uri, key = f"/playlists{utils.joinArgs(args)}"
'type': listType,
'title': title,
'smart': 0
})
data = server.query(key, method=server._session.post)[0] data = server.query(key, method=server._session.post)[0]
return cls(server, data, initpath=key) return cls(server, data, initpath=key)
@ -374,14 +368,10 @@ class Playlist(
searchKey = section._buildSearchKey( searchKey = section._buildSearchKey(
sort=sort, libtype=libtype, limit=limit, filters=filters, **kwargs) sort=sort, libtype=libtype, limit=limit, filters=filters, **kwargs)
uri = '%s%s' % (server._uriRoot(), searchKey) uri = f'{server._uriRoot()}{searchKey}'
key = '/playlists%s' % utils.joinArgs({ args = {'uri': uri, 'type': section.CONTENT_TYPE, 'title': title, 'smart': 1}
'uri': uri, key = f"/playlists{utils.joinArgs(args)}"
'type': section.CONTENT_TYPE,
'title': title,
'smart': 1,
})
data = server.query(key, method=server._session.post)[0] data = server.query(key, method=server._session.post)[0]
return cls(server, data, initpath=key) return cls(server, data, initpath=key)
@ -470,7 +460,7 @@ class Playlist(
sync_item.metadataType = self.metadataType sync_item.metadataType = self.metadataType
sync_item.machineIdentifier = self._server.machineIdentifier sync_item.machineIdentifier = self._server.machineIdentifier
sync_item.location = 'playlist:///%s' % quote_plus(self.guid) sync_item.location = f'playlist:///{quote_plus(self.guid)}'
sync_item.policy = Policy.create(limit, unwatched) sync_item.policy = Policy.create(limit, unwatched)
if self.isVideo: if self.isVideo:

View file

@ -90,10 +90,10 @@ class PlayQueue(PlexObject):
return matches[0] return matches[0]
elif len(matches) > 1: elif len(matches) > 1:
raise BadRequest( raise BadRequest(
"{item} occurs multiple times in this PlayQueue, provide exact item".format(item=item) f"{item} occurs multiple times in this PlayQueue, provide exact item"
) )
else: else:
raise BadRequest("{item} not valid for this PlayQueue".format(item=item)) raise BadRequest(f"{item} not valid for this PlayQueue")
@classmethod @classmethod
def get( def get(
@ -128,7 +128,7 @@ class PlayQueue(PlexObject):
if center: if center:
args["center"] = center args["center"] = center
path = "/playQueues/{playQueueID}{args}".format(playQueueID=playQueueID, args=utils.joinArgs(args)) path = f"/playQueues/{playQueueID}{utils.joinArgs(args)}"
data = server.query(path, method=server._session.get) data = server.query(path, method=server._session.get)
c = cls(server, data, initpath=path) c = cls(server, data, initpath=path)
c._server = server c._server = server
@ -171,8 +171,8 @@ class PlayQueue(PlexObject):
if isinstance(items, list): if isinstance(items, list):
item_keys = ",".join([str(x.ratingKey) for x in items]) item_keys = ",".join([str(x.ratingKey) for x in items])
uri_args = quote_plus("/library/metadata/{item_keys}".format(item_keys=item_keys)) uri_args = quote_plus(f"/library/metadata/{item_keys}")
args["uri"] = "library:///directory/{uri_args}".format(uri_args=uri_args) args["uri"] = f"library:///directory/{uri_args}"
args["type"] = items[0].listType args["type"] = items[0].listType
elif items.type == "playlist": elif items.type == "playlist":
args["type"] = items.playlistType args["type"] = items.playlistType
@ -183,12 +183,12 @@ class PlayQueue(PlexObject):
else: else:
uuid = items.section().uuid uuid = items.section().uuid
args["type"] = items.listType args["type"] = items.listType
args["uri"] = "library://{uuid}/item/{key}".format(uuid=uuid, key=items.key) args["uri"] = f"library://{uuid}/item/{items.key}"
if startItem: if startItem:
args["key"] = startItem.key args["key"] = startItem.key
path = "/playQueues{args}".format(args=utils.joinArgs(args)) path = f"/playQueues{utils.joinArgs(args)}"
data = server.query(path, method=server._session.post) data = server.query(path, method=server._session.post)
c = cls(server, data, initpath=path) c = cls(server, data, initpath=path)
c._server = server c._server = server
@ -250,12 +250,12 @@ class PlayQueue(PlexObject):
args["playlistID"] = item.ratingKey args["playlistID"] = item.ratingKey
else: else:
uuid = item.section().uuid uuid = item.section().uuid
args["uri"] = "library://{uuid}/item{key}".format(uuid=uuid, key=item.key) args["uri"] = f"library://{uuid}/item{item.key}"
if playNext: if playNext:
args["next"] = 1 args["next"] = 1
path = "/playQueues/{playQueueID}{args}".format(playQueueID=self.playQueueID, args=utils.joinArgs(args)) path = f"/playQueues/{self.playQueueID}{utils.joinArgs(args)}"
data = self._server.query(path, method=self._server._session.put) data = self._server.query(path, method=self._server._session.put)
self._loadData(data) self._loadData(data)
return self return self
@ -284,9 +284,7 @@ class PlayQueue(PlexObject):
after = self.getQueueItem(after) after = self.getQueueItem(after)
args["after"] = after.playQueueItemID args["after"] = after.playQueueItemID
path = "/playQueues/{playQueueID}/items/{playQueueItemID}/move{args}".format( path = f"/playQueues/{self.playQueueID}/items/{item.playQueueItemID}/move{utils.joinArgs(args)}"
playQueueID=self.playQueueID, playQueueItemID=item.playQueueItemID, args=utils.joinArgs(args)
)
data = self._server.query(path, method=self._server._session.put) data = self._server.query(path, method=self._server._session.put)
self._loadData(data) self._loadData(data)
return self return self
@ -304,23 +302,21 @@ class PlayQueue(PlexObject):
if item not in self: if item not in self:
item = self.getQueueItem(item) item = self.getQueueItem(item)
path = "/playQueues/{playQueueID}/items/{playQueueItemID}".format( path = f"/playQueues/{self.playQueueID}/items/{item.playQueueItemID}"
playQueueID=self.playQueueID, playQueueItemID=item.playQueueItemID
)
data = self._server.query(path, method=self._server._session.delete) data = self._server.query(path, method=self._server._session.delete)
self._loadData(data) self._loadData(data)
return self return self
def clear(self): def clear(self):
"""Remove all items from the PlayQueue.""" """Remove all items from the PlayQueue."""
path = "/playQueues/{playQueueID}/items".format(playQueueID=self.playQueueID) path = f"/playQueues/{self.playQueueID}/items"
data = self._server.query(path, method=self._server._session.delete) data = self._server.query(path, method=self._server._session.delete)
self._loadData(data) self._loadData(data)
return self return self
def refresh(self): def refresh(self):
"""Refresh the PlayQueue from the Plex server.""" """Refresh the PlayQueue from the Plex server."""
path = "/playQueues/{playQueueID}".format(playQueueID=self.playQueueID) path = f"/playQueues/{self.playQueueID}"
data = self._server.query(path, method=self._server._session.get) data = self._server.query(path, method=self._server._session.get)
self._loadData(data) self._loadData(data)
return self return self

View file

@ -171,7 +171,7 @@ class PlexServer(PlexObject):
return headers return headers
def _uriRoot(self): def _uriRoot(self):
return 'server://%s/com.plexapp.plugins.library' % self.machineIdentifier return f'server://{self.machineIdentifier}/com.plexapp.plugins.library'
@property @property
def library(self): def library(self):
@ -232,7 +232,7 @@ class PlexServer(PlexObject):
""" Returns a list of :class:`~plexapi.media.Agent` objects this server has available. """ """ Returns a list of :class:`~plexapi.media.Agent` objects this server has available. """
key = '/system/agents' key = '/system/agents'
if mediaType: if mediaType:
key += '?mediaType=%s' % utils.searchType(mediaType) key += f'?mediaType={utils.searchType(mediaType)}'
return self.fetchItems(key) return self.fetchItems(key)
def createToken(self, type='delegation', scope='all'): def createToken(self, type='delegation', scope='all'):
@ -240,7 +240,7 @@ class PlexServer(PlexObject):
if not self._token: if not self._token:
# Handle unclaimed servers # Handle unclaimed servers
return None return None
q = self.query('/security/token?type=%s&scope=%s' % (type, scope)) q = self.query(f'/security/token?type={type}&scope={scope}')
return q.attrib.get('token') return q.attrib.get('token')
def switchUser(self, username, session=None, timeout=None): def switchUser(self, username, session=None, timeout=None):
@ -291,7 +291,7 @@ class PlexServer(PlexObject):
try: try:
return next(account for account in self.systemAccounts() if account.id == accountID) return next(account for account in self.systemAccounts() if account.id == accountID)
except StopIteration: except StopIteration:
raise NotFound('Unknown account with accountID=%s' % accountID) from None raise NotFound(f'Unknown account with accountID={accountID}') from None
def systemDevices(self): def systemDevices(self):
""" Returns a list of :class:`~plexapi.server.SystemDevice` objects this server contains. """ """ Returns a list of :class:`~plexapi.server.SystemDevice` objects this server contains. """
@ -309,7 +309,7 @@ class PlexServer(PlexObject):
try: try:
return next(device for device in self.systemDevices() if device.id == deviceID) return next(device for device in self.systemDevices() if device.id == deviceID)
except StopIteration: except StopIteration:
raise NotFound('Unknown device with deviceID=%s' % deviceID) from None raise NotFound(f'Unknown device with deviceID={deviceID}') from None
def myPlexAccount(self): def myPlexAccount(self):
""" Returns a :class:`~plexapi.myplex.MyPlexAccount` object using the same """ Returns a :class:`~plexapi.myplex.MyPlexAccount` object using the same
@ -351,7 +351,7 @@ class PlexServer(PlexObject):
key = path.key key = path.key
elif path is not None: elif path is not None:
base64path = utils.base64str(path) base64path = utils.base64str(path)
key = '/services/browse/%s' % base64path key = f'/services/browse/{base64path}'
else: else:
key = '/services/browse' key = '/services/browse'
if includeFiles: if includeFiles:
@ -406,7 +406,7 @@ class PlexServer(PlexObject):
log.warning('%s did not advertise a port, checking plex.tv.', elem.attrib.get('name')) log.warning('%s did not advertise a port, checking plex.tv.', elem.attrib.get('name'))
ports = self._myPlexClientPorts() if ports is None else ports ports = self._myPlexClientPorts() if ports is None else ports
port = ports.get(elem.attrib.get('machineIdentifier')) port = ports.get(elem.attrib.get('machineIdentifier'))
baseurl = 'http://%s:%s' % (elem.attrib['host'], port) baseurl = f"http://{elem.attrib['host']}:{port}"
items.append(PlexClient(baseurl=baseurl, server=self, items.append(PlexClient(baseurl=baseurl, server=self,
token=self._token, data=elem, connect=False)) token=self._token, data=elem, connect=False))
@ -425,7 +425,7 @@ class PlexServer(PlexObject):
if client and client.title == name: if client and client.title == name:
return client return client
raise NotFound('Unknown client name: %s' % name) raise NotFound(f'Unknown client name: {name}')
def createCollection(self, title, section, items=None, smart=False, limit=None, def createCollection(self, title, section, items=None, smart=False, limit=None,
libtype=None, sort=None, filters=None, **kwargs): libtype=None, sort=None, filters=None, **kwargs):
@ -560,7 +560,7 @@ class PlexServer(PlexObject):
force (bool): Force server to check for new releases force (bool): Force server to check for new releases
download (bool): Download if a update is available. download (bool): Download if a update is available.
""" """
part = '/updater/check?download=%s' % (1 if download else 0) part = f'/updater/check?download={1 if download else 0}'
if force: if force:
self.query(part, method=self._session.put) self.query(part, method=self._session.put)
releases = self.fetchItems('/updater/status') releases = self.fetchItems('/updater/status')
@ -609,7 +609,7 @@ class PlexServer(PlexObject):
args['X-Plex-Container-Start'] = 0 args['X-Plex-Container-Start'] = 0
args['X-Plex-Container-Size'] = min(X_PLEX_CONTAINER_SIZE, maxresults) args['X-Plex-Container-Size'] = min(X_PLEX_CONTAINER_SIZE, maxresults)
while subresults and maxresults > len(results): while subresults and maxresults > len(results):
key = '/status/sessions/history/all%s' % utils.joinArgs(args) key = f'/status/sessions/history/all{utils.joinArgs(args)}'
subresults = self.fetchItems(key) subresults = self.fetchItems(key)
results += subresults[:maxresults - len(results)] results += subresults[:maxresults - len(results)]
args['X-Plex-Container-Start'] += args['X-Plex-Container-Size'] args['X-Plex-Container-Start'] += args['X-Plex-Container-Size']
@ -636,7 +636,7 @@ class PlexServer(PlexObject):
# TODO: Automatically retrieve and validate sort field similar to LibrarySection.search() # TODO: Automatically retrieve and validate sort field similar to LibrarySection.search()
args['sort'] = sort args['sort'] = sort
key = '/playlists%s' % utils.joinArgs(args) key = f'/playlists{utils.joinArgs(args)}'
return self.fetchItems(key, **kwargs) return self.fetchItems(key, **kwargs)
def playlist(self, title): def playlist(self, title):
@ -651,7 +651,7 @@ class PlexServer(PlexObject):
try: try:
return self.playlists(title=title, title__iexact=title)[0] return self.playlists(title=title, title__iexact=title)[0]
except IndexError: except IndexError:
raise NotFound('Unable to find playlist with title "%s".' % title) from None raise NotFound(f'Unable to find playlist with title "{title}".') from None
def optimizedItems(self, removeAll=None): def optimizedItems(self, removeAll=None):
""" Returns list of all :class:`~plexapi.media.Optimized` objects connected to server. """ """ Returns list of all :class:`~plexapi.media.Optimized` objects connected to server. """
@ -660,7 +660,7 @@ class PlexServer(PlexObject):
self.query(key, method=self._server._session.delete) self.query(key, method=self._server._session.delete)
else: else:
backgroundProcessing = self.fetchItem('/playlists?type=42') backgroundProcessing = self.fetchItem('/playlists?type=42')
return self.fetchItems('%s/items' % backgroundProcessing.key, cls=Optimized) return self.fetchItems(f'{backgroundProcessing.key}/items', cls=Optimized)
@deprecated('use "plexapi.media.Optimized.items()" instead') @deprecated('use "plexapi.media.Optimized.items()" instead')
def optimizedItem(self, optimizedID): def optimizedItem(self, optimizedID):
@ -669,7 +669,7 @@ class PlexServer(PlexObject):
""" """
backgroundProcessing = self.fetchItem('/playlists?type=42') backgroundProcessing = self.fetchItem('/playlists?type=42')
return self.fetchItem('%s/items/%s/items' % (backgroundProcessing.key, optimizedID)) return self.fetchItem(f'{backgroundProcessing.key}/items/{optimizedID}/items')
def conversions(self, pause=None): def conversions(self, pause=None):
""" Returns list of all :class:`~plexapi.media.Conversion` objects connected to server. """ """ Returns list of all :class:`~plexapi.media.Conversion` objects connected to server. """
@ -698,7 +698,7 @@ class PlexServer(PlexObject):
if response.status_code not in (200, 201, 204): if response.status_code not in (200, 201, 204):
codename = codes.get(response.status_code)[0] codename = codes.get(response.status_code)[0]
errtext = response.text.replace('\n', ' ') errtext = response.text.replace('\n', ' ')
message = '(%s) %s; %s %s' % (response.status_code, codename, response.url, errtext) message = f'({response.status_code}) {codename}; {response.url} {errtext}'
if response.status_code == 401: if response.status_code == 401:
raise Unauthorized(message) raise Unauthorized(message)
elif response.status_code == 404: elif response.status_code == 404:
@ -737,7 +737,7 @@ class PlexServer(PlexObject):
params['limit'] = limit params['limit'] = limit
if sectionId: if sectionId:
params['sectionId'] = sectionId params['sectionId'] = sectionId
key = '/hubs/search?%s' % urlencode(params) key = f'/hubs/search?{urlencode(params)}'
for hub in self.fetchItems(key, Hub): for hub in self.fetchItems(key, Hub):
if mediatype: if mediatype:
if hub.type == mediatype: if hub.type == mediatype:
@ -816,7 +816,7 @@ class PlexServer(PlexObject):
if imageFormat is not None: if imageFormat is not None:
params['format'] = imageFormat.lower() params['format'] = imageFormat.lower()
key = '/photo/:/transcode%s' % utils.joinArgs(params) key = f'/photo/:/transcode{utils.joinArgs(params)}'
return self.url(key, includeToken=True) return self.url(key, includeToken=True)
def url(self, key, includeToken=None): def url(self, key, includeToken=None):
@ -825,8 +825,8 @@ class PlexServer(PlexObject):
""" """
if self._token and (includeToken or self._showSecrets): if self._token and (includeToken or self._showSecrets):
delim = '&' if '?' in key else '?' delim = '&' if '?' in key else '?'
return '%s%s%sX-Plex-Token=%s' % (self._baseurl, key, delim, self._token) return f'{self._baseurl}{key}{delim}X-Plex-Token={self._token}'
return '%s%s' % (self._baseurl, key) return f'{self._baseurl}{key}'
def refreshSynclist(self): def refreshSynclist(self):
""" Force PMS to download new SyncList from Plex.tv. """ """ Force PMS to download new SyncList from Plex.tv. """
@ -860,7 +860,7 @@ class PlexServer(PlexObject):
log.debug('Plex is currently not allowed to delete media. Toggle set to not allow, exiting.') log.debug('Plex is currently not allowed to delete media. Toggle set to not allow, exiting.')
raise BadRequest('Plex is currently not allowed to delete media. Toggle set to not allow, exiting.') raise BadRequest('Plex is currently not allowed to delete media. Toggle set to not allow, exiting.')
value = 1 if toggle is True else 0 value = 1 if toggle is True else 0
return self.query('/:/prefs?allowMediaDeletion=%s' % value, self._session.put) return self.query(f'/:/prefs?allowMediaDeletion={value}', self._session.put)
def bandwidth(self, timespan=None, **kwargs): def bandwidth(self, timespan=None, **kwargs):
""" Returns a list of :class:`~plexapi.server.StatisticsBandwidth` objects """ Returns a list of :class:`~plexapi.server.StatisticsBandwidth` objects
@ -930,19 +930,19 @@ class PlexServer(PlexObject):
try: try:
params['timespan'] = timespans[timespan] params['timespan'] = timespans[timespan]
except KeyError: except KeyError:
raise BadRequest('Invalid timespan specified: %s. ' raise BadRequest(f"Invalid timespan specified: {timespan}. "
'Available timespans: %s' % (timespan, ', '.join(timespans.keys()))) f"Available timespans: {', '.join(timespans.keys())}")
filters = {'accountID', 'at', 'at<', 'at>', 'bytes', 'bytes<', 'bytes>', 'deviceID', 'lan'} filters = {'accountID', 'at', 'at<', 'at>', 'bytes', 'bytes<', 'bytes>', 'deviceID', 'lan'}
for key, value in kwargs.items(): for key, value in kwargs.items():
if key not in filters: if key not in filters:
raise BadRequest('Unknown filter: %s=%s' % (key, value)) raise BadRequest(f'Unknown filter: {key}={value}')
if key.startswith('at'): if key.startswith('at'):
try: try:
value = utils.cast(int, value.timestamp()) value = utils.cast(int, value.timestamp())
except AttributeError: except AttributeError:
raise BadRequest('Time frame filter must be a datetime object: %s=%s' % (key, value)) raise BadRequest(f'Time frame filter must be a datetime object: {key}={value}')
elif key.startswith('bytes') or key == 'lan': elif key.startswith('bytes') or key == 'lan':
value = utils.cast(int, value) value = utils.cast(int, value)
elif key == 'accountID': elif key == 'accountID':
@ -950,7 +950,7 @@ class PlexServer(PlexObject):
value = 1 # The admin account is accountID=1 value = 1 # The admin account is accountID=1
params[key] = value params[key] = value
key = '/statistics/bandwidth?%s' % urlencode(params) key = f'/statistics/bandwidth?{urlencode(params)}'
return self.fetchItems(key, StatisticsBandwidth) return self.fetchItems(key, StatisticsBandwidth)
def resources(self): def resources(self):
@ -973,13 +973,9 @@ class PlexServer(PlexObject):
base = 'https://app.plex.tv/desktop/' base = 'https://app.plex.tv/desktop/'
if endpoint: if endpoint:
return '%s#!/server/%s/%s%s' % ( return f'{base}#!/server/{self.machineIdentifier}/{endpoint}{utils.joinArgs(kwargs)}'
base, self.machineIdentifier, endpoint, utils.joinArgs(kwargs)
)
else: else:
return '%s#!/media/%s/com.plexapp.plugins.library%s' % ( return f'{base}#!/media/{self.machineIdentifier}/com.plexapp.plugins.library{utils.joinArgs(kwargs)}'
base, self.machineIdentifier, utils.joinArgs(kwargs)
)
def getWebURL(self, base=None, playlistTab=None): def getWebURL(self, base=None, playlistTab=None):
""" Returns the Plex Web URL for the server. """ Returns the Plex Web URL for the server.
@ -990,7 +986,7 @@ class PlexServer(PlexObject):
playlistTab (str): The playlist tab (audio, video, photo). Only used for the playlist URL. playlistTab (str): The playlist tab (audio, video, photo). Only used for the playlist URL.
""" """
if playlistTab is not None: if playlistTab is not None:
params = {'source': 'playlists', 'pivot': 'playlists.%s' % playlistTab} params = {'source': 'playlists', 'pivot': f'playlists.{playlistTab}'}
else: else:
params = {'key': '/hubs', 'pageType': 'hub'} params = {'key': '/hubs', 'pageType': 'hub'}
return self._buildWebURL(base=base, **params) return self._buildWebURL(base=base, **params)
@ -1122,7 +1118,7 @@ class SystemDevice(PlexObject):
self.clientIdentifier = data.attrib.get('clientIdentifier') self.clientIdentifier = data.attrib.get('clientIdentifier')
self.createdAt = utils.toDatetime(data.attrib.get('createdAt')) self.createdAt = utils.toDatetime(data.attrib.get('createdAt'))
self.id = utils.cast(int, data.attrib.get('id')) self.id = utils.cast(int, data.attrib.get('id'))
self.key = '/devices/%s' % self.id self.key = f'/devices/{self.id}'
self.name = data.attrib.get('name') self.name = data.attrib.get('name')
self.platform = data.attrib.get('platform') self.platform = data.attrib.get('platform')
@ -1193,10 +1189,7 @@ class StatisticsResources(PlexObject):
self.timespan = utils.cast(int, data.attrib.get('timespan')) self.timespan = utils.cast(int, data.attrib.get('timespan'))
def __repr__(self): def __repr__(self):
return '<%s>' % ':'.join([p for p in [ return f"<{':'.join([p for p in [self.__class__.__name__, self._clean(int(self.at.timestamp()))] if p])}>"
self.__class__.__name__,
self._clean(int(self.at.timestamp()))
] if p])
@utils.registerPlexObject @utils.registerPlexObject

View file

@ -51,7 +51,7 @@ class Settings(PlexObject):
id = utils.lowerFirst(id) id = utils.lowerFirst(id)
if id in self._settings: if id in self._settings:
return self._settings[id] return self._settings[id]
raise NotFound('Invalid setting id: %s' % id) raise NotFound(f'Invalid setting id: {id}')
def groups(self): def groups(self):
""" Returns a dict of lists for all :class:`~plexapi.settings.Setting` """ Returns a dict of lists for all :class:`~plexapi.settings.Setting`
@ -77,12 +77,12 @@ class Settings(PlexObject):
params = {} params = {}
for setting in self.all(): for setting in self.all():
if setting._setValue: if setting._setValue:
log.info('Saving PlexServer setting %s = %s' % (setting.id, setting._setValue)) log.info(f'Saving PlexServer setting {setting.id} = {setting._setValue}')
params[setting.id] = quote(setting._setValue) params[setting.id] = quote(setting._setValue)
if not params: if not params:
raise BadRequest('No setting have been modified.') raise BadRequest('No setting have been modified.')
querystr = '&'.join(['%s=%s' % (k, v) for k, v in params.items()]) querystr = '&'.join([f'{k}={v}' for k, v in params.items()])
url = '%s?%s' % (self.key, querystr) url = f'{self.key}?{querystr}'
self._server.query(url, self._server._session.put) self._server.query(url, self._server._session.put)
self.reload() self.reload()
@ -149,16 +149,16 @@ class Setting(PlexObject):
# check a few things up front # check a few things up front
if not isinstance(value, self.TYPES[self.type]['type']): if not isinstance(value, self.TYPES[self.type]['type']):
badtype = type(value).__name__ badtype = type(value).__name__
raise BadRequest('Invalid value for %s: a %s is required, not %s' % (self.id, self.type, badtype)) raise BadRequest(f'Invalid value for {self.id}: a {self.type} is required, not {badtype}')
if self.enumValues and value not in self.enumValues: if self.enumValues and value not in self.enumValues:
raise BadRequest('Invalid value for %s: %s not in %s' % (self.id, value, list(self.enumValues))) raise BadRequest(f'Invalid value for {self.id}: {value} not in {list(self.enumValues)}')
# store value off to the side until we call settings.save() # store value off to the side until we call settings.save()
tostr = self.TYPES[self.type]['tostr'] tostr = self.TYPES[self.type]['tostr']
self._setValue = tostr(value) self._setValue = tostr(value)
def toUrl(self): def toUrl(self):
"""Helper for urls""" """Helper for urls"""
return '%s=%s' % (self.id, self._value or self.value) return f'{self.id}={self._value or self.value}'
@utils.registerPlexObject @utils.registerPlexObject
@ -174,6 +174,6 @@ class Preferences(Setting):
def _default(self): def _default(self):
""" Set the default value for this setting.""" """ Set the default value for this setting."""
key = '%s/prefs?' % self._initpath key = f'{self._initpath}/prefs?'
url = key + '%s=%s' % (self.id, self.default) url = key + f'{self.id}={self.default}'
self._server.query(url, method=self._server._session.put) self._server.query(url, method=self._server._session.put)

View file

@ -96,9 +96,7 @@ class PlexSonosClient(PlexClient):
{ {
"type": "music", "type": "music",
"providerIdentifier": "com.plexapp.plugins.library", "providerIdentifier": "com.plexapp.plugins.library",
"containerKey": "/playQueues/{}?own=1".format( "containerKey": f"/playQueues/{playqueue.playQueueID}?own=1",
playqueue.playQueueID
),
"key": media.key, "key": media.key,
"offset": offset, "offset": offset,
"machineIdentifier": media._server.machineIdentifier, "machineIdentifier": media._server.machineIdentifier,

View file

@ -81,13 +81,13 @@ class SyncItem(PlexObject):
""" Returns :class:`~plexapi.myplex.MyPlexResource` with server of current item. """ """ Returns :class:`~plexapi.myplex.MyPlexResource` with server of current item. """
server = [s for s in self._server.resources() if s.clientIdentifier == self.machineIdentifier] server = [s for s in self._server.resources() if s.clientIdentifier == self.machineIdentifier]
if len(server) == 0: if len(server) == 0:
raise NotFound('Unable to find server with uuid %s' % self.machineIdentifier) raise NotFound(f'Unable to find server with uuid {self.machineIdentifier}')
return server[0] return server[0]
def getMedia(self): def getMedia(self):
""" Returns list of :class:`~plexapi.base.Playable` which belong to this sync item. """ """ Returns list of :class:`~plexapi.base.Playable` which belong to this sync item. """
server = self.server().connect() server = self.server().connect()
key = '/sync/items/%s' % self.id key = f'/sync/items/{self.id}'
return server.fetchItems(key) return server.fetchItems(key)
def markDownloaded(self, media): def markDownloaded(self, media):
@ -97,7 +97,7 @@ class SyncItem(PlexObject):
Parameters: Parameters:
media (base.Playable): the media to be marked as downloaded. media (base.Playable): the media to be marked as downloaded.
""" """
url = '/sync/%s/item/%s/downloaded' % (self.clientIdentifier, media.ratingKey) url = f'/sync/{self.clientIdentifier}/item/{media.ratingKey}/downloaded'
media._server.query(url, method=requests.put) media._server.query(url, method=requests.put)
def delete(self): def delete(self):
@ -159,13 +159,14 @@ class Status:
self.itemsCount = plexapi.utils.cast(int, itemsCount) self.itemsCount = plexapi.utils.cast(int, itemsCount)
def __repr__(self): def __repr__(self):
return '<%s>:%s' % (self.__class__.__name__, dict( d = dict(
itemsCount=self.itemsCount, itemsCount=self.itemsCount,
itemsCompleteCount=self.itemsCompleteCount, itemsCompleteCount=self.itemsCompleteCount,
itemsDownloadedCount=self.itemsDownloadedCount, itemsDownloadedCount=self.itemsDownloadedCount,
itemsReadyCount=self.itemsReadyCount, itemsReadyCount=self.itemsReadyCount,
itemsSuccessfulCount=self.itemsSuccessfulCount itemsSuccessfulCount=self.itemsSuccessfulCount
)) )
return f'<{self.__class__.__name__}>:{d}'
class MediaSettings: class MediaSettings:

View file

@ -117,12 +117,12 @@ def registerPlexObject(cls):
buildItem() below for an example. buildItem() below for an example.
""" """
etype = getattr(cls, 'STREAMTYPE', getattr(cls, 'TAGTYPE', cls.TYPE)) etype = getattr(cls, 'STREAMTYPE', getattr(cls, 'TAGTYPE', cls.TYPE))
ehash = '%s.%s' % (cls.TAG, etype) if etype else cls.TAG ehash = f'{cls.TAG}.{etype}' if etype else cls.TAG
if getattr(cls, '_SESSIONTYPE', None): if getattr(cls, '_SESSIONTYPE', None):
ehash = '%s.%s' % (ehash, 'session') ehash = f"{ehash}.{'session'}"
if ehash in PLEXOBJECTS: if ehash in PLEXOBJECTS:
raise Exception('Ambiguous PlexObject definition %s(tag=%s, type=%s) with %s' % raise Exception(f'Ambiguous PlexObject definition {cls.__name__}(tag={cls.TAG}, type={etype}) '
(cls.__name__, cls.TAG, etype, PLEXOBJECTS[ehash].__name__)) f'with {PLEXOBJECTS[ehash].__name__}')
PLEXOBJECTS[ehash] = cls PLEXOBJECTS[ehash] = cls
return cls return cls
@ -165,8 +165,8 @@ def joinArgs(args):
arglist = [] arglist = []
for key in sorted(args, key=lambda x: x.lower()): for key in sorted(args, key=lambda x: x.lower()):
value = str(args[key]) value = str(args[key])
arglist.append('%s=%s' % (key, quote(value, safe=''))) arglist.append(f"{key}={quote(value, safe='')}")
return '?%s' % '&'.join(arglist) return f"?{'&'.join(arglist)}"
def lowerFirst(s): def lowerFirst(s):
@ -218,7 +218,7 @@ def searchType(libtype):
return libtype return libtype
if SEARCHTYPES.get(libtype) is not None: if SEARCHTYPES.get(libtype) is not None:
return SEARCHTYPES[libtype] return SEARCHTYPES[libtype]
raise NotFound('Unknown libtype: %s' % libtype) raise NotFound(f'Unknown libtype: {libtype}')
def reverseSearchType(libtype): def reverseSearchType(libtype):
@ -236,7 +236,7 @@ def reverseSearchType(libtype):
for k, v in SEARCHTYPES.items(): for k, v in SEARCHTYPES.items():
if libtype == v: if libtype == v:
return k return k
raise NotFound('Unknown libtype: %s' % libtype) raise NotFound(f'Unknown libtype: {libtype}')
def tagType(tag): def tagType(tag):
@ -253,7 +253,7 @@ def tagType(tag):
return tag return tag
if TAGTYPES.get(tag) is not None: if TAGTYPES.get(tag) is not None:
return TAGTYPES[tag] return TAGTYPES[tag]
raise NotFound('Unknown tag: %s' % tag) raise NotFound(f'Unknown tag: {tag}')
def reverseTagType(tag): def reverseTagType(tag):
@ -271,7 +271,7 @@ def reverseTagType(tag):
for k, v in TAGTYPES.items(): for k, v in TAGTYPES.items():
if tag == v: if tag == v:
return k return k
raise NotFound('Unknown tag: %s' % tag) raise NotFound(f'Unknown tag: {tag}')
def threaded(callback, listargs): def threaded(callback, listargs):
@ -348,7 +348,7 @@ def toList(value, itemcast=None, delim=','):
def cleanFilename(filename, replace='_'): def cleanFilename(filename, replace='_'):
whitelist = "-_.()[] {}{}".format(string.ascii_letters, string.digits) whitelist = f"-_.()[] {string.ascii_letters}{string.digits}"
cleaned_filename = unicodedata.normalize('NFKD', filename).encode('ASCII', 'ignore').decode() cleaned_filename = unicodedata.normalize('NFKD', filename).encode('ASCII', 'ignore').decode()
cleaned_filename = ''.join(c if c in whitelist else replace for c in cleaned_filename) cleaned_filename = ''.join(c if c in whitelist else replace for c in cleaned_filename)
return cleaned_filename return cleaned_filename
@ -376,11 +376,11 @@ def downloadSessionImages(server, filename=None, height=150, width=150,
if media.thumb: if media.thumb:
url = media.thumb url = media.thumb
if part.indexes: # always use bif images if available. if part.indexes: # always use bif images if available.
url = '/library/parts/%s/indexes/%s/%s' % (part.id, part.indexes.lower(), media.viewOffset) url = f'/library/parts/{part.id}/indexes/{part.indexes.lower()}/{media.viewOffset}'
if url: if url:
if filename is None: if filename is None:
prettyname = media._prettyfilename() prettyname = media._prettyfilename()
filename = 'session_transcode_%s_%s_%s' % (media.usernames[0], prettyname, int(time.time())) filename = f'session_transcode_{media.usernames[0]}_{prettyname}_{int(time.time())}'
url = server.transcodeImage(url, height, width, opacity, saturation) url = server.transcodeImage(url, height, width, opacity, saturation)
filepath = download(url, filename=filename) filepath = download(url, filename=filename)
info['username'] = {'filepath': filepath, 'url': url} info['username'] = {'filepath': filepath, 'url': url}
@ -467,13 +467,13 @@ def getMyPlexAccount(opts=None): # pragma: no cover
from plexapi.myplex import MyPlexAccount from plexapi.myplex import MyPlexAccount
# 1. Check command-line options # 1. Check command-line options
if opts and opts.username and opts.password: if opts and opts.username and opts.password:
print('Authenticating with Plex.tv as %s..' % opts.username) print(f'Authenticating with Plex.tv as {opts.username}..')
return MyPlexAccount(opts.username, opts.password) return MyPlexAccount(opts.username, opts.password)
# 2. Check Plexconfig (environment variables and config.ini) # 2. Check Plexconfig (environment variables and config.ini)
config_username = CONFIG.get('auth.myplex_username') config_username = CONFIG.get('auth.myplex_username')
config_password = CONFIG.get('auth.myplex_password') config_password = CONFIG.get('auth.myplex_password')
if config_username and config_password: if config_username and config_password:
print('Authenticating with Plex.tv as %s..' % config_username) print(f'Authenticating with Plex.tv as {config_username}..')
return MyPlexAccount(config_username, config_password) return MyPlexAccount(config_username, config_password)
config_token = CONFIG.get('auth.server_token') config_token = CONFIG.get('auth.server_token')
if config_token: if config_token:
@ -482,7 +482,7 @@ def getMyPlexAccount(opts=None): # pragma: no cover
# 3. Prompt for username and password on the command line # 3. Prompt for username and password on the command line
username = input('What is your plex.tv username: ') username = input('What is your plex.tv username: ')
password = getpass('What is your plex.tv password: ') password = getpass('What is your plex.tv password: ')
print('Authenticating with Plex.tv as %s..' % username) print(f'Authenticating with Plex.tv as {username}..')
return MyPlexAccount(username, password) return MyPlexAccount(username, password)
@ -548,12 +548,12 @@ def choose(msg, items, attr): # pragma: no cover
print() print()
for index, i in enumerate(items): for index, i in enumerate(items):
name = attr(i) if callable(attr) else getattr(i, attr) name = attr(i) if callable(attr) else getattr(i, attr)
print(' %s: %s' % (index, name)) print(f' {index}: {name}')
print() print()
# Request choice from the user # Request choice from the user
while True: while True:
try: try:
inp = input('%s: ' % msg) inp = input(f'{msg}: ')
if any(s in inp for s in (':', '::', '-')): if any(s in inp for s in (':', '::', '-')):
idx = slice(*map(lambda x: int(x.strip()) if x.strip() else None, inp.split(':'))) idx = slice(*map(lambda x: int(x.strip()) if x.strip() else None, inp.split(':')))
return items[idx] return items[idx]
@ -572,8 +572,7 @@ def getAgentIdentifier(section, agent):
if agent in identifiers: if agent in identifiers:
return ag.identifier return ag.identifier
agents += identifiers agents += identifiers
raise NotFound('Could not find "%s" in agents list (%s)' % raise NotFound(f"Could not find \"{agent}\" in agents list ({', '.join(agents)})")
(agent, ', '.join(agents)))
def base64str(text): def base64str(text):
@ -587,7 +586,7 @@ def deprecated(message, stacklevel=2):
when the function is used.""" when the function is used."""
@functools.wraps(func) @functools.wraps(func)
def wrapper(*args, **kwargs): def wrapper(*args, **kwargs):
msg = 'Call to deprecated function or method "%s", %s.' % (func.__name__, message) msg = f'Call to deprecated function or method "{func.__name__}", {message}.'
warnings.warn(msg, category=DeprecationWarning, stacklevel=stacklevel) warnings.warn(msg, category=DeprecationWarning, stacklevel=stacklevel)
log.warning(msg) log.warning(msg)
return func(*args, **kwargs) return func(*args, **kwargs)

View file

@ -117,7 +117,7 @@ class Video(PlexPartialObject, PlayedUnplayedMixin):
def uploadSubtitles(self, filepath): def uploadSubtitles(self, filepath):
""" Upload Subtitle file for video. """ """ Upload Subtitle file for video. """
url = '%s/subtitles' % self.key url = f'{self.key}/subtitles'
filename = os.path.basename(filepath) filename = os.path.basename(filepath)
subFormat = os.path.splitext(filepath)[1][1:] subFormat = os.path.splitext(filepath)[1][1:]
with open(filepath, 'rb') as subfile: with open(filepath, 'rb') as subfile:
@ -188,7 +188,7 @@ class Video(PlexPartialObject, PlayedUnplayedMixin):
from plexapi.sync import Policy, MediaSettings from plexapi.sync import Policy, MediaSettings
backgroundProcessing = self.fetchItem('/playlists?type=42') backgroundProcessing = self.fetchItem('/playlists?type=42')
key = '%s/items' % backgroundProcessing.key key = f'{backgroundProcessing.key}/items'
tags = {t.tag.lower(): t.id for t in self._server.library.tags('mediaProcessingTarget')} tags = {t.tag.lower(): t.id for t in self._server.library.tags('mediaProcessingTarget')}
# Additional keys for shorthand values # Additional keys for shorthand values
@ -279,7 +279,7 @@ class Video(PlexPartialObject, PlayedUnplayedMixin):
section = self._server.library.sectionByID(self.librarySectionID) section = self._server.library.sectionByID(self.librarySectionID)
sync_item.location = 'library://%s/item/%s' % (section.uuid, quote_plus(self.key)) sync_item.location = f'library://{section.uuid}/item/{quote_plus(self.key)}'
sync_item.policy = Policy.create(limit, unwatched) sync_item.policy = Policy.create(limit, unwatched)
sync_item.mediaSettings = MediaSettings.createVideo(videoQuality) sync_item.mediaSettings = MediaSettings.createVideo(videoQuality)
@ -395,7 +395,7 @@ class Movie(
def _prettyfilename(self): def _prettyfilename(self):
""" Returns a filename for use in download. """ """ Returns a filename for use in download. """
return '%s (%s)' % (self.title, self.year) return f'{self.title} ({self.year})'
def reviews(self): def reviews(self):
""" Returns a list of :class:`~plexapi.media.Review` objects. """ """ Returns a list of :class:`~plexapi.media.Review` objects. """
@ -607,7 +607,7 @@ class Show(
""" """
filepaths = [] filepaths = []
for episode in self.episodes(): for episode in self.episodes():
_savepath = os.path.join(savepath, 'Season %s' % str(episode.seasonNumber).zfill(2)) if subfolders else savepath _savepath = os.path.join(savepath, f'Season {str(episode.seasonNumber).zfill(2)}') if subfolders else savepath
filepaths += episode.download(_savepath, keep_original_name, **kwargs) filepaths += episode.download(_savepath, keep_original_name, **kwargs)
return filepaths return filepaths
@ -752,7 +752,7 @@ class Season(
def _defaultSyncTitle(self): def _defaultSyncTitle(self):
""" Returns str, default title for a new syncItem. """ """ Returns str, default title for a new syncItem. """
return '%s - %s' % (self.parentTitle, self.title) return f'{self.parentTitle} - {self.title}'
@utils.registerPlexObject @utils.registerPlexObject
@ -859,7 +859,7 @@ class Episode(
if not self.parentRatingKey and self.grandparentRatingKey: if not self.parentRatingKey and self.grandparentRatingKey:
self.parentRatingKey = self.show().season(season=self.parentIndex).ratingKey self.parentRatingKey = self.show().season(season=self.parentIndex).ratingKey
if self.parentRatingKey: if self.parentRatingKey:
self.parentKey = '/library/metadata/%s' % self.parentRatingKey self.parentKey = f'/library/metadata/{self.parentRatingKey}'
def __repr__(self): def __repr__(self):
return '<%s>' % ':'.join([p for p in [ return '<%s>' % ':'.join([p for p in [
@ -870,7 +870,7 @@ class Episode(
def _prettyfilename(self): def _prettyfilename(self):
""" Returns a filename for use in download. """ """ Returns a filename for use in download. """
return '%s - %s - %s' % (self.grandparentTitle, self.seasonEpisode, self.title) return f'{self.grandparentTitle} - {self.seasonEpisode} - {self.title}'
@property @property
def actors(self): def actors(self):
@ -902,7 +902,7 @@ class Episode(
@property @property
def seasonEpisode(self): def seasonEpisode(self):
""" Returns the s00e00 string containing the season and episode numbers. """ """ Returns the s00e00 string containing the season and episode numbers. """
return 's%se%s' % (str(self.seasonNumber).zfill(2), str(self.episodeNumber).zfill(2)) return f's{str(self.seasonNumber).zfill(2)}e{str(self.episodeNumber).zfill(2)}'
@property @property
def hasCommercialMarker(self): def hasCommercialMarker(self):
@ -929,7 +929,7 @@ class Episode(
def _defaultSyncTitle(self): def _defaultSyncTitle(self):
""" Returns str, default title for a new syncItem. """ """ Returns str, default title for a new syncItem. """
return '%s - %s - (%s) %s' % (self.grandparentTitle, self.parentTitle, self.seasonEpisode, self.title) return f'{self.grandparentTitle} - {self.parentTitle} - ({self.seasonEpisode}) {self.title}'
@utils.registerPlexObject @utils.registerPlexObject
@ -1003,7 +1003,7 @@ class Extra(Clip):
def _prettyfilename(self): def _prettyfilename(self):
""" Returns a filename for use in download. """ """ Returns a filename for use in download. """
return '%s (%s)' % (self.title, self.subtype) return f'{self.title} ({self.subtype})'
@utils.registerPlexObject @utils.registerPlexObject

View file

@ -51,7 +51,7 @@ ENTITLEMENTS = {
"windows", "windows",
"windows_phone", "windows_phone",
} }
SYNC_DEVICE_IDENTIFIER = "test-sync-client-%s" % plexapi.X_PLEX_IDENTIFIER SYNC_DEVICE_IDENTIFIER = f"test-sync-client-{plexapi.X_PLEX_IDENTIFIER}"
SYNC_DEVICE_HEADERS = { SYNC_DEVICE_HEADERS = {
"X-Plex-Provides": "sync-target", "X-Plex-Provides": "sync-target",
"X-Plex-Platform": "iOS", "X-Plex-Platform": "iOS",
@ -320,7 +320,7 @@ def shared_username(account):
in (user.username.lower(), user.email.lower(), str(user.id)) in (user.username.lower(), user.email.lower(), str(user.id))
): ):
return username return username
pytest.skip("Shared user %s wasn't found in your MyPlex account" % username) pytest.skip(f"Shared user {username} wasn't found in your MyPlex account")
@pytest.fixture() @pytest.fixture()
@ -423,9 +423,6 @@ def wait_until(condition_function, delay=0.25, timeout=1, *args, **kwargs):
time.sleep(delay) time.sleep(delay)
ready = condition_function(*args, **kwargs) ready = condition_function(*args, **kwargs)
assert ready, "Wait timeout after %d retries, %.2f seconds" % ( assert ready, f"Wait timeout after {int(retries)} retries, {time.time() - start:.2f} seconds"
retries,
time.time() - start,
)
return ready return ready

View file

@ -10,10 +10,10 @@ def wait_for_idle_server(server):
"""Wait for PMS activities to complete with a timeout.""" """Wait for PMS activities to complete with a timeout."""
attempts = 0 attempts = 0
while server.activities and attempts < MAX_ATTEMPTS: while server.activities and attempts < MAX_ATTEMPTS:
print("Waiting for activities to finish: {activities}".format(activities=server.activities)) print(f"Waiting for activities to finish: {server.activities}")
time.sleep(1) time.sleep(1)
attempts += 1 attempts += 1
assert attempts < MAX_ATTEMPTS, "Server still busy after {MAX_ATTEMPTS}s".format(MAX_ATTEMPTS=MAX_ATTEMPTS) assert attempts < MAX_ATTEMPTS, f"Server still busy after {MAX_ATTEMPTS}s"
def wait_for_metadata_processing(server): def wait_for_metadata_processing(server):
@ -26,12 +26,12 @@ def wait_for_metadata_processing(server):
tl = section.timeline() tl = section.timeline()
if tl.updateQueueSize > 0: if tl.updateQueueSize > 0:
busy = True busy = True
print("{title}: {updateQueueSize} items left".format(title=section.title, updateQueueSize=tl.updateQueueSize)) print(f"{section.title}: {tl.updateQueueSize} items left")
if not busy or attempts > MAX_ATTEMPTS: if not busy or attempts > MAX_ATTEMPTS:
break break
time.sleep(1) time.sleep(1)
attempts += 1 attempts += 1
assert attempts < MAX_ATTEMPTS, "Metadata still processing after {MAX_ATTEMPTS}s".format(MAX_ATTEMPTS=MAX_ATTEMPTS) assert attempts < MAX_ATTEMPTS, f"Metadata still processing after {MAX_ATTEMPTS}s"
def test_ensure_activities_completed(plex): def test_ensure_activities_completed(plex):

View file

@ -9,8 +9,7 @@ def _check_capabilities(client, capabilities):
for capability in capabilities: for capability in capabilities:
if capability not in supported: if capability not in supported:
pytest.skip( pytest.skip(
"Client %s doesn't support %s capability support %s" f"Client {client.title} doesn't support {capability} capability support {supported}"
% (client.title, capability, supported)
) )
@ -79,22 +78,22 @@ def test_client_playback(plex, client, movies, proxy):
subs = [ subs = [
stream for stream in movie.subtitleStreams() if stream.language == "English" stream for stream in movie.subtitleStreams() if stream.language == "English"
] ]
print("client.playMedia(%s)" % movie.title) print(f"client.playMedia({movie.title})")
client.playMedia(movie) client.playMedia(movie)
time.sleep(5) time.sleep(5)
print("client.pause(%s)" % mtype) print(f"client.pause({mtype})")
client.pause(mtype) client.pause(mtype)
time.sleep(2) time.sleep(2)
print("client.stepForward(%s)" % mtype) print(f"client.stepForward({mtype})")
client.stepForward(mtype) client.stepForward(mtype)
time.sleep(5) time.sleep(5)
print("client.play(%s)" % mtype) print(f"client.play({mtype})")
client.play(mtype) client.play(mtype)
time.sleep(3) time.sleep(3)
print("client.stepBack(%s)" % mtype) print(f"client.stepBack({mtype})")
client.stepBack(mtype) client.stepBack(mtype)
time.sleep(5) time.sleep(5)
print("client.play(%s)" % mtype) print(f"client.play({mtype})")
client.play(mtype) client.play(mtype)
time.sleep(3) time.sleep(3)
print("client.seekTo(1*60*1000)") print("client.seekTo(1*60*1000)")
@ -107,7 +106,7 @@ def test_client_playback(plex, client, movies, proxy):
print("client.setSubtitleStream(subs[0])") print("client.setSubtitleStream(subs[0])")
client.setSubtitleStream(subs[0].id, mtype) client.setSubtitleStream(subs[0].id, mtype)
time.sleep(10) time.sleep(10)
print("client.stop(%s)" % mtype) print(f"client.stop({mtype})")
client.stop(mtype) client.stop(mtype)
time.sleep(1) time.sleep(1)
finally: finally:
@ -133,7 +132,7 @@ def test_client_timeline(plex, client, movies, proxy):
client.playMedia(movie) client.playMedia(movie)
time.sleep(10) time.sleep(10)
assert client.isPlayingMedia() is True assert client.isPlayingMedia() is True
print("client.stop(%s)" % mtype) print(f"client.stop({mtype})")
client.stop(mtype) client.stop(mtype)
time.sleep(10) time.sleep(10)
assert client.isPlayingMedia() is False assert client.isPlayingMedia() is False

View file

@ -25,7 +25,7 @@ def test_Collection_attrs(collection):
assert collection.key.startswith("/library/collections/") assert collection.key.startswith("/library/collections/")
assert not collection.labels assert not collection.labels
assert utils.is_int(collection.librarySectionID) assert utils.is_int(collection.librarySectionID)
assert collection.librarySectionKey == "/library/sections/%s" % collection.librarySectionID assert collection.librarySectionKey == f"/library/sections/{collection.librarySectionID}"
assert collection.librarySectionTitle == "Movies" assert collection.librarySectionTitle == "Movies"
assert utils.is_int(collection.maxYear) assert utils.is_int(collection.maxYear)
assert utils.is_int(collection.minYear) assert utils.is_int(collection.minYear)
@ -35,7 +35,7 @@ def test_Collection_attrs(collection):
assert collection.subtype == "movie" assert collection.subtype == "movie"
assert collection.summary == "" assert collection.summary == ""
assert collection.theme is None assert collection.theme is None
assert collection.thumb.startswith("/library/collections/%s/composite" % collection.ratingKey) assert collection.thumb.startswith(f"/library/collections/{collection.ratingKey}/composite")
assert collection.thumbBlurHash is None assert collection.thumbBlurHash is None
assert collection.title == "Test Collection" assert collection.title == "Test Collection"
assert collection.titleSort == collection.title assert collection.titleSort == collection.title

View file

@ -111,7 +111,7 @@ def test_library_section_delete(movies, patched_http_call):
def test_library_fetchItem(plex, movie): def test_library_fetchItem(plex, movie):
item1 = plex.library.fetchItem("/library/metadata/%s" % movie.ratingKey) item1 = plex.library.fetchItem(f"/library/metadata/{movie.ratingKey}")
item2 = plex.library.fetchItem(movie.ratingKey) item2 = plex.library.fetchItem(movie.ratingKey)
assert item1.title == "Elephants Dream" assert item1.title == "Elephants Dream"
assert item1 == item2 == movie assert item1 == item2 == movie
@ -334,8 +334,8 @@ def test_library_MovieSection_PlexWebURL(plex, movies):
url = movies.getWebURL(tab=tab) url = movies.getWebURL(tab=tab)
assert url.startswith('https://app.plex.tv/desktop') assert url.startswith('https://app.plex.tv/desktop')
assert plex.machineIdentifier in url assert plex.machineIdentifier in url
assert 'source=%s' % movies.key in url assert f'source={movies.key}' in url
assert 'pivot=%s' % tab in url assert f'pivot={tab}' in url
# Test a different base # Test a different base
base = 'https://doesnotexist.com/plex' base = 'https://doesnotexist.com/plex'
url = movies.getWebURL(base=base) url = movies.getWebURL(base=base)
@ -349,7 +349,7 @@ def test_library_MovieSection_PlexWebURL_hub(plex, movies):
url = hub.section().getWebURL(key=hub.key) url = hub.section().getWebURL(key=hub.key)
assert url.startswith('https://app.plex.tv/desktop') assert url.startswith('https://app.plex.tv/desktop')
assert plex.machineIdentifier in url assert plex.machineIdentifier in url
assert 'source=%s' % movies.key in url assert f'source={movies.key}' in url
assert quote_plus(hub.key) in url assert quote_plus(hub.key) in url

View file

@ -36,12 +36,12 @@ def test_readme_examples(plex):
for title, example in examples: for title, example in examples:
if _check_run_example(title): if _check_run_example(title):
try: try:
print("\n%s\n%s" % (title, "-" * len(title))) print(f"\n{title}\n{'-' * len(title)}")
exec("\n".join(example)) exec("\n".join(example))
except Exception as err: except Exception as err:
failed += 1 failed += 1
print("Error running test: %s\nError: %s" % (title, err)) print(f"Error running test: {title}\nError: {err}")
assert not failed, "%s examples raised an exception." % failed assert not failed, f"{failed} examples raised an exception."
def _fetch_examples(): def _fetch_examples():

View file

@ -10,10 +10,10 @@ from .payloads import MYPLEX_INVITE
def test_myplex_accounts(account, plex): def test_myplex_accounts(account, plex):
assert account, "Must specify username, password & resource to run this test." assert account, "Must specify username, password & resource to run this test."
print("MyPlexAccount:") print("MyPlexAccount:")
print("username: %s" % account.username) print(f"username: {account.username}")
print("email: %s" % account.email) print(f"email: {account.email}")
print("home: %s" % account.home) print(f"home: {account.home}")
print("queueEmail: %s" % account.queueEmail) print(f"queueEmail: {account.queueEmail}")
assert account.username, "Account has no username" assert account.username, "Account has no username"
assert account.authenticationToken, "Account has no authenticationToken" assert account.authenticationToken, "Account has no authenticationToken"
assert account.email, "Account has no email" assert account.email, "Account has no email"
@ -21,9 +21,9 @@ def test_myplex_accounts(account, plex):
assert account.queueEmail, "Account has no queueEmail" assert account.queueEmail, "Account has no queueEmail"
account = plex.account() account = plex.account()
print("Local PlexServer.account():") print("Local PlexServer.account():")
print("username: %s" % account.username) print(f"username: {account.username}")
# print('authToken: %s' % account.authToken) # print('authToken: %s' % account.authToken)
print("signInState: %s" % account.signInState) print(f"signInState: {account.signInState}")
assert account.username, "Account has no username" assert account.username, "Account has no username"
assert account.authToken, "Account has no authToken" assert account.authToken, "Account has no authToken"
assert account.signInState, "Account has no signInState" assert account.signInState, "Account has no signInState"
@ -36,8 +36,8 @@ def test_myplex_resources(account):
name = resource.name or "Unknown" name = resource.name or "Unknown"
connections = [c.uri for c in resource.connections] connections = [c.uri for c in resource.connections]
connections = ", ".join(connections) if connections else "None" connections = ", ".join(connections) if connections else "None"
print("%s (%s): %s" % (name, resource.product, connections)) print(f"{name} ({resource.product}): {connections}")
assert resources, "No resources found for account: %s" % account.name assert resources, f"No resources found for account: {account.name}"
def test_myplex_connect_to_resource(plex, account): def test_myplex_connect_to_resource(plex, account):
@ -53,8 +53,8 @@ def test_myplex_devices(account):
for device in devices: for device in devices:
name = device.name or "Unknown" name = device.name or "Unknown"
connections = ", ".join(device.connections) if device.connections else "None" connections = ", ".join(device.connections) if device.connections else "None"
print("%s (%s): %s" % (name, device.product, connections)) print(f"{name} ({device.product}): {connections}")
assert devices, "No devices found for account: %s" % account.name assert devices, f"No devices found for account: {account.name}"
def test_myplex_device(account, plex): def test_myplex_device(account, plex):
@ -74,10 +74,10 @@ def test_myplex_users(account):
users = account.users() users = account.users()
if not len(users): if not len(users):
return pytest.skip("You have to add a shared account into your MyPlex") return pytest.skip("You have to add a shared account into your MyPlex")
print("Found %s users." % len(users)) print(f"Found {len(users)} users.")
user = account.user(users[0].title) user = account.user(users[0].title)
print("Found user: %s" % user) print(f"Found user: {user}")
assert user, "Could not find user %s" % users[0].title assert user, f"Could not find user {users[0].title}"
assert ( assert (
len(users[0].servers[0].sections()) > 0 len(users[0].servers[0].sections()) > 0
@ -219,9 +219,7 @@ def test_myplex_updateFriend(account, plex, mocker, shared_username):
def test_myplex_createExistingUser(account, plex, shared_username): def test_myplex_createExistingUser(account, plex, shared_username):
user = account.user(shared_username) user = account.user(shared_username)
url = "https://plex.tv/api/invites/requested/{}?friend=0&server=0&home=1".format( url = f"https://plex.tv/api/invites/requested/{user.id}?friend=0&server=0&home=1"
user.id
)
account.createExistingUser(user, plex) account.createExistingUser(user, plex)
assert shared_username in [u.username for u in account.users() if u.home is True] assert shared_username in [u.username for u in account.users() if u.home is True]

View file

@ -24,10 +24,10 @@ def test_navigate_around_artist(account, plex):
album = artist.album("Layers") album = artist.album("Layers")
tracks = artist.tracks() tracks = artist.tracks()
track = artist.track("As Colourful as Ever") track = artist.track("As Colourful as Ever")
print("Navigating around artist: %s" % artist) print(f"Navigating around artist: {artist}")
print("Album: %s" % album) print(f"Album: {album}")
print("Tracks: %s..." % tracks) print(f"Tracks: {tracks}...")
print("Track: %s" % track) print(f"Track: {track}")
assert artist.track("As Colourful as Ever") == track, "Unable to get artist track." assert artist.track("As Colourful as Ever") == track, "Unable to get artist track."
assert album.track("As Colourful as Ever") == track, "Unable to get album track." assert album.track("As Colourful as Ever") == track, "Unable to get album track."
assert album.artist() == artist, "album.artist() doesn't match expected artist." assert album.artist() == artist, "album.artist() doesn't match expected artist."

View file

@ -62,26 +62,26 @@ def test_Playlist_create(plex, show):
# Test add item # Test add item
playlist.addItems(episodes[3]) playlist.addItems(episodes[3])
items = playlist.reload().items() items = playlist.reload().items()
assert items[3].ratingKey == episodes[3].ratingKey, 'Missing added item: %s' % episodes[3] assert items[3].ratingKey == episodes[3].ratingKey, f'Missing added item: {episodes[3]}'
# Test add two items # Test add two items
playlist.addItems(episodes[4:6]) playlist.addItems(episodes[4:6])
items = playlist.reload().items() items = playlist.reload().items()
assert items[4].ratingKey == episodes[4].ratingKey, 'Missing added item: %s' % episodes[4] assert items[4].ratingKey == episodes[4].ratingKey, f'Missing added item: {episodes[4]}'
assert items[5].ratingKey == episodes[5].ratingKey, 'Missing added item: %s' % episodes[5] assert items[5].ratingKey == episodes[5].ratingKey, f'Missing added item: {episodes[5]}'
assert len(items) == 6, 'Playlist should have 6 items, %s found' % len(items) assert len(items) == 6, f'Playlist should have 6 items, {len(items)} found'
# Test remove item # Test remove item
toremove = items[5] toremove = items[5]
playlist.removeItems(toremove) playlist.removeItems(toremove)
items = playlist.reload().items() items = playlist.reload().items()
assert toremove not in items, 'Removed item still in playlist: %s' % items[5] assert toremove not in items, f'Removed item still in playlist: {items[5]}'
assert len(items) == 5, 'Playlist should have 5 items, %s found' % len(items) assert len(items) == 5, f'Playlist should have 5 items, {len(items)} found'
# Test remove two item # Test remove two item
toremove = items[3:5] toremove = items[3:5]
playlist.removeItems(toremove) playlist.removeItems(toremove)
items = playlist.reload().items() items = playlist.reload().items()
assert toremove[0] not in items, 'Removed item still in playlist: %s' % items[3] assert toremove[0] not in items, f'Removed item still in playlist: {items[3]}'
assert toremove[1] not in items, 'Removed item still in playlist: %s' % items[4] assert toremove[1] not in items, f'Removed item still in playlist: {items[4]}'
assert len(items) == 3, 'Playlist should have 5 items, %s found' % len(items) assert len(items) == 3, f'Playlist should have 5 items, {len(items)} found'
finally: finally:
playlist.delete() playlist.delete()

View file

@ -54,7 +54,7 @@ def test_create_playqueue(plex, show):
assert pq.playQueueLastAddedItemID == pq.items[2].playQueueItemID assert pq.playQueueLastAddedItemID == pq.items[2].playQueueItemID
assert pq.playQueueSelectedMetadataItemID == episodes[0].ratingKey assert pq.playQueueSelectedMetadataItemID == episodes[0].ratingKey
assert pq.items[2].ratingKey == episodes[3].ratingKey, ( assert pq.items[2].ratingKey == episodes[3].ratingKey, (
"Missing added item: %s" % episodes[3] f"Missing added item: {episodes[3]}"
) )
# Test adding an item to play next # Test adding an item to play next
@ -62,7 +62,7 @@ def test_create_playqueue(plex, show):
assert pq.playQueueLastAddedItemID == pq.items[3].playQueueItemID assert pq.playQueueLastAddedItemID == pq.items[3].playQueueItemID
assert pq.playQueueSelectedMetadataItemID == episodes[0].ratingKey assert pq.playQueueSelectedMetadataItemID == episodes[0].ratingKey
assert pq.items[1].ratingKey == episodes[4].ratingKey, ( assert pq.items[1].ratingKey == episodes[4].ratingKey, (
"Missing added item: %s" % episodes[4] f"Missing added item: {episodes[4]}"
) )
# Test add another item into Up Next section # Test add another item into Up Next section
@ -70,7 +70,7 @@ def test_create_playqueue(plex, show):
assert pq.playQueueLastAddedItemID == pq.items[4].playQueueItemID assert pq.playQueueLastAddedItemID == pq.items[4].playQueueItemID
assert pq.playQueueSelectedMetadataItemID == episodes[0].ratingKey assert pq.playQueueSelectedMetadataItemID == episodes[0].ratingKey
assert pq.items[4].ratingKey == episodes[5].ratingKey, ( assert pq.items[4].ratingKey == episodes[5].ratingKey, (
"Missing added item: %s" % episodes[5] f"Missing added item: {episodes[5]}"
) )
# Test removing an item # Test removing an item
@ -78,20 +78,20 @@ def test_create_playqueue(plex, show):
pq.removeItem(toremove) pq.removeItem(toremove)
assert pq.playQueueLastAddedItemID == pq.items[3].playQueueItemID assert pq.playQueueLastAddedItemID == pq.items[3].playQueueItemID
assert pq.playQueueSelectedMetadataItemID == episodes[0].ratingKey assert pq.playQueueSelectedMetadataItemID == episodes[0].ratingKey
assert toremove not in pq, "Removed item still in PlayQueue: %s" % toremove assert toremove not in pq, f"Removed item still in PlayQueue: {toremove}"
assert len(pq) == 5, "PlayQueue should have 5 items, %s found" % len(pq) assert len(pq) == 5, f"PlayQueue should have 5 items, {len(pq)} found"
# Test clearing the PlayQueue # Test clearing the PlayQueue
pq.clear() pq.clear()
assert pq.playQueueSelectedMetadataItemID == episodes[0].ratingKey assert pq.playQueueSelectedMetadataItemID == episodes[0].ratingKey
assert len(pq) == 1, "PlayQueue should have 1 item, %s found" % len(pq) assert len(pq) == 1, f"PlayQueue should have 1 item, {len(pq)} found"
# Test adding an item again # Test adding an item again
pq.addItem(episodes[7]) pq.addItem(episodes[7])
assert pq.playQueueLastAddedItemID == pq.items[1].playQueueItemID assert pq.playQueueLastAddedItemID == pq.items[1].playQueueItemID
assert pq.playQueueSelectedMetadataItemID == episodes[0].ratingKey assert pq.playQueueSelectedMetadataItemID == episodes[0].ratingKey
assert pq.items[1].ratingKey == episodes[7].ratingKey, ( assert pq.items[1].ratingKey == episodes[7].ratingKey, (
"Missing added item: %s" % episodes[7] f"Missing added item: {episodes[7]}"
) )

View file

@ -154,12 +154,12 @@ def test_server_search(plex, movie):
results = plex.search(genre.tag, mediatype="genre") results = plex.search(genre.tag, mediatype="genre")
hub_tag = results[0] hub_tag = results[0]
assert utils.is_int(hub_tag.count) assert utils.is_int(hub_tag.count)
assert hub_tag.filter == "genre={}".format(hub_tag.id) assert hub_tag.filter == f"genre={hub_tag.id}"
assert utils.is_int(hub_tag.id) assert utils.is_int(hub_tag.id)
assert utils.is_metadata( assert utils.is_metadata(
hub_tag.key, hub_tag.key,
prefix=hub_tag.librarySectionKey, prefix=hub_tag.librarySectionKey,
contains="{}/all".format(hub_tag.librarySectionID), contains=f"{hub_tag.librarySectionID}/all",
suffix=hub_tag.filter) suffix=hub_tag.filter)
assert utils.is_int(hub_tag.librarySectionID) assert utils.is_int(hub_tag.librarySectionID)
assert utils.is_metadata(hub_tag.librarySectionKey, prefix="/library/sections") assert utils.is_metadata(hub_tag.librarySectionKey, prefix="/library/sections")
@ -580,7 +580,7 @@ def test_server_PlexWebURL_playlists(plex):
assert url.startswith('https://app.plex.tv/desktop') assert url.startswith('https://app.plex.tv/desktop')
assert plex.machineIdentifier in url assert plex.machineIdentifier in url
assert 'source=playlists' in url assert 'source=playlists' in url
assert 'pivot=playlists.%s' % tab in url assert f'pivot=playlists.{tab}' in url
def test_server_agents(plex): def test_server_agents(plex):

View file

@ -26,7 +26,7 @@ def test_video_ne(movies):
assert ( assert (
len( len(
movies.fetchItems( movies.fetchItems(
"/library/sections/%s/all" % movies.key, title__ne="Sintel" f"/library/sections/{movies.key}/all", title__ne="Sintel"
) )
) )
== 3 == 3