Merge pull request #470 from pkkid/static_media

first pass on static media
This commit is contained in:
Steffen Fredriksen 2020-05-01 15:42:59 +03:00 committed by GitHub
commit 64e935ce4b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
19 changed files with 1248 additions and 693 deletions

View file

@ -132,32 +132,22 @@ Usage Examples
Running tests over PlexAPI
--------------------------
In order to test the PlexAPI library you have to prepare a Plex Server instance with following libraries:
Use:
1. Movies section (agent `com.plexapp.agents.imdb`) containing both movies:
* Sintel - https://durian.blender.org/
* Elephants Dream - https://orange.blender.org/
* Sita Sings the Blues - http://www.sitasingstheblues.com/
* Big Buck Bunny - https://peach.blender.org/
2. TV Show section (agent `com.plexapp.agents.thetvdb`) containing the shows:
* Game of Thrones (Season 1 and 2)
* The 100 (Seasons 1 and 2)
* (or symlink the above movies with proper names)
3. Music section (agent `com.plexapp.agents.lastfm`) containing the albums:
* Infinite State - Unmastered Impulses - https://github.com/kennethreitz/unmastered-impulses
* Broke For Free - Layers - http://freemusicarchive.org/music/broke_for_free/Layers/
4. A Photos section (any agent) containing the photoalbums (photoalbum is just a folder on your disk):
* `Cats`
* Within `Cats` album you need to place 3 photos (cute cat photos, of course)
* Within `Cats` album you should place 3 more photoalbums (one of them should be named `Cats in bed`,
names of others doesn't matter)
* Within `Cats in bed` you need to place 7 photos
* Within other 2 albums you should place 1 photo in each
.. code-block:: bash
Instead of manual creation of the library you could use a script `tools/plex-boostraptest.py` with appropriate
tools/plex-boostraptest.py
with appropriate
arguments and add this new server to a shared user which username is defined in environment veriable `SHARED_USERNAME`.
It uses `official docker image`_ to create a proper instance.
For skipping the docker and reuse a existing server use
.. code-block:: bash
tools/plex-boostraptest.py --no-docker -username USERNAME --password PASSWORD --server-name NAME-OF-YOUR-SEVER
Also in order to run most of the tests you have to provide some environment variables:
* `PLEXAPI_AUTH_SERVER_BASEURL` containing an URL to your Plex instance, e.g. `http://127.0.0.1:32400` (without trailing

View file

@ -224,17 +224,17 @@ def collection(plex):
@pytest.fixture()
def artist(music):
return music.get("Infinite State")
return music.get("Broke For Free")
@pytest.fixture()
def album(artist):
return artist.album("Unmastered Impulses")
return artist.album("Layers")
@pytest.fixture()
def track(album):
return album.track("Holy Moment")
return album.track("As Colourful as Ever")
@pytest.fixture()

BIN
tests/data/audio_stub.mp3 Normal file

Binary file not shown.

BIN
tests/data/cute_cat.jpg Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 30 KiB

BIN
tests/data/video_stub.mp4 Normal file

Binary file not shown.

View file

@ -8,29 +8,29 @@ def test_audio_Artist_attr(artist):
artist.reload()
assert utils.is_datetime(artist.addedAt)
assert artist.countries == []
assert [i.tag for i in artist.genres] in [[], ['Electronic']]
assert "Electronic" in [i.tag for i in artist.genres]
assert utils.is_string(artist.guid, gte=5)
assert artist.index == '1'
assert artist.index == "1"
assert utils.is_metadata(artist._initpath)
assert utils.is_metadata(artist.key)
assert utils.is_int(artist.librarySectionID)
assert artist.listType == 'audio'
assert artist.listType == "audio"
assert len(artist.locations) == 1
assert len(artist.locations[0]) >= 10
assert artist.ratingKey >= 1
assert artist._server._baseurl == utils.SERVER_BASEURL
assert isinstance(artist.similar, list)
assert artist.summary == ''
assert artist.title == 'Infinite State'
assert artist.titleSort == 'Infinite State'
assert artist.type == 'artist'
assert "Alias" in artist.summary
assert artist.title == "Broke For Free"
assert artist.titleSort == "Broke For Free"
assert artist.type == "artist"
assert utils.is_datetime(artist.updatedAt)
assert utils.is_int(artist.viewCount, gte=0)
def test_audio_Artist_get(artist, music):
artist == music.searchArtists(**{'title': 'Infinite State'})[0]
artist.title == 'Infinite State'
artist == music.searchArtists(**{"title": "Broke For Free"})[0]
artist.title == "Broke For Free"
def test_audio_Artist_history(artist):
@ -39,50 +39,52 @@ def test_audio_Artist_history(artist):
def test_audio_Artist_track(artist):
track = artist.track('Holy Moment')
assert track.title == 'Holy Moment'
track = artist.track("As Colourful as Ever")
assert track.title == "As Colourful as Ever"
def test_audio_Artist_tracks(artist):
tracks = artist.tracks()
assert len(tracks) == 14
assert len(tracks) == 1
def test_audio_Artist_album(artist):
album = artist.album('Unmastered Impulses')
assert album.title == 'Unmastered Impulses'
album = artist.album("Layers")
assert album.title == "Layers"
def test_audio_Artist_albums(artist):
albums = artist.albums()
assert len(albums) == 1 and albums[0].title == 'Unmastered Impulses'
assert len(albums) == 1 and albums[0].title == "Layers"
def test_audio_Album_attrs(album):
assert utils.is_datetime(album.addedAt)
assert [i.tag for i in album.genres] == ['Electronic']
assert album.index == '1'
assert isinstance(album.genres, list)
assert album.index == "1"
assert utils.is_metadata(album._initpath)
assert utils.is_metadata(album.key)
assert utils.is_int(album.librarySectionID)
assert album.listType == 'audio'
assert album.originallyAvailableAt == datetime(2016, 1, 1)
assert album.listType == "audio"
if album.originallyAvailableAt:
assert utils.is_datetime(album.originallyAvailableAt)
assert utils.is_metadata(album.parentKey)
assert utils.is_int(album.parentRatingKey)
if album.parentThumb:
assert utils.is_metadata(album.parentThumb, contains='/thumb/')
assert album.parentTitle == 'Infinite State'
assert utils.is_metadata(album.parentThumb, contains="/thumb/")
assert album.parentTitle == "Broke For Free"
assert album.ratingKey >= 1
assert album._server._baseurl == utils.SERVER_BASEURL
assert album.studio is None
assert album.summary == ''
assert utils.is_metadata(album.thumb, contains='/thumb/')
assert album.title == 'Unmastered Impulses'
assert album.titleSort == 'Unmastered Impulses'
assert album.type == 'album'
assert album.summary == ""
if album.thumb:
assert utils.is_metadata(album.thumb, contains="/thumb/")
assert album.title == "Layers"
assert album.titleSort == "Layers"
assert album.type == "album"
assert utils.is_datetime(album.updatedAt)
assert utils.is_int(album.viewCount, gte=0)
assert album.year == 2016
assert album.year in (2012,)
assert album.artUrl is None
@ -99,29 +101,29 @@ def test_audio_Track_history(track):
def test_audio_Album_tracks(album):
tracks = album.tracks()
track = tracks[0]
assert len(tracks) == 14
assert len(tracks) == 1
assert utils.is_metadata(track.grandparentKey)
assert utils.is_int(track.grandparentRatingKey)
assert track.grandparentTitle == 'Infinite State'
assert track.index == '1'
assert track.grandparentTitle == "Broke For Free"
assert track.index == "1"
assert utils.is_metadata(track._initpath)
assert utils.is_metadata(track.key)
assert track.listType == 'audio'
assert track.originalTitle == 'Kenneth Reitz'
assert utils.is_int(track.parentIndex)
assert track.listType == "audio"
assert track.originalTitle in (None, "Broke For Free")
# assert utils.is_int(track.parentIndex)
assert utils.is_metadata(track.parentKey)
assert utils.is_int(track.parentRatingKey)
assert utils.is_metadata(track.parentThumb, contains='/thumb/')
assert track.parentTitle == 'Unmastered Impulses'
assert utils.is_metadata(track.parentThumb, contains="/thumb/")
assert track.parentTitle == "Layers"
# assert track.ratingCount == 9 # Flaky
assert utils.is_int(track.ratingKey)
assert track._server._baseurl == utils.SERVER_BASEURL
assert track.summary == ""
assert utils.is_metadata(track.thumb, contains='/thumb/')
assert track.title == 'Holy Moment'
assert track.titleSort == 'Holy Moment'
assert utils.is_metadata(track.thumb, contains="/thumb/")
assert track.title == "As Colourful as Ever"
assert track.titleSort == "As Colourful as Ever"
assert not track.transcodeSessions
assert track.type == 'track'
assert track.type == "track"
assert utils.is_datetime(track.updatedAt)
assert utils.is_int(track.viewCount, gte=0)
assert track.viewOffset == 0
@ -129,46 +131,47 @@ def test_audio_Album_tracks(album):
def test_audio_Album_track(album, track=None):
# this is not reloaded. its not that much info missing.
track = track or album.track('Holy Moment')
track = track or album.track("As Colourful As Ever")
assert utils.is_datetime(track.addedAt)
assert track.duration in [298605, 298606]
assert utils.is_int(track.duration)
assert utils.is_metadata(track.grandparentKey)
assert utils.is_int(track.grandparentRatingKey)
assert track.grandparentTitle == 'Infinite State'
assert track.grandparentTitle == "Broke For Free"
assert int(track.index) == 1
assert utils.is_metadata(track._initpath)
assert utils.is_metadata(track.key)
assert track.listType == 'audio'
assert track.listType == "audio"
# Assign 0 track.media
media = track.media[0]
assert track.originalTitle == 'Kenneth Reitz'
assert track.originalTitle in (None, "As Colourful As Ever")
# Fix me
assert utils.is_int(track.parentIndex)
assert utils.is_metadata(track.parentKey)
assert utils.is_int(track.parentRatingKey)
assert utils.is_metadata(track.parentThumb, contains='/thumb/')
assert track.parentTitle == 'Unmastered Impulses'
assert utils.is_metadata(track.parentThumb, contains="/thumb/")
assert track.parentTitle == "Layers"
# assert track.ratingCount == 9
assert utils.is_int(track.ratingKey)
assert track._server._baseurl == utils.SERVER_BASEURL
assert track.summary == ''
assert utils.is_metadata(track.thumb, contains='/thumb/')
assert track.title == 'Holy Moment'
assert track.titleSort == 'Holy Moment'
assert track.summary == ""
assert utils.is_metadata(track.thumb, contains="/thumb/")
assert track.title == "As Colourful as Ever"
assert track.titleSort == "As Colourful as Ever"
assert not track.transcodeSessions
assert track.type == 'track'
assert track.type == "track"
assert utils.is_datetime(track.updatedAt)
assert utils.is_int(track.viewCount, gte=0)
assert track.viewOffset == 0
assert media.aspectRatio is None
assert media.audioChannels == 2
assert media.audioCodec == 'mp3'
assert media.bitrate in [320, 385]
assert media.container == 'mp3'
assert media.duration in [298605, 298606]
assert media.height is None
assert media.audioCodec == "mp3"
assert media.bitrate == 128
assert media.container == "mp3"
assert utils.is_int(media.duration)
assert media.height in (None, 1080)
assert utils.is_int(media.id, gte=1)
assert utils.is_metadata(media._initpath)
assert media.optimizedForStreaming is None
assert media.optimizedForStreaming in (None, True)
# Assign 0 media.parts
part = media.parts[0]
assert media._server._baseurl == utils.SERVER_BASEURL
@ -176,69 +179,69 @@ def test_audio_Album_track(album, track=None):
assert media.videoFrameRate is None
assert media.videoResolution is None
assert media.width is None
assert part.container == 'mp3'
assert part.duration in [298605, 298606]
assert part.file.endswith('.mp3')
assert part.container == "mp3"
assert utils.is_int(part.duration)
assert part.file.endswith(".mp3")
assert utils.is_int(part.id)
assert utils.is_metadata(part._initpath)
assert utils.is_part(part.key)
assert part._server._baseurl == utils.SERVER_BASEURL
assert part.size == 14360402
assert part.size == 3761053
assert track.artUrl is None
def test_audio_Album_get(album):
# alias for album.track()
track = album.get('Holy Moment')
track = album.get("As Colourful As Ever")
test_audio_Album_track(album, track=track)
def test_audio_Album_artist(album):
artist = album.artist()
artist.title == 'Infinite State'
artist.title == "Broke For Free"
def test_audio_Track_attrs(album):
track = album.get('Holy Moment').reload()
track = album.get("As Colourful As Ever").reload()
assert utils.is_datetime(track.addedAt)
assert track.art is None
assert track.chapterSource is None
assert track.duration in [298605, 298606]
assert utils.is_int(track.duration)
assert track.grandparentArt is None
assert utils.is_metadata(track.grandparentKey)
assert utils.is_int(track.grandparentRatingKey)
if track.grandparentThumb:
assert utils.is_metadata(track.grandparentThumb, contains='/thumb/')
assert track.grandparentTitle == 'Infinite State'
assert track.guid.startswith('local://')
assert utils.is_metadata(track.grandparentThumb, contains="/thumb/")
assert track.grandparentTitle == "Broke For Free"
assert track.guid.startswith("local://")
assert int(track.index) == 1
assert utils.is_metadata(track._initpath)
assert utils.is_metadata(track.key)
if track.lastViewedAt:
assert utils.is_datetime(track.lastViewedAt)
assert utils.is_int(track.librarySectionID)
assert track.listType == 'audio'
assert track.listType == "audio"
# Assign 0 track.media
media = track.media[0]
assert track.moods == []
assert track.originalTitle == 'Kenneth Reitz'
assert track.originalTitle in (None, "Broke For Free")
assert int(track.parentIndex) == 1
assert utils.is_metadata(track.parentKey)
assert utils.is_int(track.parentRatingKey)
assert utils.is_metadata(track.parentThumb, contains='/thumb/')
assert track.parentTitle == 'Unmastered Impulses'
assert utils.is_metadata(track.parentThumb, contains="/thumb/")
assert track.parentTitle == "Layers"
assert track.playlistItemID is None
assert track.primaryExtraKey is None
#assert utils.is_int(track.ratingCount)
# assert utils.is_int(track.ratingCount)
assert utils.is_int(track.ratingKey)
assert track._server._baseurl == utils.SERVER_BASEURL
assert track.sessionKey is None
assert track.summary == ''
assert utils.is_metadata(track.thumb, contains='/thumb/')
assert track.title == 'Holy Moment'
assert track.titleSort == 'Holy Moment'
assert track.summary == ""
assert utils.is_metadata(track.thumb, contains="/thumb/")
assert track.title == "As Colourful as Ever"
assert track.titleSort == "As Colourful as Ever"
assert not track.transcodeSessions
assert track.type == 'track'
assert track.type == "track"
assert utils.is_datetime(track.updatedAt)
assert utils.is_int(track.viewCount, gte=0)
assert track.viewOffset == 0
@ -246,10 +249,10 @@ def test_audio_Track_attrs(album):
assert track.year is None
assert media.aspectRatio is None
assert media.audioChannels == 2
assert media.audioCodec == 'mp3'
assert media.bitrate in [320, 385]
assert media.container == 'mp3'
assert media.duration in [298605, 298606]
assert media.audioCodec == "mp3"
assert media.bitrate == 128
assert media.container == "mp3"
assert utils.is_int(media.duration)
assert media.height is None
assert utils.is_int(media.id, gte=1)
assert utils.is_metadata(media._initpath)
@ -261,23 +264,23 @@ def test_audio_Track_attrs(album):
assert media.videoFrameRate is None
assert media.videoResolution is None
assert media.width is None
assert part.container == 'mp3'
assert part.duration in [298605, 298606]
assert part.file.endswith('.mp3')
assert part.container == "mp3"
assert utils.is_int(part.duration)
assert part.file.endswith(".mp3")
assert utils.is_int(part.id)
assert utils.is_metadata(part._initpath)
assert utils.is_part(part.key)
# assert part.media == <Media:Holy.Moment>
assert part._server._baseurl == utils.SERVER_BASEURL
assert part.size == 14360402
assert part.size == 3761053
# Assign 0 part.streams
stream = part.streams[0]
assert stream.audioChannelLayout == 'stereo'
assert stream.audioChannelLayout == "stereo"
assert stream.bitDepth is None
assert stream.bitrate == 320
assert stream.bitrate == 128
assert stream.bitrateMode is None
assert stream.channels == 2
assert stream.codec == 'mp3'
assert stream.codec == "mp3"
assert stream.codecID is None
assert stream.dialogNorm is None
assert stream.duration is None
@ -287,7 +290,7 @@ def test_audio_Track_attrs(album):
assert stream.language is None
assert stream.languageCode is None
# assert stream.part == <MediaPart:22>
assert stream.samplingRate == 44100
assert stream.samplingRate == 48000
assert stream.selected is True
assert stream._server._baseurl == utils.SERVER_BASEURL
assert stream.streamType == 2
@ -319,13 +322,13 @@ def test_audio_Track_download(monkeydownload, tmpdir, track):
def test_audio_album_download(monkeydownload, album, tmpdir):
f = album.download(savepath=str(tmpdir))
assert len(f) == 14
assert len(f) == 1
def test_audio_Artist_download(monkeydownload, artist, tmpdir):
f = artist.download(savepath=str(tmpdir))
assert len(f) == 14
assert len(f) == 1
def test_audio_Album_label(album, patched_http_call):
album.addLabel('YO')
album.addLabel("YO")

View file

@ -8,7 +8,10 @@ def _check_capabilities(client, capabilities):
supported = client.protocolCapabilities
for capability in capabilities:
if capability not in supported:
pytest.skip("Client %s doesnt support %s capability support %s" % (client.title, capability, supported))
pytest.skip(
"Client %s doesnt support %s capability support %s"
% (client.title, capability, supported)
)
def _check_proxy(plex, client, proxy):
@ -123,7 +126,7 @@ def test_client_timeline(plex, client, movies, proxy):
try:
# Note: We noticed the isPlaying flag could take up to a full
# 30 seconds to be updated, hence the long sleeping.
mtype= "video"
mtype = "video"
client.stop(mtype)
assert client.isPlayingMedia() is False
print("client.playMedia(movie)")

View file

@ -193,11 +193,11 @@ def test_library_MusicSection_albums(music):
def test_library_MusicSection_searchTracks(music):
assert len(music.searchTracks(title="Holy Moment"))
assert len(music.searchTracks(title="As Colourful As Ever"))
def test_library_MusicSection_searchAlbums(music):
assert len(music.searchAlbums(title="Unmastered Impulses"))
assert len(music.searchAlbums(title="Layers"))
def test_library_PhotoSection_searchAlbums(photos, photoalbum):

View file

@ -1,25 +1,28 @@
# -*- coding: utf-8 -*-
import os
import pytest
import shlex
import subprocess
from os.path import abspath, dirname, join
SKIP_EXAMPLES = ['Example 4']
import pytest
SKIP_EXAMPLES = ["Example 4"]
@pytest.mark.skipif(os.name == 'nt', reason='No make.bat specified for Windows')
@pytest.mark.skipif(os.name == "nt", reason="No make.bat specified for Windows")
def test_build_documentation():
docroot = join(dirname(dirname(abspath(__file__))), 'docs')
cmd = shlex.split('sphinx-build -aE . _build')
proc = subprocess.Popen(cmd, cwd=docroot, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
docroot = join(dirname(dirname(abspath(__file__))), "docs")
cmd = shlex.split("sphinx-build -aE . _build")
proc = subprocess.Popen(
cmd, cwd=docroot, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
status = proc.wait()
assert status == 0
issues = []
for output in proc.communicate():
for line in str(output).split('\\n'):
for line in str(output).split("\\n"):
line = line.lower().strip()
if 'warning' in line or 'error' in line or 'traceback' in line:
if "warning" in line or "error" in line or "traceback" in line:
issues.append(line)
for line in issues:
print(line)
@ -29,30 +32,30 @@ def test_build_documentation():
def test_readme_examples(plex):
failed = 0
examples = _fetch_examples()
assert len(examples), 'No examples found in README'
assert len(examples), "No examples found in README"
for title, example in examples:
if _check_run_example(title):
try:
print('\n%s\n%s' % (title, '-' * len(title)))
exec('\n'.join(example))
print("\n%s\n%s" % (title, "-" * len(title)))
exec("\n".join(example))
except Exception as err:
failed += 1
print('Error running test: %s\nError: %s' % (title, err))
assert not failed, '%s examples raised an exception.' % failed
print("Error running test: %s\nError: %s" % (title, err))
assert not failed, "%s examples raised an exception." % failed
def _fetch_examples():
parsing = False
examples = []
filepath = join(dirname(dirname(abspath(__file__))), 'README.rst')
with open(filepath, 'r') as handle:
for line in handle.read().split('\n'):
filepath = join(dirname(dirname(abspath(__file__))), "README.rst")
with open(filepath, "r") as handle:
for line in handle.read().split("\n"):
line = line[4:]
if line.startswith('# Example '):
if line.startswith("# Example "):
parsing = True
title = line.lstrip('# ')
title = line.lstrip("# ")
examples.append([title, []])
elif parsing and line == '':
elif parsing and line == "":
parsing = False
elif parsing:
examples[-1][1].append(line)

View file

@ -1,40 +1,41 @@
# -*- coding: utf-8 -*-
import pytest
from plexapi.exceptions import BadRequest, NotFound
from . import conftest as utils
def test_myplex_accounts(account, plex):
assert account, 'Must specify username, password & resource to run this test.'
print('MyPlexAccount:')
print('username: %s' % account.username)
print('email: %s' % account.email)
print('home: %s' % account.home)
print('queueEmail: %s' % account.queueEmail)
assert account.username, 'Account has no username'
assert account.authenticationToken, 'Account has no authenticationToken'
assert account.email, 'Account has no email'
assert account.home is not None, 'Account has no home'
assert account.queueEmail, 'Account has no queueEmail'
assert account, "Must specify username, password & resource to run this test."
print("MyPlexAccount:")
print("username: %s" % account.username)
print("email: %s" % account.email)
print("home: %s" % account.home)
print("queueEmail: %s" % account.queueEmail)
assert account.username, "Account has no username"
assert account.authenticationToken, "Account has no authenticationToken"
assert account.email, "Account has no email"
assert account.home is not None, "Account has no home"
assert account.queueEmail, "Account has no queueEmail"
account = plex.account()
print('Local PlexServer.account():')
print('username: %s' % account.username)
#print('authToken: %s' % account.authToken)
print('signInState: %s' % account.signInState)
assert account.username, 'Account has no username'
assert account.authToken, 'Account has no authToken'
assert account.signInState, 'Account has no signInState'
print("Local PlexServer.account():")
print("username: %s" % account.username)
# print('authToken: %s' % account.authToken)
print("signInState: %s" % account.signInState)
assert account.username, "Account has no username"
assert account.authToken, "Account has no authToken"
assert account.signInState, "Account has no signInState"
def test_myplex_resources(account):
assert account, 'Must specify username, password & resource to run this test.'
assert account, "Must specify username, password & resource to run this test."
resources = account.resources()
for resource in resources:
name = resource.name or 'Unknown'
name = resource.name or "Unknown"
connections = [c.uri for c in resource.connections]
connections = ', '.join(connections) if connections else 'None'
print('%s (%s): %s' % (name, resource.product, connections))
assert resources, 'No resources found for account: %s' % account.name
connections = ", ".join(connections) if connections else "None"
print("%s (%s): %s" % (name, resource.product, connections))
assert resources, "No resources found for account: %s" % account.name
def test_myplex_connect_to_resource(plex, account):
@ -48,14 +49,15 @@ def test_myplex_connect_to_resource(plex, account):
def test_myplex_devices(account):
devices = account.devices()
for device in devices:
name = device.name or 'Unknown'
connections = ', '.join(device.connections) if device.connections else 'None'
print('%s (%s): %s' % (name, device.product, connections))
assert devices, 'No devices found for account: %s' % account.name
name = device.name or "Unknown"
connections = ", ".join(device.connections) if device.connections else "None"
print("%s (%s): %s" % (name, device.product, connections))
assert devices, "No devices found for account: %s" % account.name
def test_myplex_device(account, plex):
from plexapi import X_PLEX_DEVICE_NAME
assert account.device(plex.friendlyName)
assert account.device(X_PLEX_DEVICE_NAME)
@ -63,22 +65,24 @@ def test_myplex_device(account, plex):
def _test_myplex_connect_to_device(account):
devices = account.devices()
for device in devices:
if device.name == 'some client name' and len(device.connections):
if device.name == "some client name" and len(device.connections):
break
client = device.connect()
assert client, 'Unable to connect to device'
assert client, "Unable to connect to device"
def test_myplex_users(account):
users = account.users()
if not len(users):
return pytest.skip('You have to add a shared account into your MyPlex')
print('Found %s users.' % len(users))
return pytest.skip("You have to add a shared account into your MyPlex")
print("Found %s users." % len(users))
user = account.user(users[0].title)
print('Found user: %s' % user)
assert user, 'Could not find user %s' % users[0].title
print("Found user: %s" % user)
assert user, "Could not find user %s" % users[0].title
assert len(users[0].servers[0].sections()) > 0, "Couldn't info about the shared libraries"
assert (
len(users[0].servers[0].sections()) > 0
), "Couldn't info about the shared libraries"
def test_myplex_resource(account, plex):
@ -95,25 +99,25 @@ def test_myplex_webhooks(account):
def test_myplex_addwebhooks(account):
if account.subscriptionActive:
assert 'http://example.com' in account.addWebhook('http://example.com')
assert "http://example.com" in account.addWebhook("http://example.com")
else:
with pytest.raises(BadRequest):
account.addWebhook('http://example.com')
account.addWebhook("http://example.com")
def test_myplex_deletewebhooks(account):
if account.subscriptionActive:
assert 'http://example.com' not in account.deleteWebhook('http://example.com')
assert "http://example.com" not in account.deleteWebhook("http://example.com")
else:
with pytest.raises(BadRequest):
account.deleteWebhook('http://example.com')
account.deleteWebhook("http://example.com")
def test_myplex_optout(account_once):
def enabled():
ele = account_once.query('https://plex.tv/api/v2/user/privacy')
lib = ele.attrib.get('optOutLibraryStats')
play = ele.attrib.get('optOutPlayback')
ele = account_once.query("https://plex.tv/api/v2/user/privacy")
lib = ele.attrib.get("optOutLibraryStats")
play = ele.attrib.get("optOutPlayback")
return bool(int(lib)), bool(int(play))
account_once.optOut(library=True, playback=True)
@ -123,17 +127,25 @@ def test_myplex_optout(account_once):
def test_myplex_inviteFriend_remove(account, plex, mocker):
inv_user = 'hellowlol'
vid_filter = {'contentRating': ['G'], 'label': ['foo']}
inv_user = "hellowlol"
vid_filter = {"contentRating": ["G"], "label": ["foo"]}
secs = plex.library.sections()
ids = account._getSectionIds(plex.machineIdentifier, secs)
with mocker.patch.object(account, '_getSectionIds', return_value=ids):
with mocker.patch.object(account, "_getSectionIds", return_value=ids):
with utils.callable_http_patch():
account.inviteFriend(inv_user, plex, secs, allowSync=True, allowCameraUpload=True,
allowChannels=False, filterMovies=vid_filter, filterTelevision=vid_filter,
filterMusic={'label': ['foo']})
account.inviteFriend(
inv_user,
plex,
secs,
allowSync=True,
allowCameraUpload=True,
allowChannels=False,
filterMovies=vid_filter,
filterTelevision=vid_filter,
filterMusic={"label": ["foo"]},
)
assert inv_user not in [u.title for u in account.users()]
@ -143,51 +155,68 @@ def test_myplex_inviteFriend_remove(account, plex, mocker):
def test_myplex_updateFriend(account, plex, mocker, shared_username):
vid_filter = {'contentRating': ['G'], 'label': ['foo']}
vid_filter = {"contentRating": ["G"], "label": ["foo"]}
secs = plex.library.sections()
user = account.user(shared_username)
ids = account._getSectionIds(plex.machineIdentifier, secs)
with mocker.patch.object(account, '_getSectionIds', return_value=ids):
with mocker.patch.object(account, 'user', return_value=user):
with mocker.patch.object(account, "_getSectionIds", return_value=ids):
with mocker.patch.object(account, "user", return_value=user):
with utils.callable_http_patch():
account.updateFriend(shared_username, plex, secs, allowSync=True, removeSections=True,
allowCameraUpload=True, allowChannels=False, filterMovies=vid_filter,
filterTelevision=vid_filter, filterMusic={'label': ['foo']})
account.updateFriend(
shared_username,
plex,
secs,
allowSync=True,
removeSections=True,
allowCameraUpload=True,
allowChannels=False,
filterMovies=vid_filter,
filterTelevision=vid_filter,
filterMusic={"label": ["foo"]},
)
def test_myplex_createExistingUser(account, plex, shared_username):
user = account.user(shared_username)
url = 'https://plex.tv/api/invites/requested/{}?friend=0&server=0&home=1'.format(user.id)
url = "https://plex.tv/api/invites/requested/{}?friend=0&server=0&home=1".format(
user.id
)
account.createExistingUser(user, plex)
assert shared_username in [u.username for u in account.users() if u.home is True]
# Remove Home invite
account.query(url, account._session.delete)
# Confirm user was removed from home and has returned to friend
assert shared_username not in [u.username for u in plex.myPlexAccount().users() if u.home is True]
assert shared_username in [u.username for u in plex.myPlexAccount().users() if u.home is False]
assert shared_username not in [
u.username for u in plex.myPlexAccount().users() if u.home is True
]
assert shared_username in [
u.username for u in plex.myPlexAccount().users() if u.home is False
]
@pytest.mark.skip(reason="broken test?")
def test_myplex_createHomeUser_remove(account, plex):
homeuser = 'New Home User'
homeuser = "New Home User"
account.createHomeUser(homeuser, plex)
assert homeuser in [u.title for u in plex.myPlexAccount().users() if u.home is True]
account.removeHomeUser(homeuser)
assert homeuser not in [u.title for u in plex.myPlexAccount().users() if u.home is True]
assert homeuser not in [
u.title for u in plex.myPlexAccount().users() if u.home is True
]
def test_myplex_plexpass_attributes(account_plexpass):
assert account_plexpass.subscriptionActive
assert account_plexpass.subscriptionStatus == 'Active'
assert account_plexpass.subscriptionStatus == "Active"
assert account_plexpass.subscriptionPlan
assert 'sync' in account_plexpass.subscriptionFeatures
assert 'premium_music_metadata' in account_plexpass.subscriptionFeatures
assert 'plexpass' in account_plexpass.roles
assert "sync" in account_plexpass.subscriptionFeatures
assert "premium_music_metadata" in account_plexpass.subscriptionFeatures
assert "plexpass" in account_plexpass.roles
assert set(account_plexpass.entitlements) == utils.ENTITLEMENTS
def test_myplex_claimToken(account):
assert account.claimToken().startswith('claim-')
assert account.claimToken().startswith("claim-")

View file

@ -2,37 +2,33 @@
def test_navigate_around_show(account, plex):
show = plex.library.section('TV Shows').get('The 100')
show = plex.library.section("TV Shows").get("The 100")
seasons = show.seasons()
season = show.season('Season 1')
season = show.season("Season 1")
episodes = show.episodes()
episode = show.episode('Pilot')
assert 'Season 1' in [s.title for s in seasons], 'Unable to list season:'
assert 'Pilot' in [e.title for e in episodes], 'Unable to list episode:'
episode = show.episode("Pilot")
assert "Season 1" in [s.title for s in seasons], "Unable to list season:"
assert "Pilot" in [e.title for e in episodes], "Unable to list episode:"
assert show.season(1) == season
assert show.episode('Pilot') == episode, 'Unable to get show episode:'
assert season.episode('Pilot') == episode, 'Unable to get season episode:'
assert season.show() == show, 'season.show() doesnt match expected show.'
assert episode.show() == show, 'episode.show() doesnt match expected show.'
assert episode.season() == season, 'episode.season() doesnt match expected season.'
assert show.episode("Pilot") == episode, "Unable to get show episode:"
assert season.episode("Pilot") == episode, "Unable to get season episode:"
assert season.show() == show, "season.show() doesnt match expected show."
assert episode.show() == show, "episode.show() doesnt match expected show."
assert episode.season() == season, "episode.season() doesnt match expected season."
def test_navigate_around_artist(account, plex):
artist = plex.library.section('Music').get('Infinite State')
artist = plex.library.section("Music").get("Broke For Free")
albums = artist.albums()
album = artist.album('Unmastered Impulses')
album = artist.album("Layers")
tracks = artist.tracks()
track = artist.track('Mantra')
print('Navigating around artist: %s' % artist)
print('Albums: %s...' % albums[:3])
print('Album: %s' % album)
print('Tracks: %s...' % tracks[:3])
print('Track: %s' % track)
assert 'Unmastered Impulses' in [a.title for a in albums], 'Unable to list album.'
assert 'Mantra' in [e.title for e in tracks], 'Unable to list track.'
assert artist.album('Unmastered Impulses') == album, 'Unable to get artist album.'
assert artist.track('Mantra') == track, 'Unable to get artist track.'
assert album.track('Mantra') == track, 'Unable to get album track.'
assert album.artist() == artist, 'album.artist() doesnt match expected artist.'
assert track.artist() == artist, 'track.artist() doesnt match expected artist.'
assert track.album() == album, 'track.album() doesnt match expected album.'
track = artist.track("As Colourful as Ever")
print("Navigating around artist: %s" % artist)
print("Album: %s" % album)
print("Tracks: %s..." % tracks)
print("Track: %s" % track)
assert artist.track("As Colourful as Ever") == track, "Unable to get artist track."
assert album.track("As Colourful as Ever") == track, "Unable to get album track."
assert album.artist() == artist, "album.artist() doesnt match expected artist."
assert track.artist() == artist, "track.artist() doesnt match expected artist."
assert track.album() == album, "track.album() doesnt match expected album."

View file

@ -1,9 +1,7 @@
def test_photo_Photoalbum(photoalbum):
assert len(photoalbum.albums()) == 3
assert len(photoalbum.photos()) == 3
cats_in_bed = photoalbum.album('Cats in bed')
cats_in_bed = photoalbum.album("Cats in bed")
assert len(cats_in_bed.photos()) == 7
a_pic = cats_in_bed.photo('photo7')
a_pic = cats_in_bed.photo("photo7")
assert a_pic

View file

@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
import time
import pytest

View file

@ -19,11 +19,11 @@ def test_server_attr(plex, account):
assert len(plex.machineIdentifier) == 40
assert plex.myPlex is True
# if you run the tests very shortly after server creation the state in rare cases may be `unknown`
assert plex.myPlexMappingState in ('mapped', 'unknown')
assert plex.myPlexSigninState == 'ok'
assert plex.myPlexMappingState in ("mapped", "unknown")
assert plex.myPlexSigninState == "ok"
assert utils.is_int(plex.myPlexSubscription, gte=0)
assert re.match(utils.REGEX_EMAIL, plex.myPlexUsername)
assert plex.platform in ('Linux', 'Windows')
assert plex.platform in ("Linux", "Windows")
assert len(plex.platformVersion) >= 5
assert plex._token == account.authenticationToken
assert utils.is_int(plex.transcoderActiveVideoSessions, gte=0)
@ -54,28 +54,37 @@ def test_server_library(plex):
def test_server_url(plex):
assert 'ohno' in plex.url('ohno')
assert "ohno" in plex.url("ohno")
def test_server_transcodeImage(tmpdir, plex, show):
width, height = 500, 500
imgurl = plex.transcodeImage(show.banner, height, width)
gray = imgurl = plex.transcodeImage(show.banner, height, width, saturation=0)
resized_img = download(imgurl, plex._token, savepath=str(tmpdir), filename='resize_image')
original_img = download(show._server.url(show.banner), plex._token, savepath=str(tmpdir), filename='original_img')
grayscale_img = download(gray, plex._token, savepath=str(tmpdir), filename='grayscale_img')
resized_img = download(
imgurl, plex._token, savepath=str(tmpdir), filename="resize_image"
)
original_img = download(
show._server.url(show.banner),
plex._token,
savepath=str(tmpdir),
filename="original_img",
)
grayscale_img = download(
gray, plex._token, savepath=str(tmpdir), filename="grayscale_img"
)
with Image.open(resized_img) as image:
assert width, height == image.size
with Image.open(original_img) as image:
assert width, height != image.size
assert _detect_color_image(grayscale_img, thumb_size=150) == 'grayscale'
assert _detect_color_image(grayscale_img, thumb_size=150) == "grayscale"
def _detect_color_image(file, thumb_size=150, MSE_cutoff=22, adjust_color_bias=True):
# http://stackoverflow.com/questions/20068945/detect-if-image-is-color-grayscale-or-black-and-white-with-python-pil
pilimg = Image.open(file)
bands = pilimg.getbands()
if bands == ('R', 'G', 'B') or bands == ('R', 'G', 'B', 'A'):
if bands == ("R", "G", "B") or bands == ("R", "G", "B", "A"):
thumb = pilimg.resize((thumb_size, thumb_size))
sse, bias = 0, [0, 0, 0]
if adjust_color_bias:
@ -83,11 +92,13 @@ def _detect_color_image(file, thumb_size=150, MSE_cutoff=22, adjust_color_bias=T
bias = [b - sum(bias) / 3 for b in bias]
for pixel in thumb.getdata():
mu = sum(pixel) / 3
sse += sum((pixel[i] - mu - bias[i]) * (pixel[i] - mu - bias[i]) for i in [0, 1, 2])
sse += sum(
(pixel[i] - mu - bias[i]) * (pixel[i] - mu - bias[i]) for i in [0, 1, 2]
)
mse = float(sse) / (thumb_size * thumb_size)
return 'grayscale' if mse <= MSE_cutoff else 'color'
return "grayscale" if mse <= MSE_cutoff else "color"
elif len(bands) == 1:
return 'blackandwhite'
return "blackandwhite"
def test_server_fetchitem_notfound(plex):
@ -99,16 +110,16 @@ def test_server_search(plex, movie):
title = movie.title
# this search seem to fail on my computer but not at travis, wtf.
assert plex.search(title)
assert plex.search(title, mediatype='movie')
assert plex.search(title, mediatype="movie")
def test_server_playlist(plex, show):
episodes = show.episodes()
playlist = plex.createPlaylist('test_playlist', episodes[:3])
playlist = plex.createPlaylist("test_playlist", episodes[:3])
try:
assert playlist.title == 'test_playlist'
assert playlist.title == "test_playlist"
with pytest.raises(NotFound):
plex.playlist('<playlist-not-found>')
plex.playlist("<playlist-not-found>")
finally:
playlist.delete()
@ -117,7 +128,7 @@ def test_server_playlists(plex, show):
playlists = plex.playlists()
count = len(playlists)
episodes = show.episodes()
playlist = plex.createPlaylist('test_playlist', episodes[:3])
playlist = plex.createPlaylist("test_playlist", episodes[:3])
try:
playlists = plex.playlists()
assert len(playlists) == count + 1
@ -133,9 +144,9 @@ def test_server_history(plex, movie):
def test_server_Server_query(plex):
assert plex.query('/')
assert plex.query("/")
with pytest.raises(NotFound):
assert plex.query('/asdf/1234/asdf', headers={'random_headers': '1234'})
assert plex.query("/asdf/1234/asdf", headers={"random_headers": "1234"})
def test_server_Server_session(account):
@ -144,28 +155,31 @@ def test_server_Server_session(account):
def __init__(self):
super(self.__class__, self).__init__()
self.plexapi_session_test = True
# Test Code
plex = PlexServer(utils.SERVER_BASEURL, account.authenticationToken, session=MySession())
assert hasattr(plex._session, 'plexapi_session_test')
plex = PlexServer(
utils.SERVER_BASEURL, account.authenticationToken, session=MySession()
)
assert hasattr(plex._session, "plexapi_session_test")
@pytest.mark.authenticated
def test_server_token_in_headers(plex):
headers = plex._headers()
assert 'X-Plex-Token' in headers
assert len(headers['X-Plex-Token']) >= 1
assert "X-Plex-Token" in headers
assert len(headers["X-Plex-Token"]) >= 1
def test_server_createPlayQueue(plex, movie):
playqueue = plex.createPlayQueue(movie, shuffle=1, repeat=1)
assert 'shuffle=1' in playqueue._initpath
assert 'repeat=1' in playqueue._initpath
assert "shuffle=1" in playqueue._initpath
assert "repeat=1" in playqueue._initpath
assert playqueue.playQueueShuffled is True
def test_server_client_not_found(plex):
with pytest.raises(NotFound):
plex.client('<This-client-should-not-be-found>')
plex.client("<This-client-should-not-be-found>")
def test_server_sessions(plex):
@ -174,38 +188,41 @@ def test_server_sessions(plex):
def test_server_isLatest(plex, mocker):
from os import environ
is_latest = plex.isLatest()
if environ.get('PLEX_CONTAINER_TAG') and environ['PLEX_CONTAINER_TAG'] != 'latest':
if environ.get("PLEX_CONTAINER_TAG") and environ["PLEX_CONTAINER_TAG"] != "latest":
assert not is_latest
else:
return pytest.skip('Do not forget to run with PLEX_CONTAINER_TAG != latest to ensure that update is available')
return pytest.skip(
"Do not forget to run with PLEX_CONTAINER_TAG != latest to ensure that update is available"
)
def test_server_installUpdate(plex, mocker):
m = mocker.MagicMock(release='aa')
with patch('plexapi.server.PlexServer.check_for_update', return_value=m):
m = mocker.MagicMock(release="aa")
with patch("plexapi.server.PlexServer.check_for_update", return_value=m):
with utils.callable_http_patch():
plex.installUpdate()
def test_server_check_for_update(plex, mocker):
class R():
class R:
def __init__(self, **kwargs):
self.download_key = 'plex.tv/release/1337'
self.version = '1337'
self.added = 'gpu transcode'
self.fixed = 'fixed rare bug'
self.downloadURL = 'http://path-to-update'
self.state = 'downloaded'
self.download_key = "plex.tv/release/1337"
self.version = "1337"
self.added = "gpu transcode"
self.fixed = "fixed rare bug"
self.downloadURL = "http://path-to-update"
self.state = "downloaded"
with patch('plexapi.server.PlexServer.check_for_update', return_value=R()):
with patch("plexapi.server.PlexServer.check_for_update", return_value=R()):
rel = plex.check_for_update(force=False, download=True)
assert rel.download_key == 'plex.tv/release/1337'
assert rel.version == '1337'
assert rel.added == 'gpu transcode'
assert rel.fixed == 'fixed rare bug'
assert rel.downloadURL == 'http://path-to-update'
assert rel.state == 'downloaded'
assert rel.download_key == "plex.tv/release/1337"
assert rel.version == "1337"
assert rel.added == "gpu transcode"
assert rel.fixed == "fixed rare bug"
assert rel.downloadURL == "http://path-to-update"
assert rel.state == "downloaded"
@pytest.mark.client
@ -221,6 +238,7 @@ def test_server_clients(plex):
assert set(client.protocolCapabilities).issubset({'timeline', 'playback', 'navigation', 'mirror', 'playqueues'})
@pytest.mark.authenticated
def test_server_account(plex):
account = plex.account()
@ -228,25 +246,29 @@ def test_server_account(plex):
# TODO: Figure out why this is missing from time to time.
# assert account.mappingError == 'publisherror'
assert account.mappingErrorMessage is None
assert account.mappingState == 'mapped'
if account.mappingError != 'unreachable':
assert account.mappingState == "mapped"
if account.mappingError != "unreachable":
assert re.match(utils.REGEX_IPADDR, account.privateAddress)
assert int(account.privatePort) >= 1000
assert re.match(utils.REGEX_IPADDR, account.publicAddress)
assert int(account.publicPort) >= 1000
else:
assert account.privateAddress == ''
assert account.privateAddress == ""
assert int(account.privatePort) == 0
assert account.publicAddress == ''
assert account.publicAddress == ""
assert int(account.publicPort) == 0
assert account.signInState == 'ok'
assert account.signInState == "ok"
assert isinstance(account.subscriptionActive, bool)
if account.subscriptionActive:
assert len(account.subscriptionFeatures)
# Below check keeps failing.. it should go away.
# else: assert sorted(account.subscriptionFeatures) == ['adaptive_bitrate',
# 'download_certificates', 'federated-auth', 'news']
assert account.subscriptionState == 'Active' if account.subscriptionActive else 'Unknown'
assert (
account.subscriptionState == "Active"
if account.subscriptionActive
else "Unknown"
)
assert re.match(utils.REGEX_EMAIL, account.username)

View file

@ -1,31 +1,30 @@
def test_settings_group(plex):
assert plex.settings.group('general')
assert plex.settings.group("general")
def test_settings_get(plex):
# This is the value since it we havnt set any friendlyname
# plex just default to computer name but it NOT in the settings.
# check this one. why is this bytes instead of string.
value = plex.settings.get('FriendlyName').value
value = plex.settings.get("FriendlyName").value
# Should not be bytes, fix this when py2 is dropped
assert isinstance(value, bytes)
def test_settings_set(plex):
cd = plex.settings.get('autoEmptyTrash')
cd = plex.settings.get("autoEmptyTrash")
old_value = cd.value
new_value = not old_value
cd.set(new_value)
plex.settings.save()
plex._settings = None
assert plex.settings.get('autoEmptyTrash').value == new_value
assert plex.settings.get("autoEmptyTrash").value == new_value
def test_settings_set_str(plex):
cd = plex.settings.get('OnDeckWindow')
cd = plex.settings.get("OnDeckWindow")
new_value = 99
cd.set(new_value)
plex.settings.save()
plex._settings = None
assert plex.settings.get('OnDeckWindow').value == 99
assert plex.settings.get("OnDeckWindow").value == 99

View file

@ -1,7 +1,8 @@
from plexapi.exceptions import BadRequest
from . import conftest as utils
from plexapi.sync import (AUDIO_BITRATE_192_KBPS, PHOTO_QUALITY_MEDIUM,
VIDEO_QUALITY_3_MBPS_720p)
from plexapi.sync import VIDEO_QUALITY_3_MBPS_720p, AUDIO_BITRATE_192_KBPS, PHOTO_QUALITY_MEDIUM
from . import conftest as utils
def get_sync_item_from_server(device, sync_item):
@ -16,14 +17,14 @@ def is_sync_item_missing(device, sync_item):
def test_current_device_got_sync_target(clear_sync_device):
assert 'sync-target' in clear_sync_device.provides
assert "sync-target" in clear_sync_device.provides
def get_media(item, server):
try:
return item.getMedia()
except BadRequest as e:
if 'not_found' in str(e):
if "not_found" in str(e):
server.refreshSync()
return None
else:
@ -33,9 +34,16 @@ def get_media(item, server):
def test_add_movie_to_sync(clear_sync_device, movie):
new_item = movie.sync(VIDEO_QUALITY_3_MBPS_720p, client=clear_sync_device)
movie._server.refreshSync()
item = utils.wait_until(get_sync_item_from_server, delay=0.5, timeout=3, device=clear_sync_device,
sync_item=new_item)
media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=movie._server)
item = utils.wait_until(
get_sync_item_from_server,
delay=0.5,
timeout=3,
device=clear_sync_device,
sync_item=new_item,
)
media_list = utils.wait_until(
get_media, delay=0.25, timeout=3, item=item, server=movie._server
)
assert len(media_list) == 1
assert media_list[0].ratingKey == movie.ratingKey
@ -43,33 +51,58 @@ def test_add_movie_to_sync(clear_sync_device, movie):
def test_delete_sync_item(clear_sync_device, movie):
new_item = movie.sync(VIDEO_QUALITY_3_MBPS_720p, client=clear_sync_device)
movie._server.refreshSync()
new_item_in_myplex = utils.wait_until(get_sync_item_from_server, delay=0.5, timeout=3, device=clear_sync_device,
sync_item=new_item)
new_item_in_myplex = utils.wait_until(
get_sync_item_from_server,
delay=0.5,
timeout=3,
device=clear_sync_device,
sync_item=new_item,
)
sync_items = clear_sync_device.syncItems()
for item in sync_items.items:
item.delete()
utils.wait_until(is_sync_item_missing, delay=0.5, timeout=3, device=clear_sync_device, sync_item=new_item_in_myplex)
utils.wait_until(
is_sync_item_missing,
delay=0.5,
timeout=3,
device=clear_sync_device,
sync_item=new_item_in_myplex,
)
def test_add_show_to_sync(clear_sync_device, show):
new_item = show.sync(VIDEO_QUALITY_3_MBPS_720p, client=clear_sync_device)
show._server.refreshSync()
item = utils.wait_until(get_sync_item_from_server, delay=0.5, timeout=3, device=clear_sync_device,
sync_item=new_item)
item = utils.wait_until(
get_sync_item_from_server,
delay=0.5,
timeout=3,
device=clear_sync_device,
sync_item=new_item,
)
episodes = show.episodes()
media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=show._server)
media_list = utils.wait_until(
get_media, delay=0.25, timeout=3, item=item, server=show._server
)
assert len(episodes) == len(media_list)
assert [e.ratingKey for e in episodes] == [m.ratingKey for m in media_list]
def test_add_season_to_sync(clear_sync_device, show):
season = show.season('Season 1')
season = show.season("Season 1")
new_item = season.sync(VIDEO_QUALITY_3_MBPS_720p, client=clear_sync_device)
season._server.refreshSync()
item = utils.wait_until(get_sync_item_from_server, delay=0.5, timeout=3, device=clear_sync_device,
sync_item=new_item)
item = utils.wait_until(
get_sync_item_from_server,
delay=0.5,
timeout=3,
device=clear_sync_device,
sync_item=new_item,
)
episodes = season.episodes()
media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=season._server)
media_list = utils.wait_until(
get_media, delay=0.25, timeout=3, item=item, server=season._server
)
assert len(episodes) == len(media_list)
assert [e.ratingKey for e in episodes] == [m.ratingKey for m in media_list]
@ -77,80 +110,131 @@ def test_add_season_to_sync(clear_sync_device, show):
def test_add_episode_to_sync(clear_sync_device, episode):
new_item = episode.sync(VIDEO_QUALITY_3_MBPS_720p, client=clear_sync_device)
episode._server.refreshSync()
item = utils.wait_until(get_sync_item_from_server, delay=0.5, timeout=3, device=clear_sync_device,
sync_item=new_item)
media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=episode._server)
item = utils.wait_until(
get_sync_item_from_server,
delay=0.5,
timeout=3,
device=clear_sync_device,
sync_item=new_item,
)
media_list = utils.wait_until(
get_media, delay=0.25, timeout=3, item=item, server=episode._server
)
assert 1 == len(media_list)
assert episode.ratingKey == media_list[0].ratingKey
def test_limited_watched(clear_sync_device, show):
show.markUnwatched()
new_item = show.sync(VIDEO_QUALITY_3_MBPS_720p, client=clear_sync_device, limit=5, unwatched=False)
new_item = show.sync(
VIDEO_QUALITY_3_MBPS_720p, client=clear_sync_device, limit=5, unwatched=False
)
show._server.refreshSync()
item = utils.wait_until(get_sync_item_from_server, delay=0.5, timeout=3, device=clear_sync_device,
sync_item=new_item)
item = utils.wait_until(
get_sync_item_from_server,
delay=0.5,
timeout=3,
device=clear_sync_device,
sync_item=new_item,
)
episodes = show.episodes()[:5]
media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=show._server)
media_list = utils.wait_until(
get_media, delay=0.25, timeout=3, item=item, server=show._server
)
assert 5 == len(media_list)
assert [e.ratingKey for e in episodes] == [m.ratingKey for m in media_list]
episodes[0].markWatched()
show._server.refreshSync()
media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=show._server)
media_list = utils.wait_until(
get_media, delay=0.25, timeout=3, item=item, server=show._server
)
assert 5 == len(media_list)
assert [e.ratingKey for e in episodes] == [m.ratingKey for m in media_list]
def test_limited_unwatched(clear_sync_device, show):
show.markUnwatched()
new_item = show.sync(VIDEO_QUALITY_3_MBPS_720p, client=clear_sync_device, limit=5, unwatched=True)
new_item = show.sync(
VIDEO_QUALITY_3_MBPS_720p, client=clear_sync_device, limit=5, unwatched=True
)
show._server.refreshSync()
item = utils.wait_until(get_sync_item_from_server, delay=0.5, timeout=3, device=clear_sync_device,
sync_item=new_item)
item = utils.wait_until(
get_sync_item_from_server,
delay=0.5,
timeout=3,
device=clear_sync_device,
sync_item=new_item,
)
episodes = show.episodes(viewCount=0)[:5]
media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=show._server)
media_list = utils.wait_until(
get_media, delay=0.25, timeout=3, item=item, server=show._server
)
assert len(episodes) == len(media_list)
assert [e.ratingKey for e in episodes] == [m.ratingKey for m in media_list]
episodes[0].markWatched()
show._server.refreshSync()
episodes = show.episodes(viewCount=0)[:5]
media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=show._server)
media_list = utils.wait_until(
get_media, delay=0.25, timeout=3, item=item, server=show._server
)
assert len(episodes) == len(media_list)
assert [e.ratingKey for e in episodes] == [m.ratingKey for m in media_list]
def test_unlimited_and_watched(clear_sync_device, show):
show.markUnwatched()
new_item = show.sync(VIDEO_QUALITY_3_MBPS_720p, client=clear_sync_device, unwatched=False)
new_item = show.sync(
VIDEO_QUALITY_3_MBPS_720p, client=clear_sync_device, unwatched=False
)
show._server.refreshSync()
item = utils.wait_until(get_sync_item_from_server, delay=0.5, timeout=3, device=clear_sync_device,
sync_item=new_item)
item = utils.wait_until(
get_sync_item_from_server,
delay=0.5,
timeout=3,
device=clear_sync_device,
sync_item=new_item,
)
episodes = show.episodes()
media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=show._server)
media_list = utils.wait_until(
get_media, delay=0.25, timeout=3, item=item, server=show._server
)
assert len(episodes) == len(media_list)
assert [e.ratingKey for e in episodes] == [m.ratingKey for m in media_list]
episodes[0].markWatched()
show._server.refreshSync()
episodes = show.episodes()
media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=show._server)
media_list = utils.wait_until(
get_media, delay=0.25, timeout=3, item=item, server=show._server
)
assert len(episodes) == len(media_list)
assert [e.ratingKey for e in episodes] == [m.ratingKey for m in media_list]
def test_unlimited_and_unwatched(clear_sync_device, show):
show.markUnwatched()
new_item = show.sync(VIDEO_QUALITY_3_MBPS_720p, client=clear_sync_device, unwatched=True)
new_item = show.sync(
VIDEO_QUALITY_3_MBPS_720p, client=clear_sync_device, unwatched=True
)
show._server.refreshSync()
item = utils.wait_until(get_sync_item_from_server, delay=0.5, timeout=3, device=clear_sync_device,
sync_item=new_item)
item = utils.wait_until(
get_sync_item_from_server,
delay=0.5,
timeout=3,
device=clear_sync_device,
sync_item=new_item,
)
episodes = show.episodes(viewCount=0)
media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=show._server)
media_list = utils.wait_until(
get_media, delay=0.25, timeout=3, item=item, server=show._server
)
assert len(episodes) == len(media_list)
assert [e.ratingKey for e in episodes] == [m.ratingKey for m in media_list]
episodes[0].markWatched()
show._server.refreshSync()
episodes = show.episodes(viewCount=0)
media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=show._server)
media_list = utils.wait_until(
get_media, delay=0.25, timeout=3, item=item, server=show._server
)
assert len(episodes) == len(media_list)
assert [e.ratingKey for e in episodes] == [m.ratingKey for m in media_list]
@ -158,10 +242,17 @@ def test_unlimited_and_unwatched(clear_sync_device, show):
def test_add_music_artist_to_sync(clear_sync_device, artist):
new_item = artist.sync(AUDIO_BITRATE_192_KBPS, client=clear_sync_device)
artist._server.refreshSync()
item = utils.wait_until(get_sync_item_from_server, delay=0.5, timeout=3, device=clear_sync_device,
sync_item=new_item)
item = utils.wait_until(
get_sync_item_from_server,
delay=0.5,
timeout=3,
device=clear_sync_device,
sync_item=new_item,
)
tracks = artist.tracks()
media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=artist._server)
media_list = utils.wait_until(
get_media, delay=0.25, timeout=3, item=item, server=artist._server
)
assert len(tracks) == len(media_list)
assert [t.ratingKey for t in tracks] == [m.ratingKey for m in media_list]
@ -169,10 +260,17 @@ def test_add_music_artist_to_sync(clear_sync_device, artist):
def test_add_music_album_to_sync(clear_sync_device, album):
new_item = album.sync(AUDIO_BITRATE_192_KBPS, client=clear_sync_device)
album._server.refreshSync()
item = utils.wait_until(get_sync_item_from_server, delay=0.5, timeout=3, device=clear_sync_device,
sync_item=new_item)
item = utils.wait_until(
get_sync_item_from_server,
delay=0.5,
timeout=3,
device=clear_sync_device,
sync_item=new_item,
)
tracks = album.tracks()
media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=album._server)
media_list = utils.wait_until(
get_media, delay=0.25, timeout=3, item=item, server=album._server
)
assert len(tracks) == len(media_list)
assert [t.ratingKey for t in tracks] == [m.ratingKey for m in media_list]
@ -180,20 +278,34 @@ def test_add_music_album_to_sync(clear_sync_device, album):
def test_add_music_track_to_sync(clear_sync_device, track):
new_item = track.sync(AUDIO_BITRATE_192_KBPS, client=clear_sync_device)
track._server.refreshSync()
item = utils.wait_until(get_sync_item_from_server, delay=0.5, timeout=3, device=clear_sync_device,
sync_item=new_item)
media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=track._server)
item = utils.wait_until(
get_sync_item_from_server,
delay=0.5,
timeout=3,
device=clear_sync_device,
sync_item=new_item,
)
media_list = utils.wait_until(
get_media, delay=0.25, timeout=3, item=item, server=track._server
)
assert 1 == len(media_list)
assert track.ratingKey == media_list[0].ratingKey
def test_add_photo_to_sync(clear_sync_device, photoalbum):
photo = photoalbum.photo('photo1')
photo = photoalbum.photo("photo1")
new_item = photo.sync(PHOTO_QUALITY_MEDIUM, client=clear_sync_device)
photo._server.refreshSync()
item = utils.wait_until(get_sync_item_from_server, delay=0.5, timeout=3, device=clear_sync_device,
sync_item=new_item)
media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=photo._server)
item = utils.wait_until(
get_sync_item_from_server,
delay=0.5,
timeout=3,
device=clear_sync_device,
sync_item=new_item,
)
media_list = utils.wait_until(
get_media, delay=0.25, timeout=3, item=item, server=photo._server
)
assert 1 == len(media_list)
assert photo.ratingKey == media_list[0].ratingKey
@ -201,10 +313,17 @@ def test_add_photo_to_sync(clear_sync_device, photoalbum):
def test_sync_entire_library_movies(clear_sync_device, movies):
new_item = movies.sync(VIDEO_QUALITY_3_MBPS_720p, client=clear_sync_device)
movies._server.refreshSync()
item = utils.wait_until(get_sync_item_from_server, delay=0.5, timeout=3, device=clear_sync_device,
sync_item=new_item)
item = utils.wait_until(
get_sync_item_from_server,
delay=0.5,
timeout=3,
device=clear_sync_device,
sync_item=new_item,
)
section_content = movies.all()
media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=movies._server)
media_list = utils.wait_until(
get_media, delay=0.25, timeout=3, item=item, server=movies._server
)
assert len(section_content) == len(media_list)
assert [e.ratingKey for e in section_content] == [m.ratingKey for m in media_list]
@ -212,10 +331,17 @@ def test_sync_entire_library_movies(clear_sync_device, movies):
def test_sync_entire_library_tvshows(clear_sync_device, tvshows):
new_item = tvshows.sync(VIDEO_QUALITY_3_MBPS_720p, client=clear_sync_device)
tvshows._server.refreshSync()
item = utils.wait_until(get_sync_item_from_server, delay=0.5, timeout=3, device=clear_sync_device,
sync_item=new_item)
item = utils.wait_until(
get_sync_item_from_server,
delay=0.5,
timeout=3,
device=clear_sync_device,
sync_item=new_item,
)
section_content = tvshows.searchEpisodes()
media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=tvshows._server)
media_list = utils.wait_until(
get_media, delay=0.25, timeout=3, item=item, server=tvshows._server
)
assert len(section_content) == len(media_list)
assert [e.ratingKey for e in section_content] == [m.ratingKey for m in media_list]
@ -223,10 +349,17 @@ def test_sync_entire_library_tvshows(clear_sync_device, tvshows):
def test_sync_entire_library_music(clear_sync_device, music):
new_item = music.sync(AUDIO_BITRATE_192_KBPS, client=clear_sync_device)
music._server.refreshSync()
item = utils.wait_until(get_sync_item_from_server, delay=0.5, timeout=3, device=clear_sync_device,
sync_item=new_item)
item = utils.wait_until(
get_sync_item_from_server,
delay=0.5,
timeout=3,
device=clear_sync_device,
sync_item=new_item,
)
section_content = music.searchTracks()
media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=music._server)
media_list = utils.wait_until(
get_media, delay=0.25, timeout=3, item=item, server=music._server
)
assert len(section_content) == len(media_list)
assert [e.ratingKey for e in section_content] == [m.ratingKey for m in media_list]
@ -234,23 +367,39 @@ def test_sync_entire_library_music(clear_sync_device, music):
def test_sync_entire_library_photos(clear_sync_device, photos):
new_item = photos.sync(PHOTO_QUALITY_MEDIUM, client=clear_sync_device)
photos._server.refreshSync()
item = utils.wait_until(get_sync_item_from_server, delay=0.5, timeout=3, device=clear_sync_device,
sync_item=new_item)
item = utils.wait_until(
get_sync_item_from_server,
delay=0.5,
timeout=3,
device=clear_sync_device,
sync_item=new_item,
)
# It's not that easy, to just get all the photos within the library, so let`s query for photos with device!=0x0
section_content = photos.search(libtype='photo', **{'device!': '0x0'})
media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=photos._server)
section_content = photos.search(libtype="photo", **{"device!": "0x0"})
media_list = utils.wait_until(
get_media, delay=0.25, timeout=3, item=item, server=photos._server
)
assert len(section_content) == len(media_list)
assert [e.ratingKey for e in section_content] == [m.ratingKey for m in media_list]
def test_playlist_movie_sync(plex, clear_sync_device, movies):
items = movies.all()
playlist = plex.createPlaylist('Sync: Movies', items)
new_item = playlist.sync(videoQuality=VIDEO_QUALITY_3_MBPS_720p, client=clear_sync_device)
playlist = plex.createPlaylist("Sync: Movies", items)
new_item = playlist.sync(
videoQuality=VIDEO_QUALITY_3_MBPS_720p, client=clear_sync_device
)
playlist._server.refreshSync()
item = utils.wait_until(get_sync_item_from_server, delay=0.5, timeout=3, device=clear_sync_device,
sync_item=new_item)
media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=playlist._server)
item = utils.wait_until(
get_sync_item_from_server,
delay=0.5,
timeout=3,
device=clear_sync_device,
sync_item=new_item,
)
media_list = utils.wait_until(
get_media, delay=0.25, timeout=3, item=item, server=playlist._server
)
assert len(items) == len(media_list)
assert [e.ratingKey for e in items] == [m.ratingKey for m in media_list]
playlist.delete()
@ -258,12 +407,21 @@ def test_playlist_movie_sync(plex, clear_sync_device, movies):
def test_playlist_tvshow_sync(plex, clear_sync_device, show):
items = show.episodes()
playlist = plex.createPlaylist('Sync: TV Show', items)
new_item = playlist.sync(videoQuality=VIDEO_QUALITY_3_MBPS_720p, client=clear_sync_device)
playlist = plex.createPlaylist("Sync: TV Show", items)
new_item = playlist.sync(
videoQuality=VIDEO_QUALITY_3_MBPS_720p, client=clear_sync_device
)
playlist._server.refreshSync()
item = utils.wait_until(get_sync_item_from_server, delay=0.5, timeout=3, device=clear_sync_device,
sync_item=new_item)
media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=playlist._server)
item = utils.wait_until(
get_sync_item_from_server,
delay=0.5,
timeout=3,
device=clear_sync_device,
sync_item=new_item,
)
media_list = utils.wait_until(
get_media, delay=0.25, timeout=3, item=item, server=playlist._server
)
assert len(items) == len(media_list)
assert [e.ratingKey for e in items] == [m.ratingKey for m in media_list]
playlist.delete()
@ -271,12 +429,21 @@ def test_playlist_tvshow_sync(plex, clear_sync_device, show):
def test_playlist_mixed_sync(plex, clear_sync_device, movie, episode):
items = [movie, episode]
playlist = plex.createPlaylist('Sync: Mixed', items)
new_item = playlist.sync(videoQuality=VIDEO_QUALITY_3_MBPS_720p, client=clear_sync_device)
playlist = plex.createPlaylist("Sync: Mixed", items)
new_item = playlist.sync(
videoQuality=VIDEO_QUALITY_3_MBPS_720p, client=clear_sync_device
)
playlist._server.refreshSync()
item = utils.wait_until(get_sync_item_from_server, delay=0.5, timeout=3, device=clear_sync_device,
sync_item=new_item)
media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=playlist._server)
item = utils.wait_until(
get_sync_item_from_server,
delay=0.5,
timeout=3,
device=clear_sync_device,
sync_item=new_item,
)
media_list = utils.wait_until(
get_media, delay=0.25, timeout=3, item=item, server=playlist._server
)
assert len(items) == len(media_list)
assert [e.ratingKey for e in items] == [m.ratingKey for m in media_list]
playlist.delete()
@ -284,12 +451,21 @@ def test_playlist_mixed_sync(plex, clear_sync_device, movie, episode):
def test_playlist_music_sync(plex, clear_sync_device, artist):
items = artist.tracks()
playlist = plex.createPlaylist('Sync: Music', items)
new_item = playlist.sync(audioBitrate=AUDIO_BITRATE_192_KBPS, client=clear_sync_device)
playlist = plex.createPlaylist("Sync: Music", items)
new_item = playlist.sync(
audioBitrate=AUDIO_BITRATE_192_KBPS, client=clear_sync_device
)
playlist._server.refreshSync()
item = utils.wait_until(get_sync_item_from_server, delay=0.5, timeout=3, device=clear_sync_device,
sync_item=new_item)
media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=playlist._server)
item = utils.wait_until(
get_sync_item_from_server,
delay=0.5,
timeout=3,
device=clear_sync_device,
sync_item=new_item,
)
media_list = utils.wait_until(
get_media, delay=0.25, timeout=3, item=item, server=playlist._server
)
assert len(items) == len(media_list)
assert [e.ratingKey for e in items] == [m.ratingKey for m in media_list]
playlist.delete()
@ -297,12 +473,21 @@ def test_playlist_music_sync(plex, clear_sync_device, artist):
def test_playlist_photos_sync(plex, clear_sync_device, photoalbum):
items = photoalbum.photos()
playlist = plex.createPlaylist('Sync: Photos', items)
new_item = playlist.sync(photoResolution=PHOTO_QUALITY_MEDIUM, client=clear_sync_device)
playlist = plex.createPlaylist("Sync: Photos", items)
new_item = playlist.sync(
photoResolution=PHOTO_QUALITY_MEDIUM, client=clear_sync_device
)
playlist._server.refreshSync()
item = utils.wait_until(get_sync_item_from_server, delay=0.5, timeout=3, device=clear_sync_device,
sync_item=new_item)
media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=playlist._server)
item = utils.wait_until(
get_sync_item_from_server,
delay=0.5,
timeout=3,
device=clear_sync_device,
sync_item=new_item,
)
media_list = utils.wait_until(
get_media, delay=0.25, timeout=3, item=item, server=playlist._server
)
assert len(items) == len(media_list)
assert [e.ratingKey for e in items] == [m.ratingKey for m in media_list]
playlist.delete()

View file

@ -7,14 +7,17 @@ from plexapi.exceptions import NotFound
def test_utils_toDatetime():
assert str(utils.toDatetime('2006-03-03', format='%Y-%m-%d')) == '2006-03-03 00:00:00'
#assert str(utils.toDatetime('0'))[:-9] in ['1970-01-01', '1969-12-31']
assert (
str(utils.toDatetime("2006-03-03", format="%Y-%m-%d")) == "2006-03-03 00:00:00"
)
# assert str(utils.toDatetime('0'))[:-9] in ['1970-01-01', '1969-12-31']
def test_utils_threaded():
def _squared(num, results, i, job_is_done_event=None):
time.sleep(0.5)
results[i] = num * num
starttime = time.time()
results = utils.threaded(_squared, [[1], [2], [3], [4], [5]])
assert results == [1, 4, 9, 16, 25]
@ -28,28 +31,28 @@ def test_utils_downloadSessionImages():
def test_utils_searchType():
st = utils.searchType('movie')
st = utils.searchType("movie")
assert st == 1
movie = utils.searchType(1)
assert movie == '1'
assert movie == "1"
with pytest.raises(NotFound):
utils.searchType('kekekekeke')
utils.searchType("kekekekeke")
def test_utils_joinArgs():
test_dict = {'genre': 'action', 'type': 1337}
assert utils.joinArgs(test_dict) == '?genre=action&type=1337'
test_dict = {"genre": "action", "type": 1337}
assert utils.joinArgs(test_dict) == "?genre=action&type=1337"
def test_utils_cast():
int_int = utils.cast(int, 1)
int_str = utils.cast(int, '1')
bool_str = utils.cast(bool, '1')
int_str = utils.cast(int, "1")
bool_str = utils.cast(bool, "1")
bool_int = utils.cast(bool, 1)
float_int = utils.cast(float, 1)
float_float = utils.cast(float, 1.0)
float_str = utils.cast(float, '1.2')
float_nan = utils.cast(float, 'wut?')
float_str = utils.cast(float, "1.2")
float_nan = utils.cast(float, "wut?")
assert int_int == 1 and isinstance(int_int, int)
assert int_str == 1 and isinstance(int_str, int)
assert bool_str is True
@ -59,7 +62,7 @@ def test_utils_cast():
assert float_str == 1.2 and isinstance(float_str, float)
assert float_nan != float_nan # nan is never equal
with pytest.raises(ValueError):
bool_str = utils.cast(bool, 'kek')
bool_str = utils.cast(bool, "kek")
def test_utils_download(plex, episode):
@ -67,5 +70,9 @@ def test_utils_download(plex, episode):
locations = episode.locations[0]
session = episode._server._session
assert utils.download(url, plex._token, filename=locations, mocked=True)
assert utils.download(url, plex._token, filename=locations, session=session, mocked=True)
assert utils.download(episode.thumbUrl, plex._token, filename=episode.title, mocked=True)
assert utils.download(
url, plex._token, filename=locations, session=session, mocked=True
)
assert utils.download(
episode.thumbUrl, plex._token, filename=episode.title, mocked=True
)

View file

@ -20,7 +20,14 @@ def test_video_Movie_attributeerror(movie):
def test_video_ne(movies):
assert len(movies.fetchItems('/library/sections/%s/all' % movies.key, title__ne='Sintel')) == 3
assert (
len(
movies.fetchItems(
"/library/sections/%s/all" % movies.key, title__ne="Sintel"
)
)
== 3
)
def test_video_Movie_delete(movie, patched_http_call):
@ -28,7 +35,7 @@ def test_video_Movie_delete(movie, patched_http_call):
def test_video_Movie_addCollection(movie):
labelname = 'Random_label'
labelname = "Random_label"
org_collection = [tag.tag for tag in movie.collections if tag]
assert labelname not in org_collection
movie.addCollection(labelname)
@ -41,12 +48,18 @@ def test_video_Movie_addCollection(movie):
def test_video_Movie_getStreamURL(movie, account):
key = movie.ratingKey
assert movie.getStreamURL() == '{0}/video/:/transcode/universal/start.m3u8?X-Plex-Platform=Chrome&copyts=1&mediaIndex=0&offset=0&path=%2Flibrary%2Fmetadata%2F{1}&X-Plex-Token={2}'.format(utils.SERVER_BASEURL, key, account.authenticationToken) # noqa
assert movie.getStreamURL(videoResolution='800x600') == '{0}/video/:/transcode/universal/start.m3u8?X-Plex-Platform=Chrome&copyts=1&mediaIndex=0&offset=0&path=%2Flibrary%2Fmetadata%2F{1}&videoResolution=800x600&X-Plex-Token={2}'.format(utils.SERVER_BASEURL, key, account.authenticationToken) # noqa
assert movie.getStreamURL() == "{0}/video/:/transcode/universal/start.m3u8?X-Plex-Platform=Chrome&copyts=1&mediaIndex=0&offset=0&path=%2Flibrary%2Fmetadata%2F{1}&X-Plex-Token={2}".format(
utils.SERVER_BASEURL, key, account.authenticationToken
) # noqa
assert movie.getStreamURL(
videoResolution="800x600"
) == "{0}/video/:/transcode/universal/start.m3u8?X-Plex-Platform=Chrome&copyts=1&mediaIndex=0&offset=0&path=%2Flibrary%2Fmetadata%2F{1}&videoResolution=800x600&X-Plex-Token={2}".format(
utils.SERVER_BASEURL, key, account.authenticationToken
) # noqa
def test_video_Movie_isFullObject_and_reload(plex):
movie = plex.library.section('Movies').get('Sita Sings the Blues')
movie = plex.library.section("Movies").get("Sita Sings the Blues")
assert movie.isFullObject() is False
movie.reload()
assert movie.isFullObject() is True
@ -54,7 +67,7 @@ def test_video_Movie_isFullObject_and_reload(plex):
assert movie_via_search.isFullObject() is False
movie_via_search.reload()
assert movie_via_search.isFullObject() is True
movie_via_section_search = plex.library.section('Movies').search(movie.title)[0]
movie_via_section_search = plex.library.section("Movies").search(movie.title)[0]
assert movie_via_section_search.isFullObject() is False
movie_via_section_search.reload()
assert movie_via_section_search.isFullObject() is True
@ -82,7 +95,7 @@ def test_video_Movie_iterParts(movie):
def test_video_Movie_download(monkeydownload, tmpdir, movie):
filepaths1 = movie.download(savepath=str(tmpdir))
assert len(filepaths1) >= 1
filepaths2 = movie.download(savepath=str(tmpdir), videoResolution='500x300')
filepaths2 = movie.download(savepath=str(tmpdir), videoResolution="500x300")
assert len(filepaths2) >= 1
@ -101,7 +114,7 @@ def test_video_Movie_upload_select_remove_subtitle(movie, subtitle):
movie.uploadSubtitles(filepath)
movie.reload()
subtitles = [sub.title for sub in movie.subtitleStreams()]
subname = subtitle.name.rsplit('.', 1)[0]
subname = subtitle.name.rsplit(".", 1)[0]
assert subname in subtitles
subtitleSelection = movie.subtitleStreams()[0]
@ -124,31 +137,37 @@ def test_video_Movie_upload_select_remove_subtitle(movie, subtitle):
def test_video_Movie_attrs(movies):
movie = movies.get('Sita Sings the Blues')
movie = movies.get("Sita Sings the Blues")
assert len(movie.locations[0]) >= 10
assert utils.is_datetime(movie.addedAt)
assert utils.is_metadata(movie.art)
assert movie.artUrl
assert movie.audienceRating == 8.5
# Disabled this since it failed on the last run, wasnt in the original xml result.
#assert movie.audienceRatingImage == 'rottentomatoes://image.rating.upright'
# assert movie.audienceRatingImage == 'rottentomatoes://image.rating.upright'
movie.reload() # RELOAD
assert movie.chapterSource is None
assert movie.collections == []
assert movie.contentRating in utils.CONTENTRATINGS
assert all([i.tag in ['US', 'USA'] for i in movie.countries])
assert [i.tag for i in movie.directors] == ['Nina Paley']
assert all([i.tag in ["US", "USA"] for i in movie.countries])
assert [i.tag for i in movie.directors] == ["Nina Paley"]
assert movie.duration >= 160000
assert movie.fields == []
assert movie.posters()
assert sorted([i.tag for i in movie.genres]) == ['Animation', 'Comedy', 'Fantasy', 'Musical', 'Romance']
assert movie.guid == 'com.plexapp.agents.imdb://tt1172203?lang=en'
assert sorted([i.tag for i in movie.genres]) == [
"Animation",
"Comedy",
"Fantasy",
"Musical",
"Romance",
]
assert movie.guid == "com.plexapp.agents.imdb://tt1172203?lang=en"
assert utils.is_metadata(movie._initpath)
assert utils.is_metadata(movie.key)
if movie.lastViewedAt:
assert utils.is_datetime(movie.lastViewedAt)
assert int(movie.librarySectionID) >= 1
assert movie.listType == 'video'
assert movie.listType == "video"
assert movie.originalTitle is None
assert utils.is_datetime(movie.originallyAvailableAt)
assert movie.playlistItemID is None
@ -156,25 +175,30 @@ def test_video_Movie_attrs(movies):
assert utils.is_metadata(movie.primaryExtraKey)
assert [i.tag for i in movie.producers] == []
assert float(movie.rating) >= 6.4
#assert movie.ratingImage == 'rottentomatoes://image.rating.ripe'
# assert movie.ratingImage == 'rottentomatoes://image.rating.ripe'
assert movie.ratingKey >= 1
assert set(sorted([i.tag for i in movie.roles])) >= {'Aladdin Ullah', 'Annette Hanshaw', 'Aseem Chhabra', 'Debargo Sanyal'} # noqa
assert set(sorted([i.tag for i in movie.roles])) >= {
"Aladdin Ullah",
"Annette Hanshaw",
"Aseem Chhabra",
"Debargo Sanyal",
} # noqa
assert movie._server._baseurl == utils.SERVER_BASEURL
assert movie.sessionKey is None
assert movie.studio == 'Nina Paley'
assert movie.studio == "Nina Paley"
assert utils.is_string(movie.summary, gte=100)
assert movie.tagline == 'The Greatest Break-Up Story Ever Told'
assert movie.tagline == "The Greatest Break-Up Story Ever Told"
assert utils.is_thumb(movie.thumb)
assert movie.title == 'Sita Sings the Blues'
assert movie.titleSort == 'Sita Sings the Blues'
assert movie.title == "Sita Sings the Blues"
assert movie.titleSort == "Sita Sings the Blues"
assert not movie.transcodeSessions
assert movie.type == 'movie'
assert movie.type == "movie"
assert movie.updatedAt > datetime(2017, 1, 1)
assert movie.userRating is None
assert movie.viewCount == 0
assert utils.is_int(movie.viewOffset, gte=0)
assert movie.viewedAt is None
assert sorted([i.tag for i in movie.writers][:4]) == ['Nina Paley'] # noqa
assert sorted([i.tag for i in movie.writers][:4]) == ["Nina Paley"] # noqa
assert movie.year == 2008
# Audio
audio = movie.media[0].parts[0].audioStreams()[0]
@ -218,10 +242,13 @@ def test_video_Movie_attrs(movies):
assert utils.is_int(media.width, gte=200)
# Video
video = movie.media[0].parts[0].videoStreams()[0]
assert video.bitDepth in (8, None) # Different versions of Plex Server return different values
assert video.bitDepth in (
8,
None,
) # Different versions of Plex Server return different values
assert utils.is_int(video.bitrate)
assert video.cabac is None
assert video.chromaSubsampling in ('4:2:0', None)
assert video.chromaSubsampling in ("4:2:0", None)
assert video.codec in utils.CODECS
assert video.codecID is None
assert video.colorSpace is None
@ -238,7 +265,7 @@ def test_video_Movie_attrs(movies):
assert utils.is_int(video.level)
assert video.profile in utils.PROFILES
assert utils.is_int(video.refFrames)
assert video.scanType in ('progressive', None)
assert video.scanType in ("progressive", None)
assert video.selected is False
assert video._server._baseurl == utils.SERVER_BASEURL
assert utils.is_int(video.streamType)
@ -262,7 +289,7 @@ def test_video_Movie_attrs(movies):
assert stream1.bitDepth in (8, None)
assert utils.is_int(stream1.bitrate)
assert stream1.cabac is None
assert stream1.chromaSubsampling in ('4:2:0', None)
assert stream1.chromaSubsampling in ("4:2:0", None)
assert stream1.codec in utils.CODECS
assert stream1.codecID is None
assert stream1.colorSpace is None
@ -279,7 +306,7 @@ def test_video_Movie_attrs(movies):
assert utils.is_int(stream1.level)
assert stream1.profile in utils.PROFILES
assert utils.is_int(stream1.refFrames)
assert stream1.scanType in ('progressive', None)
assert stream1.scanType in ("progressive", None)
assert stream1.selected is False
assert stream1._server._baseurl == utils.SERVER_BASEURL
assert utils.is_int(stream1.streamType)
@ -319,7 +346,7 @@ def test_video_Movie_history(movie):
def test_video_Show(show):
assert show.title == 'Game of Thrones'
assert show.title == "Game of Thrones"
def test_video_Episode_split(episode, patched_http_call):
@ -335,26 +362,30 @@ def test_video_Episode_updateProgress(episode, patched_http_call):
def test_video_Episode_updateTimeline(episode, patched_http_call):
episode.updateTimeline(10 * 60 * 1000, state='playing', duration=episode.duration) # 10 minutes.
episode.updateTimeline(
10 * 60 * 1000, state="playing", duration=episode.duration
) # 10 minutes.
def test_video_Episode_stop(episode, mocker, patched_http_call):
mocker.patch.object(episode, 'session', return_value=list(mocker.MagicMock(id='hello')))
mocker.patch.object(
episode, "session", return_value=list(mocker.MagicMock(id="hello"))
)
episode.stop(reason="It's past bedtime!")
def test_video_Show_attrs(show):
assert utils.is_datetime(show.addedAt)
assert utils.is_metadata(show.art, contains='/art/')
assert utils.is_metadata(show.banner, contains='/banner/')
assert utils.is_metadata(show.art, contains="/art/")
assert utils.is_metadata(show.banner, contains="/banner/")
assert utils.is_int(show.childCount)
assert show.contentRating in utils.CONTENTRATINGS
assert utils.is_int(show.duration, gte=1600000)
assert utils.is_section(show._initpath)
# Check reloading the show loads the full list of genres
assert not {'Adventure', 'Drama'} - {i.tag for i in show.genres}
assert not {"Adventure", "Drama"} - {i.tag for i in show.genres}
show.reload()
assert sorted([i.tag for i in show.genres]) == ['Adventure', 'Drama', 'Fantasy']
assert sorted([i.tag for i in show.genres]) == ["Adventure", "Drama", "Fantasy"]
# So the initkey should have changed because of the reload
assert utils.is_metadata(show._initpath)
assert utils.is_int(show.index)
@ -362,21 +393,31 @@ def test_video_Show_attrs(show):
if show.lastViewedAt:
assert utils.is_datetime(show.lastViewedAt)
assert utils.is_int(show.leafCount)
assert show.listType == 'video'
assert show.listType == "video"
assert len(show.locations[0]) >= 10
assert utils.is_datetime(show.originallyAvailableAt)
assert show.rating >= 8.0
assert utils.is_int(show.ratingKey)
assert sorted([i.tag for i in show.roles])[:4] == ['Aidan Gillen', 'Aimee Richardson', 'Alexander Siddig', 'Alfie Allen'] # noqa
assert sorted([i.tag for i in show.actors])[:4] == ['Aidan Gillen', 'Aimee Richardson', 'Alexander Siddig', 'Alfie Allen'] # noqa
assert sorted([i.tag for i in show.roles])[:4] == [
"Aidan Gillen",
"Aimee Richardson",
"Alexander Siddig",
"Alfie Allen",
] # noqa
assert sorted([i.tag for i in show.actors])[:4] == [
"Aidan Gillen",
"Aimee Richardson",
"Alexander Siddig",
"Alfie Allen",
] # noqa
assert show._server._baseurl == utils.SERVER_BASEURL
assert show.studio == 'HBO'
assert show.studio == "HBO"
assert utils.is_string(show.summary, gte=100)
assert utils.is_metadata(show.theme, contains='/theme/')
assert utils.is_metadata(show.thumb, contains='/thumb/')
assert show.title == 'Game of Thrones'
assert show.titleSort == 'Game of Thrones'
assert show.type == 'show'
assert utils.is_metadata(show.theme, contains="/theme/")
assert utils.is_metadata(show.thumb, contains="/thumb/")
assert show.title == "Game of Thrones"
assert show.titleSort == "Game of Thrones"
assert show.type == "show"
assert utils.is_datetime(show.updatedAt)
assert utils.is_int(show.viewCount, gte=0)
assert utils.is_int(show.viewedLeafCount, gte=0)
@ -392,14 +433,14 @@ def test_video_Show_history(show):
def test_video_Show_watched(tvshows):
show = tvshows.get('The 100')
show = tvshows.get("The 100")
show.episodes()[0].markWatched()
watched = show.watched()
assert len(watched) == 1 and watched[0].title == 'Pilot'
assert len(watched) == 1 and watched[0].title == "Pilot"
def test_video_Show_unwatched(tvshows):
show = tvshows.get('The 100')
show = tvshows.get("The 100")
episodes = show.episodes()
episodes[0].markWatched()
unwatched = show.unwatched()
@ -409,21 +450,21 @@ def test_video_Show_unwatched(tvshows):
def test_video_Show_location(plex):
# This should be a part of test test_video_Show_attrs but is excluded
# because of https://github.com/mjs7231/python-plexapi/issues/97
show = plex.library.section('TV Shows').get('The 100')
show = plex.library.section("TV Shows").get("The 100")
assert len(show.locations) >= 1
def test_video_Show_reload(plex):
show = plex.library.section('TV Shows').get('Game of Thrones')
assert utils.is_metadata(show._initpath, prefix='/library/sections/')
show = plex.library.section("TV Shows").get("Game of Thrones")
assert utils.is_metadata(show._initpath, prefix="/library/sections/")
assert len(show.roles) == 3
show.reload()
assert utils.is_metadata(show._initpath, prefix='/library/metadata/')
assert utils.is_metadata(show._initpath, prefix="/library/metadata/")
assert len(show.roles) > 3
def test_video_Show_episodes(tvshows):
show = tvshows.get('The 100')
show = tvshows.get("The 100")
episodes = show.episodes()
episodes[0].markWatched()
unwatched = show.episodes(viewCount=0)
@ -437,7 +478,7 @@ def test_video_Show_download(monkeydownload, tmpdir, show):
def test_video_Season_download(monkeydownload, tmpdir, show):
season = show.season('Season 1')
season = show.season("Season 1")
filepaths = season.download(savepath=str(tmpdir))
assert len(filepaths) >= 4
@ -445,14 +486,16 @@ def test_video_Season_download(monkeydownload, tmpdir, show):
def test_video_Episode_download(monkeydownload, tmpdir, episode):
f = episode.download(savepath=str(tmpdir))
assert len(f) == 1
with_sceen_size = episode.download(savepath=str(tmpdir), **{'videoResolution': '500x300'})
with_sceen_size = episode.download(
savepath=str(tmpdir), **{"videoResolution": "500x300"}
)
assert len(with_sceen_size) == 1
def test_video_Show_thumbUrl(show):
assert utils.SERVER_BASEURL in show.thumbUrl
assert '/library/metadata/' in show.thumbUrl
assert '/thumb/' in show.thumbUrl
assert "/library/metadata/" in show.thumbUrl
assert "/thumb/" in show.thumbUrl
# Analyze seems to fail intermittently
@ -476,7 +519,7 @@ def test_video_Show_refresh(show):
def test_video_Show_get(show):
assert show.get('Winter Is Coming').title == 'Winter Is Coming'
assert show.get("Winter Is Coming").title == "Winter Is Coming"
def test_video_Show_isWatched(show):
@ -485,11 +528,11 @@ def test_video_Show_isWatched(show):
def test_video_Show_section(show):
section = show.section()
assert section.title == 'TV Shows'
assert section.title == "TV Shows"
def test_video_Episode(show):
episode = show.episode('Winter Is Coming')
episode = show.episode("Winter Is Coming")
assert episode == show.episode(season=1, episode=1)
with pytest.raises(BadRequest):
show.episode()
@ -507,7 +550,7 @@ def test_video_Episode_history(episode):
# Analyze seems to fail intermittently
@pytest.mark.xfail
def test_video_Episode_analyze(tvshows):
episode = tvshows.get('Game of Thrones').episode(season=1, episode=1)
episode = tvshows.get("Game of Thrones").episode(season=1, episode=1)
episode.analyze()
@ -515,31 +558,33 @@ def test_video_Episode_attrs(episode):
assert utils.is_datetime(episode.addedAt)
assert episode.contentRating in utils.CONTENTRATINGS
if len(episode.directors):
assert [i.tag for i in episode.directors] == ['Tim Van Patten']
assert [i.tag for i in episode.directors] == ["Tim Van Patten"]
assert utils.is_int(episode.duration, gte=120000)
assert episode.grandparentTitle == 'Game of Thrones'
assert episode.grandparentTitle == "Game of Thrones"
assert episode.index == 1
assert utils.is_metadata(episode._initpath)
assert utils.is_metadata(episode.key)
assert episode.listType == 'video'
assert episode.listType == "video"
assert utils.is_datetime(episode.originallyAvailableAt)
assert utils.is_int(episode.parentIndex)
assert utils.is_metadata(episode.parentKey)
assert utils.is_int(episode.parentRatingKey)
assert utils.is_metadata(episode.parentThumb, contains='/thumb/')
assert utils.is_metadata(episode.parentThumb, contains="/thumb/")
assert episode.rating >= 7.7
assert utils.is_int(episode.ratingKey)
assert episode._server._baseurl == utils.SERVER_BASEURL
assert utils.is_string(episode.summary, gte=100)
assert utils.is_metadata(episode.thumb, contains='/thumb/')
assert episode.title == 'Winter Is Coming'
assert episode.titleSort == 'Winter Is Coming'
assert utils.is_metadata(episode.thumb, contains="/thumb/")
assert episode.title == "Winter Is Coming"
assert episode.titleSort == "Winter Is Coming"
assert not episode.transcodeSessions
assert episode.type == 'episode'
assert episode.type == "episode"
assert utils.is_datetime(episode.updatedAt)
assert utils.is_int(episode.viewCount, gte=0)
assert episode.viewOffset == 0
assert sorted([i.tag for i in episode.writers]) == sorted(['David Benioff', 'D. B. Weiss'])
assert sorted([i.tag for i in episode.writers]) == sorted(
["David Benioff", "D. B. Weiss"]
)
assert episode.year == 2011
assert episode.isWatched in [True, False]
# Media
@ -577,12 +622,12 @@ def test_video_Episode_attrs(episode):
def test_video_Season(show):
seasons = show.seasons()
assert len(seasons) == 2
assert ['Season 1', 'Season 2'] == [s.title for s in seasons[:2]]
assert show.season('Season 1') == seasons[0]
assert ["Season 1", "Season 2"] == [s.title for s in seasons[:2]]
assert show.season("Season 1") == seasons[0]
def test_video_Season_history(show):
season = show.season('Season 1')
season = show.season("Season 1")
season.markWatched()
history = season.history()
assert len(history)
@ -590,7 +635,7 @@ def test_video_Season_history(show):
def test_video_Season_attrs(show):
season = show.season('Season 1')
season = show.season("Season 1")
assert utils.is_datetime(season.addedAt)
assert season.index == 1
assert utils.is_metadata(season._initpath)
@ -598,17 +643,17 @@ def test_video_Season_attrs(show):
if season.lastViewedAt:
assert utils.is_datetime(season.lastViewedAt)
assert utils.is_int(season.leafCount, gte=3)
assert season.listType == 'video'
assert season.listType == "video"
assert utils.is_metadata(season.parentKey)
assert utils.is_int(season.parentRatingKey)
assert season.parentTitle == 'Game of Thrones'
assert season.parentTitle == "Game of Thrones"
assert utils.is_int(season.ratingKey)
assert season._server._baseurl == utils.SERVER_BASEURL
assert season.summary == ''
assert utils.is_metadata(season.thumb, contains='/thumb/')
assert season.title == 'Season 1'
assert season.titleSort == 'Season 1'
assert season.type == 'season'
assert season.summary == ""
assert utils.is_metadata(season.thumb, contains="/thumb/")
assert season.title == "Season 1"
assert season.titleSort == "Season 1"
assert season.type == "season"
assert utils.is_datetime(season.updatedAt)
assert utils.is_int(season.viewCount, gte=0)
assert utils.is_int(season.viewedLeafCount, gte=0)
@ -617,34 +662,34 @@ def test_video_Season_attrs(show):
def test_video_Season_show(show):
season = show.seasons()[0]
season_by_name = show.season('Season 1')
season_by_name = show.season("Season 1")
assert show.ratingKey == season.parentRatingKey and season_by_name.parentRatingKey
assert season.ratingKey == season_by_name.ratingKey
def test_video_Season_watched(tvshows):
show = tvshows.get('Game of Thrones')
show = tvshows.get("Game of Thrones")
season = show.season(1)
sne = show.season('Season 1')
sne = show.season("Season 1")
assert season == sne
season.markWatched()
assert season.isWatched
def test_video_Season_unwatched(tvshows):
season = tvshows.get('Game of Thrones').season(1)
season = tvshows.get("Game of Thrones").season(1)
season.markUnwatched()
assert not season.isWatched
def test_video_Season_get(show):
episode = show.season(1).get('Winter Is Coming')
assert episode.title == 'Winter Is Coming'
episode = show.season(1).get("Winter Is Coming")
assert episode.title == "Winter Is Coming"
def test_video_Season_episode(show):
episode = show.season(1).get('Winter Is Coming')
assert episode.title == 'Winter Is Coming'
episode = show.season(1).get("Winter Is Coming")
assert episode.title == "Winter Is Coming"
def test_video_Season_episode_by_index(show):
@ -659,34 +704,62 @@ def test_video_Season_episodes(show):
def test_that_reload_return_the_same_object(plex):
# we want to check this that all the urls are correct
movie_library_search = plex.library.section('Movies').search('Elephants Dream')[0]
movie_search = plex.search('Elephants Dream')[0]
movie_section_get = plex.library.section('Movies').get('Elephants Dream')
movie_library_search = plex.library.section("Movies").search("Elephants Dream")[0]
movie_search = plex.search("Elephants Dream")[0]
movie_section_get = plex.library.section("Movies").get("Elephants Dream")
movie_library_search_key = movie_library_search.key
movie_search_key = movie_search.key
movie_section_get_key = movie_section_get.key
assert movie_library_search_key == movie_library_search.reload().key == movie_search_key == movie_search.reload().key == movie_section_get_key == movie_section_get.reload().key # noqa
tvshow_library_search = plex.library.section('TV Shows').search('The 100')[0]
tvshow_search = plex.search('The 100')[0]
tvshow_section_get = plex.library.section('TV Shows').get('The 100')
assert (
movie_library_search_key
== movie_library_search.reload().key
== movie_search_key
== movie_search.reload().key
== movie_section_get_key
== movie_section_get.reload().key
) # noqa
tvshow_library_search = plex.library.section("TV Shows").search("The 100")[0]
tvshow_search = plex.search("The 100")[0]
tvshow_section_get = plex.library.section("TV Shows").get("The 100")
tvshow_library_search_key = tvshow_library_search.key
tvshow_search_key = tvshow_search.key
tvshow_section_get_key = tvshow_section_get.key
assert tvshow_library_search_key == tvshow_library_search.reload().key == tvshow_search_key == tvshow_search.reload().key == tvshow_section_get_key == tvshow_section_get.reload().key # noqa
assert (
tvshow_library_search_key
== tvshow_library_search.reload().key
== tvshow_search_key
== tvshow_search.reload().key
== tvshow_section_get_key
== tvshow_section_get.reload().key
) # noqa
season_library_search = tvshow_library_search.season(1)
season_search = tvshow_search.season(1)
season_section_get = tvshow_section_get.season(1)
season_library_search_key = season_library_search.key
season_search_key = season_search.key
season_section_get_key = season_section_get.key
assert season_library_search_key == season_library_search.reload().key == season_search_key == season_search.reload().key == season_section_get_key == season_section_get.reload().key # noqa
assert (
season_library_search_key
== season_library_search.reload().key
== season_search_key
== season_search.reload().key
== season_section_get_key
== season_section_get.reload().key
) # noqa
episode_library_search = tvshow_library_search.episode(season=1, episode=1)
episode_search = tvshow_search.episode(season=1, episode=1)
episode_section_get = tvshow_section_get.episode(season=1, episode=1)
episode_library_search_key = episode_library_search.key
episode_search_key = episode_search.key
episode_section_get_key = episode_section_get.key
assert episode_library_search_key == episode_library_search.reload().key == episode_search_key == episode_search.reload().key == episode_section_get_key == episode_section_get.reload().key # noqa
assert (
episode_library_search_key
== episode_library_search.reload().key
== episode_search_key
== episode_search.reload().key
== episode_section_get_key
== episode_section_get.reload().key
) # noqa
def test_video_exists_accessible(movie, episode):
@ -703,7 +776,9 @@ def test_video_exists_accessible(movie, episode):
assert episode.media[0].parts[0].accessible is True
@pytest.mark.skip(reason='broken? assert len(plex.conversions()) == 1 may fail on some builds')
@pytest.mark.skip(
reason="broken? assert len(plex.conversions()) == 1 may fail on some builds"
)
def test_video_optimize(movie, plex):
plex.optimizedItems(removeAll=True)
movie.optimize(targetTagID=1)

View file

@ -1,55 +1,232 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
The script is used to bootstrap a docker container with Plex and with
all the libraries required for testing.
The script is used to bootstrap a the test enviroment for plexapi
with all the libraries required for testing.
By default this uses a docker.
It can be used manually using:
python plex-bootraptest.py --no-docker --server-name name_of_server --account Hellowlol --password yourpassword
"""
import argparse
import os
import plexapi
import shutil
import socket
import time
import zipfile
from glob import glob
from shutil import copyfile, rmtree
from shutil import copyfile
from subprocess import call
from tqdm import tqdm
from uuid import uuid4
from plexapi.compat import which, makedirs
import plexapi
from plexapi.compat import makedirs, which
from plexapi.exceptions import BadRequest, NotFound
from plexapi.myplex import MyPlexAccount
from plexapi.server import PlexServer
from plexapi.utils import download, SEARCHTYPES
from plexapi.utils import SEARCHTYPES
from tqdm import tqdm
DOCKER_CMD = [
'docker', 'run', '-d',
'--name', 'plex-test-%(container_name_extra)s%(image_tag)s',
'--restart', 'on-failure',
'-p', '32400:32400/tcp',
'-p', '3005:3005/tcp',
'-p', '8324:8324/tcp',
'-p', '32469:32469/tcp',
'-p', '1900:1900/udp',
'-p', '32410:32410/udp',
'-p', '32412:32412/udp',
'-p', '32413:32413/udp',
'-p', '32414:32414/udp',
'-e', 'TZ="Europe/London"',
'-e', 'PLEX_CLAIM=%(claim_token)s',
'-e', 'ADVERTISE_IP=http://%(advertise_ip)s:32400/',
'-h', '%(hostname)s',
'-e', 'TZ="%(timezone)s"',
'-v', '%(destination)s/db:/config',
'-v', '%(destination)s/transcode:/transcode',
'-v', '%(destination)s/media:/data',
'plexinc/pms-docker:%(image_tag)s'
"docker",
"run",
"-d",
"--name",
"plex-test-%(container_name_extra)s%(image_tag)s",
"--restart",
"on-failure",
"-p",
"32400:32400/tcp",
"-p",
"3005:3005/tcp",
"-p",
"8324:8324/tcp",
"-p",
"32469:32469/tcp",
"-p",
"1900:1900/udp",
"-p",
"32410:32410/udp",
"-p",
"32412:32412/udp",
"-p",
"32413:32413/udp",
"-p",
"32414:32414/udp",
"-e",
'TZ="Europe/London"',
"-e",
"PLEX_CLAIM=%(claim_token)s",
"-e",
"ADVERTISE_IP=http://%(advertise_ip)s:32400/",
"-h",
"%(hostname)s",
"-e",
'TZ="%(timezone)s"',
"-v",
"%(destination)s/db:/config",
"-v",
"%(destination)s/transcode:/transcode",
"-v",
"%(destination)s/media:/data",
"plexinc/pms-docker:%(image_tag)s",
]
BASE_DIR_PATH = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
STUB_MOVIE_PATH = os.path.join(BASE_DIR_PATH, "tests", "data", "video_stub.mp4")
STUB_MP3_PATH = os.path.join(BASE_DIR_PATH, "tests", "data", "audio_stub.mp3")
STUB_IMAGE_PATH = os.path.join(BASE_DIR_PATH, "tests", "data", "cute_cat.jpg")
def check_ext(path, ext):
"""I hate glob so much."""
result = []
for root, dirs, fil in os.walk(path):
for f in fil:
fp = os.path.join(root, f)
if fp.endswith(ext):
result.append(fp)
return result
class ExistingSection(Exception):
"""This server has sections, exiting"""
def __init__(self, *args):
raise SystemExit("This server has sections exiting")
def clean_pms(server, path):
for section in server.library.sections():
print("Deleting %s" % section.title)
section.delete()
server.library.cleanBundles()
server.library.optimize()
print("optimized db and removed any bundles")
shutil.rmtree(path, ignore_errors=False, onerror=None)
print("Deleted %s" % path)
def setup_music(music_path):
print("Setup files for music section..")
makedirs(music_path, exist_ok=True)
all_music = {
"Broke for free": {
"Layers": [
"1 - As Colorful As Ever.mp3",
#"02 - Knock Knock.mp3",
#"03 - Only Knows.mp3",
#"04 - If.mp3",
#"05 - Note Drop.mp3",
#"06 - Murmur.mp3",
#"07 - Spellbound.mp3",
#"08 - The Collector.mp3",
#"09 - Quit Bitching.mp3",
#"10 - A Year.mp3",
]
},
}
for artist, album in all_music.items():
for k, v in album.items():
artist_album = os.path.join(music_path, artist, k)
makedirs(artist_album, exist_ok=True)
for song in v:
copyfile(STUB_MP3_PATH, os.path.join(artist_album, song))
return len(check_ext(music_path, (".mp3")))
def setup_movies(movies_path):
print("Setup files for the Movies section..")
makedirs(movies_path, exist_ok=True)
if len(glob(movies_path + "/*.mkv", recursive=True)) == 4:
return 4
required_movies = {
"Elephants Dream": 2006,
"Sita Sings the Blues": 2008,
"Big Buck Bunny": 2008,
"Sintel": 2010,
}
expected_media_count = 0
for name, year in required_movies.items():
expected_media_count += 1
if not os.path.isfile(get_movie_path(movies_path, name, year)):
copyfile(STUB_MOVIE_PATH, get_movie_path(movies_path, name, year))
return expected_media_count
def setup_images(photos_path):
print("Setup files for the Photos section..")
makedirs(photos_path, exist_ok=True)
# expected_photo_count = 0
folders = {
("Cats",): 3,
("Cats", "Cats in bed"): 7,
("Cats", "Cats not in bed"): 1,
("Cats", "Not cats in bed"): 1,
}
has_photos = 0
for folder_path, required_cnt in folders.items():
folder_path = os.path.join(photos_path, *folder_path)
makedirs(folder_path, exist_ok=True)
photos_in_folder = len(glob(os.path.join(folder_path, "/*.jpg")))
while photos_in_folder < required_cnt:
# Dunno why this is need got permission error on photo0.jpg
photos_in_folder += 1
full_path = os.path.join(folder_path, "photo%d.jpg" % photos_in_folder)
copyfile(STUB_IMAGE_PATH, full_path)
has_photos += photos_in_folder
return len(check_ext(photos_path, (".jpg")))
def setup_show(tvshows_path):
print("Setup files for the TV-Shows section..")
makedirs(tvshows_path, exist_ok=True)
makedirs(os.path.join(tvshows_path, "Game of Thrones"), exist_ok=True)
makedirs(os.path.join(tvshows_path, "The 100"), exist_ok=True)
required_tv_shows = {
"Game of Thrones": [list(range(1, 11)), list(range(1, 11))],
"The 100": [list(range(1, 14)), list(range(1, 17))],
}
expected_media_count = 0
for show_name, seasons in required_tv_shows.items():
for season_id, episodes in enumerate(seasons, start=1):
for episode_id in episodes:
expected_media_count += 1
episode_path = get_tvshow_path(
tvshows_path, show_name, season_id, episode_id
)
if not os.path.isfile(episode_path):
copyfile(STUB_MOVIE_PATH, episode_path)
return expected_media_count
def get_default_ip():
""" Return the first IP address of the current machine if available. """
available_ips = list(set([i[4][0] for i in socket.getaddrinfo(socket.gethostname(), None)
if i[4][0] not in ('127.0.0.1', '::1') and not i[4][0].startswith('fe80:')]))
available_ips = list(
set(
[
i[4][0]
for i in socket.getaddrinfo(socket.gethostname(), None)
if i[4][0] not in ("127.0.0.1", "::1")
and not i[4][0].startswith("fe80:")
]
)
)
return available_ips[0] if len(available_ips) else None
@ -62,14 +239,14 @@ def get_plex_account(opts):
return None
def get_movie_path(name, year):
def get_movie_path(movies_path, name, year):
""" Return a movie path given its title and year. """
return os.path.join(movies_path, '%s (%d).mp4' % (name, year))
return os.path.join(movies_path, "%s (%d).mp4" % (name, year))
def get_tvshow_path(name, season, episode):
def get_tvshow_path(tvshows_path, name, season, episode):
""" Return a TV show path given its title, season, and episode. """
return os.path.join(tvshows_path, name, 'S%02dE%02d.mp4' % (season, episode))
return os.path.join(tvshows_path, name, "S%02dE%02d.mp4" % (season, episode))
def add_library_section(server, section):
@ -83,52 +260,63 @@ def add_library_section(server, section):
server.library.add(**section)
return True
except BadRequest as err:
if 'server is still starting up. Please retry later' in str(err):
if "server is still starting up. Please retry later" in str(err):
time.sleep(1)
continue
raise
runtime = time.time() - start
raise SystemExit('Timeout adding section to Plex instance.')
raise SystemExit("Timeout adding section to Plex instance.")
def create_section(server, section, opts):
processed_media = 0
expected_media_count = section.pop('expected_media_count', 0)
expected_media_type = (section['type'], )
if section['type'] == 'artist':
expected_media_type = ('artist', 'album', 'track')
expected_media_count = section.pop("expected_media_count", 0)
expected_media_type = (section["type"],)
if section["type"] == "artist":
expected_media_type = ("artist", "album", "track")
expected_media_type = tuple(SEARCHTYPES[t] for t in expected_media_type)
def alert_callback(data):
""" Listen to the Plex notifier to determine when metadata scanning is complete. """
global processed_media
if data['type'] == 'timeline':
for entry in data['TimelineEntry']:
if entry.get('identifier', 'com.plexapp.plugins.library') == 'com.plexapp.plugins.library':
if data["type"] == "timeline":
for entry in data["TimelineEntry"]:
if (
entry.get("identifier", "com.plexapp.plugins.library")
== "com.plexapp.plugins.library"
):
# Missed mediaState means that media was processed (analyzed & thumbnailed)
if 'mediaState' not in entry and entry['type'] in expected_media_type:
if (
"mediaState" not in entry
and entry["type"] in expected_media_type
):
# state=5 means record processed, applicable only when metadata source was set
if entry['state'] == 5:
if entry["state"] == 5:
cnt = 1
if entry['type'] == SEARCHTYPES['show']:
show = server.library.sectionByID(str(entry['sectionID'])).get(entry['title'])
if entry["type"] == SEARCHTYPES["show"]:
show = server.library.sectionByID(
str(entry["sectionID"])
).get(entry["title"])
cnt = show.leafCount
bar.update(cnt)
processed_media += cnt
# state=1 means record processed, when no metadata source was set
elif entry['state'] == 1 and entry['type'] == SEARCHTYPES['photo']:
elif (
entry["state"] == 1
and entry["type"] == SEARCHTYPES["photo"]
):
bar.update()
processed_media += 1
runtime = 0
start = time.time()
bar = tqdm(desc='Scanning section ' + section['name'], total=expected_media_count)
bar = tqdm(desc="Scanning section " + section["name"], total=expected_media_count)
notifier = server.startAlertListener(alert_callback)
time.sleep(3)
add_library_section(server, section)
while bar.n < bar.total:
if runtime >= 120:
print('Metadata scan taking too long, but will continue anyway..')
print("Metadata scan taking too long, but will continue anyway..")
break
time.sleep(3)
runtime = int(time.time() - start)
@ -136,60 +324,134 @@ def create_section(server, section, opts):
notifier.stop()
if __name__ == '__main__':
if __name__ == "__main__":
default_ip = get_default_ip()
parser = argparse.ArgumentParser(description=__doc__)
# Authentication arguments
mg = parser.add_mutually_exclusive_group()
g = mg.add_argument_group()
g.add_argument('--username', help='Your Plex username')
g.add_argument('--password', help='Your Plex password')
mg.add_argument('--token', help='Plex.tv authentication token', default=plexapi.CONFIG.get('auth.server_token'))
mg.add_argument('--unclaimed', help='Do not claim the server', default=False, action='store_true')
g.add_argument("--username", help="Your Plex username")
g.add_argument("--password", help="Your Plex password")
mg.add_argument(
"--token",
help="Plex.tv authentication token",
default=plexapi.CONFIG.get("auth.server_token"),
)
mg.add_argument(
"--unclaimed",
help="Do not claim the server",
default=False,
action="store_true",
)
# Test environment arguments
parser.add_argument('--timezone', help='Timezone to set inside plex', default='UTC') # noqa
parser.add_argument('--destination', help='Local path where to store all the media', default=os.path.join(os.getcwd(), 'plex')) # noqa
parser.add_argument('--advertise-ip', help='IP address which should be advertised by new Plex instance', required=default_ip is None, default=default_ip) # noqa
parser.add_argument('--docker-tag', help='Docker image tag to install', default='latest') # noqa
parser.add_argument('--bootstrap-timeout', help='Timeout for each step of bootstrap, in seconds (default: %(default)s)', default=180, type=int) # noqa
parser.add_argument('--server-name', help='Name for the new server', default='plex-test-docker-%s' % str(uuid4())) # noqa
parser.add_argument('--accept-eula', help='Accept Plex`s EULA', default=False, action='store_true') # noqa
parser.add_argument('--without-movies', help='Do not create Movies section', default=True, dest='with_movies', action='store_false') # noqa
parser.add_argument('--without-shows', help='Do not create TV Shows section', default=True, dest='with_shows', action='store_false') # noqa
parser.add_argument('--without-music', help='Do not create Music section', default=True, dest='with_music', action='store_false') # noqa
parser.add_argument('--without-photos', help='Do not create Photos section', default=True, dest='with_photos', action='store_false') # noqa
parser.add_argument('--show-token', help='Display access token after bootstrap', default=False, action='store_true') # noqa
parser.add_argument(
"--no-docker", help="Use docker", default=False, action="store_true"
)
parser.add_argument(
"--timezone", help="Timezone to set inside plex", default="UTC"
) # noqa
parser.add_argument(
"--destination",
help="Local path where to store all the media",
default=os.path.join(os.getcwd(), "plex"),
) # noqa
parser.add_argument(
"--advertise-ip",
help="IP address which should be advertised by new Plex instance",
required=default_ip is None,
default=default_ip,
) # noqa
parser.add_argument(
"--docker-tag", help="Docker image tag to install", default="latest"
) # noqa
parser.add_argument(
"--bootstrap-timeout",
help="Timeout for each step of bootstrap, in seconds (default: %(default)s)",
default=180,
type=int,
) # noqa
parser.add_argument(
"--server-name",
help="Name for the new server",
default="plex-test-docker-%s" % str(uuid4()),
) # noqa
parser.add_argument(
"--accept-eula", help="Accept Plex`s EULA", default=False, action="store_true"
) # noqa
parser.add_argument(
"--without-movies",
help="Do not create Movies section",
default=True,
dest="with_movies",
action="store_false",
) # noqa
parser.add_argument(
"--without-shows",
help="Do not create TV Shows section",
default=True,
dest="with_shows",
action="store_false",
) # noqa
parser.add_argument(
"--without-music",
help="Do not create Music section",
default=True,
dest="with_music",
action="store_false",
) # noqa
parser.add_argument(
"--without-photos",
help="Do not create Photos section",
default=True,
dest="with_photos",
action="store_false",
) # noqa
parser.add_argument(
"--show-token",
help="Display access token after bootstrap",
default=False,
action="store_true",
) # noqa
opts = parser.parse_args()
# Download the Plex Docker image
print('Creating Plex instance named %s with advertised ip %s' % (opts.server_name, opts.advertise_ip))
if which('docker') is None:
print('Docker is required to be available')
exit(1)
if call(['docker', 'pull', 'plexinc/pms-docker:%s' % opts.docker_tag]) != 0:
print('Got an error when executing docker pull!')
exit(1)
# Start the Plex Docker container
account = get_plex_account(opts)
path = os.path.realpath(os.path.expanduser(opts.destination))
makedirs(os.path.join(path, 'media'), exist_ok=True)
arg_bindings = {
'destination': path,
'hostname': opts.server_name,
'claim_token': account.claimToken() if account else '',
'timezone': opts.timezone,
'advertise_ip': opts.advertise_ip,
'image_tag': opts.docker_tag,
'container_name_extra': '' if account else 'unclaimed-'
}
docker_cmd = [c % arg_bindings for c in DOCKER_CMD]
exit_code = call(docker_cmd)
if exit_code != 0:
raise SystemExit('Error %s while starting the Plex docker container' % exit_code)
media_path = os.path.join(path, "media")
makedirs(media_path, exist_ok=True)
# Download the Plex Docker image
if opts.no_docker is False:
print(
"Creating Plex instance named %s with advertised ip %s"
% (opts.server_name, opts.advertise_ip)
)
if which("docker") is None:
print("Docker is required to be available")
exit(1)
if call(["docker", "pull", "plexinc/pms-docker:%s" % opts.docker_tag]) != 0:
print("Got an error when executing docker pull!")
exit(1)
# Start the Plex Docker container
arg_bindings = {
"destination": path,
"hostname": opts.server_name,
"claim_token": account.claimToken() if account else "",
"timezone": opts.timezone,
"advertise_ip": opts.advertise_ip,
"image_tag": opts.docker_tag,
"container_name_extra": "" if account else "unclaimed-",
}
docker_cmd = [c % arg_bindings for c in DOCKER_CMD]
exit_code = call(docker_cmd)
if exit_code != 0:
raise SystemExit(
"Error %s while starting the Plex docker container" % exit_code
)
# Wait for the Plex container to start
print('Waiting for the Plex container to start..')
print("Waiting for the Plex to start..")
start = time.time()
runtime = 0
server = None
@ -198,145 +460,127 @@ if __name__ == '__main__':
if account:
server = account.device(opts.server_name).connect()
else:
server = PlexServer('http://%s:32400' % opts.advertise_ip)
server = PlexServer("http://%s:32400" % opts.advertise_ip)
if opts.accept_eula:
server.settings.get('acceptedEULA').set(True)
server.settings.get("acceptedEULA").set(True)
server.settings.save()
except KeyboardInterrupt:
break
except Exception as err:
print(err)
time.sleep(1)
runtime = time.time() - start
if not server:
raise SystemExit('Server didnt appear in your account after %ss' % opts.bootstrap_timeout)
print('Plex container started after %ss, downloading content' % int(runtime))
# Download video_stub.mp4
print('Downloading video_stub.mp4..')
if opts.with_movies or opts.with_shows:
media_stub_path = os.path.join(path, 'media', 'video_stub.mp4')
if not os.path.isfile(media_stub_path):
download('http://www.mytvtestpatterns.com/mytvtestpatterns/Default/GetFile?p=PhilipsCircleMP4', '',
filename='video_stub.mp4', savepath=os.path.join(path, 'media'), showstatus=True)
raise SystemExit(
"Server didnt appear in your account after %ss" % opts.bootstrap_timeout
)
print("Plex container started after %ss, setting up content" % int(runtime))
sections = []
# Lets add a check here do somebody dont mess up
# there normal server if they run manual tests.
# Like i did....
if len(server.library.sections()) and opts.no_docker is True:
ans = input(
"The server has %s sections, do you wish to remove it?\n> "
% len(server.library.sections())
)
if ans in ("y", "Y", "Yes"):
ans = input(
"Are you really sure you want to delete %s libraries? There is no way back\n> "
% len(server.library.sections())
)
if ans in ("y", "Y", "Yes"):
clean_pms(server, path)
else:
raise ExistingSection()
else:
raise ExistingSection()
# Prepare Movies section
if opts.with_movies:
print('Preparing movie section..')
movies_path = os.path.join(path, 'media', 'Movies')
makedirs(movies_path, exist_ok=True)
required_movies = {
'Elephants Dream': 2006,
'Sita Sings the Blues': 2008,
'Big Buck Bunny': 2008,
'Sintel': 2010,
}
expected_media_count = 0
for name, year in required_movies.items():
expected_media_count += 1
if not os.path.isfile(get_movie_path(name, year)):
copyfile(media_stub_path, get_movie_path(name, year))
sections.append(dict(name='Movies', type='movie', location='/data/Movies', agent='com.plexapp.agents.imdb',
scanner='Plex Movie Scanner', expected_media_count=expected_media_count))
movies_path = os.path.join(media_path, "Movies")
num_movies = setup_movies(movies_path)
sections.append(
dict(
name="Movies",
type="movie",
location="/data/Movies" if opts.no_docker is False else movies_path,
agent="com.plexapp.agents.imdb",
scanner="Plex Movie Scanner",
expected_media_count=num_movies,
)
)
# Prepare TV Show section
if opts.with_shows:
print('Preparing TV-Shows section..')
tvshows_path = os.path.join(path, 'media', 'TV-Shows')
makedirs(os.path.join(tvshows_path, 'Game of Thrones'), exist_ok=True)
makedirs(os.path.join(tvshows_path, 'The 100'), exist_ok=True)
required_tv_shows = {
'Game of Thrones': [list(range(1, 11)), list(range(1, 11))],
'The 100': [list(range(1, 14)), list(range(1, 17))]
}
expected_media_count = 0
for show_name, seasons in required_tv_shows.items():
for season_id, episodes in enumerate(seasons, start=1):
for episode_id in episodes:
expected_media_count += 1
episode_path = get_tvshow_path(show_name, season_id, episode_id)
if not os.path.isfile(episode_path):
copyfile(get_movie_path('Sintel', 2010), episode_path)
sections.append(dict(name='TV Shows', type='show', location='/data/TV-Shows',
agent='com.plexapp.agents.thetvdb', scanner='Plex Series Scanner',
expected_media_count=expected_media_count))
tvshows_path = os.path.join(media_path, "TV-Shows")
num_ep = setup_show(tvshows_path)
sections.append(
dict(
name="TV Shows",
type="show",
location="/data/TV-Shows" if opts.no_docker is False else tvshows_path,
agent="com.plexapp.agents.thetvdb",
scanner="Plex Series Scanner",
expected_media_count=num_ep,
)
)
# Prepare Music section
if opts.with_music:
print('Preparing Music section..')
music_path = os.path.join(path, 'media', 'Music')
makedirs(music_path, exist_ok=True)
expected_media_count = 0
artist_dst = os.path.join(music_path, 'Infinite State')
dest_path = os.path.join(artist_dst, 'Unmastered Impulses')
if not os.path.isdir(dest_path):
zip_path = os.path.join(artist_dst, 'Unmastered Impulses.zip')
if os.path.isfile(zip_path):
with zipfile.ZipFile(zip_path, 'r') as handle:
handle.extractall(artist_dst)
else:
download('https://github.com/kennethreitz/unmastered-impulses/archive/master.zip', '',
filename='Unmastered Impulses.zip', savepath=artist_dst, unpack=True, showstatus=True)
os.rename(os.path.join(artist_dst, 'unmastered-impulses-master', 'mp3'), dest_path)
rmtree(os.path.join(artist_dst, 'unmastered-impulses-master'))
expected_media_count += len(glob(os.path.join(dest_path, '*.mp3'))) + 2 # wait for artist & album
artist_dst = os.path.join(music_path, 'Broke For Free')
dest_path = os.path.join(artist_dst, 'Layers')
if not os.path.isdir(dest_path):
zip_path = os.path.join(artist_dst, 'Layers.zip')
if not os.path.isfile(zip_path):
download('https://archive.org/compress/Layers-11520/formats=VBR%20MP3&file=/Layers-11520.zip', '',
filename='Layers.zip', savepath=artist_dst, showstatus=True)
makedirs(dest_path, exist_ok=True)
with zipfile.ZipFile(zip_path, 'r') as handle:
handle.extractall(dest_path)
expected_media_count += len(glob(os.path.join(dest_path, '*.mp3'))) + 2 # wait for artist & album
sections.append(dict(name='Music', type='artist', location='/data/Music',
agent='com.plexapp.agents.lastfm', scanner='Plex Music Scanner',
expected_media_count=expected_media_count))
music_path = os.path.join(media_path, "Music")
song_c = setup_music(music_path)
sections.append(
dict(
name="Music",
type="artist",
location="/data/Music" if opts.no_docker is False else music_path,
agent="com.plexapp.agents.lastfm",
scanner="Plex Music Scanner",
expected_media_count=song_c,
)
)
# Prepare Photos section
if opts.with_photos:
print('Preparing Photos section..')
photos_path = os.path.join(path, 'media', 'Photos')
makedirs(photos_path, exist_ok=True)
expected_photo_count = 0
folders = {
('Cats', ): 3,
('Cats', 'Cats in bed'): 7,
('Cats', 'Cats not in bed'): 1,
('Cats', 'Not cats in bed'): 1,
}
has_photos = 0
for folder_path, required_cnt in folders.items():
folder_path = os.path.join(photos_path, *folder_path)
photos_in_folder = len(glob(os.path.join(folder_path, '*.jpg')))
while photos_in_folder < required_cnt:
photos_in_folder += 1
download('https://picsum.photos/800/600/?random', '',
filename='photo%d.jpg' % photos_in_folder, savepath=folder_path)
has_photos += photos_in_folder
sections.append(dict(name='Photos', type='photo', location='/data/Photos',
agent='com.plexapp.agents.none', scanner='Plex Photo Scanner',
expected_media_count=has_photos))
photos_path = os.path.join(media_path, "Photos")
has_photos = setup_images(photos_path)
sections.append(
dict(
name="Photos",
type="photo",
location="/data/Photos" if opts.no_docker is False else photos_path,
agent="com.plexapp.agents.none",
scanner="Plex Photo Scanner",
expected_media_count=has_photos,
)
)
# Create the Plex library in our instance
if sections:
print('Creating the Plex libraries in our instance')
print("Creating the Plex libraries on %s" % server.friendlyName)
for section in sections:
create_section(server, section, opts)
# Share this instance with the specified username
if account:
shared_username = os.environ.get('SHARED_USERNAME', 'PKKid')
shared_username = os.environ.get("SHARED_USERNAME", "PKKid")
try:
user = account.user(shared_username)
account.updateFriend(user, server)
print('The server was shared with user %s' % shared_username)
print("The server was shared with user %s" % shared_username)
except NotFound:
pass
# Finished: Display our Plex details
print('Base URL is %s' % server.url('', False))
print("Base URL is %s" % server.url("", False))
if account and opts.show_token:
print('Auth token is %s' % account.authenticationToken)
print('Server %s is ready to use!' % opts.server_name)
print("Auth token is %s" % account.authenticationToken)
print("Server %s is ready to use!" % opts.server_name)