mirror of
https://github.com/pkkid/python-plexapi
synced 2024-11-10 06:04:15 +00:00
MediaContainer
class is now a list (#1373)
* MediaContainer class is now a list - can now support totalSize as returned from server * add tests for media container * Update MediaContainer attributes if previously None when extending
This commit is contained in:
parent
5b5d4c66ab
commit
b836d24226
2 changed files with 109 additions and 4 deletions
|
@ -1,13 +1,21 @@
|
||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
import re
|
import re
|
||||||
|
from typing import TYPE_CHECKING, Generic, Iterable, List, Optional, TypeVar, Union
|
||||||
import weakref
|
import weakref
|
||||||
from functools import cached_property
|
from functools import cached_property
|
||||||
from urllib.parse import urlencode
|
from urllib.parse import urlencode
|
||||||
from xml.etree import ElementTree
|
from xml.etree import ElementTree
|
||||||
|
from xml.etree.ElementTree import Element
|
||||||
|
|
||||||
from plexapi import CONFIG, X_PLEX_CONTAINER_SIZE, log, utils
|
from plexapi import CONFIG, X_PLEX_CONTAINER_SIZE, log, utils
|
||||||
from plexapi.exceptions import BadRequest, NotFound, UnknownType, Unsupported
|
from plexapi.exceptions import BadRequest, NotFound, UnknownType, Unsupported
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from plexapi.server import PlexServer
|
||||||
|
|
||||||
|
PlexObjectT = TypeVar("PlexObjectT", bound='PlexObject')
|
||||||
|
MediaContainerT = TypeVar("MediaContainerT", bound="MediaContainer")
|
||||||
|
|
||||||
USER_DONT_RELOAD_FOR_KEYS = set()
|
USER_DONT_RELOAD_FOR_KEYS = set()
|
||||||
_DONT_RELOAD_FOR_KEYS = {'key'}
|
_DONT_RELOAD_FOR_KEYS = {'key'}
|
||||||
OPERATORS = {
|
OPERATORS = {
|
||||||
|
@ -253,8 +261,7 @@ class PlexObject:
|
||||||
if maxresults is not None:
|
if maxresults is not None:
|
||||||
container_size = min(container_size, maxresults)
|
container_size = min(container_size, maxresults)
|
||||||
|
|
||||||
results = []
|
results = MediaContainer[cls](self._server, Element('MediaContainer'), initpath=ekey)
|
||||||
subresults = []
|
|
||||||
headers = {}
|
headers = {}
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
|
@ -332,7 +339,7 @@ class PlexObject:
|
||||||
if rtag:
|
if rtag:
|
||||||
data = next(utils.iterXMLBFS(data, rtag), [])
|
data = next(utils.iterXMLBFS(data, rtag), [])
|
||||||
# loop through all data elements to find matches
|
# loop through all data elements to find matches
|
||||||
items = []
|
items = MediaContainer[cls](self._server, data, initpath=initpath) if data.tag == 'MediaContainer' else []
|
||||||
for elem in data:
|
for elem in data:
|
||||||
if self._checkAttrs(elem, **kwargs):
|
if self._checkAttrs(elem, **kwargs):
|
||||||
item = self._buildItemOrNone(elem, cls, initpath)
|
item = self._buildItemOrNone(elem, cls, initpath)
|
||||||
|
@ -1011,7 +1018,11 @@ class PlexHistory(object):
|
||||||
return self._server.query(self.historyKey, method=self._server._session.delete)
|
return self._server.query(self.historyKey, method=self._server._session.delete)
|
||||||
|
|
||||||
|
|
||||||
class MediaContainer(PlexObject):
|
class MediaContainer(
|
||||||
|
Generic[PlexObjectT],
|
||||||
|
List[PlexObjectT],
|
||||||
|
PlexObject,
|
||||||
|
):
|
||||||
""" Represents a single MediaContainer.
|
""" Represents a single MediaContainer.
|
||||||
|
|
||||||
Attributes:
|
Attributes:
|
||||||
|
@ -1024,11 +1035,71 @@ class MediaContainer(PlexObject):
|
||||||
librarySectionUUID (str): :class:`~plexapi.library.LibrarySection` UUID.
|
librarySectionUUID (str): :class:`~plexapi.library.LibrarySection` UUID.
|
||||||
mediaTagPrefix (str): "/system/bundle/media/flags/"
|
mediaTagPrefix (str): "/system/bundle/media/flags/"
|
||||||
mediaTagVersion (int): Unknown
|
mediaTagVersion (int): Unknown
|
||||||
|
offset (int): The offset of current results.
|
||||||
size (int): The number of items in the hub.
|
size (int): The number of items in the hub.
|
||||||
|
totalSize (int): The total number of items for the query.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
TAG = 'MediaContainer'
|
TAG = 'MediaContainer'
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
server: "PlexServer",
|
||||||
|
data: Element,
|
||||||
|
*args: PlexObjectT,
|
||||||
|
initpath: Optional[str] = None,
|
||||||
|
parent: Optional[PlexObject] = None,
|
||||||
|
) -> None:
|
||||||
|
# super calls Generic.__init__ which calls list.__init__ eventually
|
||||||
|
super().__init__(*args)
|
||||||
|
PlexObject.__init__(self, server, data, initpath, parent)
|
||||||
|
|
||||||
|
def extend(
|
||||||
|
self: MediaContainerT,
|
||||||
|
__iterable: Union[Iterable[PlexObjectT], MediaContainerT],
|
||||||
|
) -> None:
|
||||||
|
curr_size = self.size if self.size is not None else len(self)
|
||||||
|
super().extend(__iterable)
|
||||||
|
# update size, totalSize, and offset
|
||||||
|
if not isinstance(__iterable, MediaContainer):
|
||||||
|
return
|
||||||
|
|
||||||
|
# prefer the totalSize of the new iterable even if it is smaller
|
||||||
|
self.totalSize = (
|
||||||
|
__iterable.totalSize
|
||||||
|
if __iterable.totalSize is not None
|
||||||
|
else self.totalSize
|
||||||
|
) # ideally both should be equal
|
||||||
|
|
||||||
|
# the size of the new iterable is added to the current size
|
||||||
|
self.size = curr_size + (
|
||||||
|
__iterable.size if __iterable.size is not None else len(__iterable)
|
||||||
|
)
|
||||||
|
|
||||||
|
# the offset is the minimum of the two, prefering older values
|
||||||
|
if self.offset is not None and __iterable.offset is not None:
|
||||||
|
self.offset = min(self.offset, __iterable.offset)
|
||||||
|
else:
|
||||||
|
self.offset = (
|
||||||
|
self.offset if self.offset is not None else __iterable.offset
|
||||||
|
)
|
||||||
|
|
||||||
|
# for all other attributes, overwrite with the new iterable's values if previously None
|
||||||
|
for key in (
|
||||||
|
"allowSync",
|
||||||
|
"augmentationKey",
|
||||||
|
"identifier",
|
||||||
|
"librarySectionID",
|
||||||
|
"librarySectionTitle",
|
||||||
|
"librarySectionUUID",
|
||||||
|
"mediaTagPrefix",
|
||||||
|
"mediaTagVersion",
|
||||||
|
):
|
||||||
|
if (not hasattr(self, key)) or (getattr(self, key) is None):
|
||||||
|
if not hasattr(__iterable, key):
|
||||||
|
continue
|
||||||
|
setattr(self, key, getattr(__iterable, key))
|
||||||
|
|
||||||
def _loadData(self, data):
|
def _loadData(self, data):
|
||||||
self._data = data
|
self._data = data
|
||||||
self.allowSync = utils.cast(int, data.attrib.get('allowSync'))
|
self.allowSync = utils.cast(int, data.attrib.get('allowSync'))
|
||||||
|
@ -1039,4 +1110,6 @@ class MediaContainer(PlexObject):
|
||||||
self.librarySectionUUID = data.attrib.get('librarySectionUUID')
|
self.librarySectionUUID = data.attrib.get('librarySectionUUID')
|
||||||
self.mediaTagPrefix = data.attrib.get('mediaTagPrefix')
|
self.mediaTagPrefix = data.attrib.get('mediaTagPrefix')
|
||||||
self.mediaTagVersion = data.attrib.get('mediaTagVersion')
|
self.mediaTagVersion = data.attrib.get('mediaTagVersion')
|
||||||
|
self.offset = utils.cast(int, data.attrib.get("offset"))
|
||||||
self.size = utils.cast(int, data.attrib.get('size'))
|
self.size = utils.cast(int, data.attrib.get('size'))
|
||||||
|
self.totalSize = utils.cast(int, data.attrib.get("totalSize"))
|
||||||
|
|
32
tests/test_fetch_items.py
Normal file
32
tests/test_fetch_items.py
Normal file
|
@ -0,0 +1,32 @@
|
||||||
|
from plexapi.audio import Track
|
||||||
|
from plexapi.base import MediaContainer
|
||||||
|
|
||||||
|
|
||||||
|
def test_media_container_is_list():
|
||||||
|
container = MediaContainer(None, None, Track(None, None))
|
||||||
|
assert isinstance(container, list)
|
||||||
|
assert len(container) == 1
|
||||||
|
container.append(Track(None, None))
|
||||||
|
assert len(container) == 2
|
||||||
|
|
||||||
|
|
||||||
|
def test_media_container_extend():
|
||||||
|
container_1 = MediaContainer(None, None, Track(None, None))
|
||||||
|
container_2 = MediaContainer(
|
||||||
|
None, None, [Track(None, None), Track(None, None)]
|
||||||
|
)
|
||||||
|
container_1.size, container_2.size = 1, 2
|
||||||
|
container_1.offset, container_2.offset = 3, 4
|
||||||
|
container_1.totalSize = container_2.totalSize = 10
|
||||||
|
container_1.extend(container_2)
|
||||||
|
assert container_1.size == 1 + 2
|
||||||
|
assert container_1.offset == min(3, 4)
|
||||||
|
assert container_1.totalSize == 10
|
||||||
|
|
||||||
|
|
||||||
|
def test_fetch_items_with_media_container(show):
|
||||||
|
all_episodes = show.episodes()
|
||||||
|
some_episodes = show.episodes(maxresults=2)
|
||||||
|
assert some_episodes.size == 2
|
||||||
|
assert some_episodes.offset == 0
|
||||||
|
assert some_episodes.totalSize == len(all_episodes)
|
Loading…
Reference in a new issue