Add 'activities' endpoint support (#569)

* Add /activities endpoint support

* Fix typos, return a list instead of iterator

* Canary test to validate CI behavior

* Increase timer of canary test to ensure proper run

* Move test to front of run, provide auth and anon versions

* Fix typo
This commit is contained in:
jjlawren 2020-09-16 18:16:01 -05:00 committed by GitHub
parent 24901302d7
commit 8410d81520
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 47 additions and 0 deletions

View file

@ -190,6 +190,14 @@ class PlexServer(PlexObject):
data = self.query(Account.key)
return Account(self, data)
@property
def activities(self):
"""Returns all current PMS activities."""
activities = []
for elem in self.query(Activity.key):
activities.append(Activity(self, elem))
return activities
def agents(self, mediaType=None):
""" Returns the `:class:`~plexapi.media.Agent` objects this server has available. """
key = '/system/agents'
@ -601,6 +609,20 @@ class Account(PlexObject):
self.subscriptionState = data.attrib.get('subscriptionState')
class Activity(PlexObject):
"""A currently running activity on the PlexServer."""
key = '/activities'
def _loadData(self, data):
self._data = data
self.cancellable = cast(bool, data.attrib.get('cancellable'))
self.progress = cast(int, data.attrib.get('progress'))
self.title = data.attrib.get('title')
self.subtitle = data.attrib.get('subtitle')
self.type = data.attrib.get('type')
self.uuid = data.attrib.get('uuid')
class SystemAccount(PlexObject):
""" Minimal api to list system accounts. """
key = '/accounts'

25
tests/test__prepare.py Normal file
View file

@ -0,0 +1,25 @@
# -*- coding: utf-8 -*-
import time
import pytest
MAX_ATTEMPTS = 60
def wait_for_idle_server(server):
"""Wait for PMS activities to complete with a timeout."""
attempts = 0
while server.activities and attempts < MAX_ATTEMPTS:
print(f"Waiting for activities to finish: {server.activities}")
time.sleep(1)
attempts += 1
assert attempts < MAX_ATTEMPTS, f"Server still busy after {MAX_ATTEMPTS}s"
def test_ensure_metadata_scans_completed(plex):
wait_for_idle_server(plex)
@pytest.mark.authenticated
def test_ensure_metadata_scans_completed_authenticated(plex):
wait_for_idle_server(plex)