mirror of
https://github.com/pkkid/python-plexapi
synced 2024-11-10 06:04:15 +00:00
54b26fdc25
* [sync] initial commit * fix populating of `state` field in sync.Status * [connection] add possibility to return first established connection faster * [base] add timeout argument to PlexObject.fetchItems() * [sync] add timeout arg to SyncItem.getMedia() When you have multiple media within one SyncItem it takes a lot of time to get all the info for this media (on my machine it takes about a second for each movie). * [sync] fix marking media as downloaded * [sync] pass clientIdentifier to created SyncItem() * [sync] override __repr__() for sync.Status * fix after @mikes-nasuni's review * fix python2 compatibility * get rid of sync.init() * use list comprehension * remove timeout from PlexObject.fetchItems() * fix SyncItem under python 2.7 * fix __doc__ in sync module * revert myplex._connect() back to its original state * improve sync docs * get rid of PlexObjects where not needed * add X-Plex-Sync-Version=2 to headers * add sync() method into Video, LibrarySection and MyPlexAccount * add SyncItem.delete() * add sync.Policy.create() * use self._default_sync_title instead of _prettyfilename as default title * let the tests begin * add items for refreshing synclists to PlexServer * fix sync tests * sync for everybody! * add TODO docstring for Audio._defaultSyncTitle() * SyncItems tag may be presented only once, there is no need for loop * add more TODO docstrings * hello docs * remove relative import * remove unused variable from tests/test_sync.py
297 lines
12 KiB
Python
297 lines
12 KiB
Python
from time import sleep, time
|
|
|
|
import pytest
|
|
|
|
from plexapi.sync import VIDEO_QUALITY_3_MBPS_720p, AUDIO_BITRATE_192_KBPS, PHOTO_QUALITY_MEDIUM
|
|
|
|
|
|
def ensure_sync_item(device, sync_item, timeout=3):
    """Poll ``device.syncItems()`` until an item with ``sync_item.id`` is listed.

    Args:
        device: object exposing ``syncItems()``; the returned value must have
            an iterable ``items`` attribute.
        sync_item: object whose ``id`` identifies the wanted sync item.
        timeout: number of seconds to keep polling before giving up.

    Returns:
        The first listed item whose ``id`` equals ``sync_item.id``.

    Raises:
        AssertionError: if no matching item was listed within ``timeout``
            seconds (including when ``timeout`` is zero or negative, in which
            case no poll is made at all).
    """
    start = time()
    while time() - start < timeout:
        for item in device.syncItems().items:
            if item.id == sync_item.id:
                return item
        # Not listed yet: pause briefly before asking again.
        sleep(0.5)

    # Raise explicitly instead of ``assert False`` so the failure is still
    # reported when Python runs with assertions disabled (``-O``).
    raise AssertionError('Failed to ensure that required sync_item is exist')
|
|
|
|
|
|
def ensure_sync_item_missing(device, sync_item, timeout=3):
    """Poll ``device.syncItems()`` until no item with ``sync_item.id`` is listed.

    Args:
        device: object exposing ``syncItems()``; the returned value must have
            an iterable ``items`` attribute.
        sync_item: object whose ``id`` identifies the item expected to vanish.
        timeout: number of seconds to keep polling before giving up.

    Returns:
        None, as soon as a poll no longer lists the item (or immediately when
        ``timeout`` is zero or negative, in which case no poll is made).

    Raises:
        AssertionError: if the item was still listed on the last poll made
            before ``timeout`` seconds elapsed.
    """
    start = time()
    still_listed = False
    while time() - start < timeout:
        # Recompute on every poll: a match seen on an earlier poll must not
        # linger once the item has disappeared from the listing.
        still_listed = any(item.id == sync_item.id for item in device.syncItems().items)
        if not still_listed:
            break
        sleep(0.5)

    if still_listed:
        raise AssertionError('Failed to ensure that required sync_item is missing')
|
|
|
|
|
|
def test_current_device_got_sync_target(clear_sync_device):
    """The device fixture must advertise 'sync-target' in its ``provides`` value."""
    provided = clear_sync_device.provides
    assert 'sync-target' in provided
|
|
|
|
|
|
def test_add_movie_to_sync(clear_sync_device, movie):
    """Sync a single movie and check the sync item reports exactly that movie."""
    requested = movie.sync(VIDEO_QUALITY_3_MBPS_720p, client=clear_sync_device)
    movie._server.refreshSync()
    synced = ensure_sync_item(clear_sync_device, requested)

    media = synced.getMedia()
    assert len(media) == 1
    only_entry = media[0]
    assert only_entry.ratingKey == movie.ratingKey
|
|
|
|
|
|
def test_delete_sync_item(clear_sync_device, movie):
    """Sync a movie, delete every sync item on the device, then check it is gone."""
    requested = movie.sync(VIDEO_QUALITY_3_MBPS_720p, client=clear_sync_device)
    movie._server.refreshSync()
    listed = ensure_sync_item(clear_sync_device, requested)

    # Every item currently listed for the device is deleted, not only the new one.
    for existing in clear_sync_device.syncItems().items:
        existing.delete()

    ensure_sync_item_missing(clear_sync_device, listed)
|
|
|
|
|
|
def test_add_show_to_sync(clear_sync_device, show):
    """Sync a whole show and check the sync item's media match the show's episodes."""
    requested = show.sync(VIDEO_QUALITY_3_MBPS_720p, client=clear_sync_device)
    show._server.refreshSync()
    synced = ensure_sync_item(clear_sync_device, requested)

    all_episodes = show.episodes()
    media = synced.getMedia()
    assert len(all_episodes) == len(media)

    # Same rating keys in the same order.
    expected_keys = [ep.ratingKey for ep in all_episodes]
    synced_keys = [entry.ratingKey for entry in media]
    assert expected_keys == synced_keys
|
|
|
|
|
|
def test_add_season_to_sync(clear_sync_device, show):
    """Sync 'Season 1' of the show and check the media match that season's episodes."""
    first_season = show.season('Season 1')
    requested = first_season.sync(VIDEO_QUALITY_3_MBPS_720p, client=clear_sync_device)
    first_season._server.refreshSync()
    synced = ensure_sync_item(clear_sync_device, requested)

    season_episodes = first_season.episodes()
    media = synced.getMedia()
    assert len(season_episodes) == len(media)

    # Same rating keys in the same order.
    expected_keys = [ep.ratingKey for ep in season_episodes]
    synced_keys = [entry.ratingKey for entry in media]
    assert expected_keys == synced_keys
|
|
|
|
|
|
def test_add_episode_to_sync(clear_sync_device, episode):
    """Sync a single episode and check the sync item reports exactly that episode."""
    requested = episode.sync(VIDEO_QUALITY_3_MBPS_720p, client=clear_sync_device)
    episode._server.refreshSync()
    synced = ensure_sync_item(clear_sync_device, requested)

    media = synced.getMedia()
    assert len(media) == 1
    only_entry = media[0]
    assert only_entry.ratingKey == episode.ratingKey
|
|
|
|
|
|
def test_limited_watched(clear_sync_device, show):
    """Sync with ``limit=5, unwatched=False``; marking an episode watched must not change the media."""
    show.markUnwatched()
    requested = show.sync(VIDEO_QUALITY_3_MBPS_720p, client=clear_sync_device, limit=5, unwatched=False)
    show._server.refreshSync()
    synced = ensure_sync_item(clear_sync_device, requested)

    first_five = show.episodes()[:5]
    media = synced.getMedia()
    assert len(media) == 5
    expected_keys = [ep.ratingKey for ep in first_five]
    synced_keys = [entry.ratingKey for entry in media]
    assert expected_keys == synced_keys

    # Watch the first episode; the same five episodes are still expected.
    first_five[0].markWatched()
    show._server.refreshSync()
    media = synced.getMedia()
    assert len(media) == 5
    expected_keys = [ep.ratingKey for ep in first_five]
    synced_keys = [entry.ratingKey for entry in media]
    assert expected_keys == synced_keys
|
|
|
|
|
|
def test_limited_unwatched(clear_sync_device, show):
    """Sync with ``limit=5, unwatched=True``; the media must track the first five unwatched episodes."""
    show.markUnwatched()
    requested = show.sync(VIDEO_QUALITY_3_MBPS_720p, client=clear_sync_device, limit=5, unwatched=True)
    show._server.refreshSync()
    synced = ensure_sync_item(clear_sync_device, requested)

    unwatched = show.episodes(viewCount=0)[:5]
    media = synced.getMedia()
    assert len(unwatched) == len(media)
    expected_keys = [ep.ratingKey for ep in unwatched]
    synced_keys = [entry.ratingKey for entry in media]
    assert expected_keys == synced_keys

    # Watch the first episode, then compare against a freshly queried unwatched list.
    unwatched[0].markWatched()
    show._server.refreshSync()
    unwatched = show.episodes(viewCount=0)[:5]
    media = synced.getMedia()
    assert len(unwatched) == len(media)
    expected_keys = [ep.ratingKey for ep in unwatched]
    synced_keys = [entry.ratingKey for entry in media]
    assert expected_keys == synced_keys
|
|
|
|
|
|
def test_unlimited_and_watched(clear_sync_device, show):
    """Sync with ``unwatched=False`` and no limit; the media must match all episodes before and after a watch."""
    show.markUnwatched()
    requested = show.sync(VIDEO_QUALITY_3_MBPS_720p, client=clear_sync_device, unwatched=False)
    show._server.refreshSync()
    synced = ensure_sync_item(clear_sync_device, requested)

    all_episodes = show.episodes()
    media = synced.getMedia()
    assert len(all_episodes) == len(media)
    expected_keys = [ep.ratingKey for ep in all_episodes]
    synced_keys = [entry.ratingKey for entry in media]
    assert expected_keys == synced_keys

    # Watch the first episode, then compare against a freshly queried episode list.
    all_episodes[0].markWatched()
    show._server.refreshSync()
    all_episodes = show.episodes()
    media = synced.getMedia()
    assert len(all_episodes) == len(media)
    expected_keys = [ep.ratingKey for ep in all_episodes]
    synced_keys = [entry.ratingKey for entry in media]
    assert expected_keys == synced_keys
|
|
|
|
|
|
def test_unlimited_and_unwatched(clear_sync_device, show):
    """Sync with ``unwatched=True`` and no limit; the media must track the unwatched episodes."""
    show.markUnwatched()
    requested = show.sync(VIDEO_QUALITY_3_MBPS_720p, client=clear_sync_device, unwatched=True)
    show._server.refreshSync()
    synced = ensure_sync_item(clear_sync_device, requested)

    unwatched = show.episodes(viewCount=0)
    media = synced.getMedia()
    assert len(unwatched) == len(media)
    expected_keys = [ep.ratingKey for ep in unwatched]
    synced_keys = [entry.ratingKey for entry in media]
    assert expected_keys == synced_keys

    # Watch the first episode, then compare against a freshly queried unwatched list.
    unwatched[0].markWatched()
    show._server.refreshSync()
    unwatched = show.episodes(viewCount=0)
    media = synced.getMedia()
    assert len(unwatched) == len(media)
    expected_keys = [ep.ratingKey for ep in unwatched]
    synced_keys = [entry.ratingKey for entry in media]
    assert expected_keys == synced_keys
|
|
|
|
|
|
def test_add_music_artist_to_sync(clear_sync_device, artist):
    """Sync an artist and check the sync item's media match the artist's tracks."""
    requested = artist.sync(AUDIO_BITRATE_192_KBPS, client=clear_sync_device)
    artist._server.refreshSync()
    synced = ensure_sync_item(clear_sync_device, requested)

    artist_tracks = artist.tracks()
    media = synced.getMedia()
    assert len(artist_tracks) == len(media)

    # Same rating keys in the same order.
    expected_keys = [trk.ratingKey for trk in artist_tracks]
    synced_keys = [entry.ratingKey for entry in media]
    assert expected_keys == synced_keys
|
|
|
|
|
|
def test_add_music_album_to_sync(clear_sync_device, album):
    """Sync an album and check the sync item's media match the album's tracks."""
    requested = album.sync(AUDIO_BITRATE_192_KBPS, client=clear_sync_device)
    album._server.refreshSync()
    synced = ensure_sync_item(clear_sync_device, requested)

    album_tracks = album.tracks()
    media = synced.getMedia()
    assert len(album_tracks) == len(media)

    # Same rating keys in the same order.
    expected_keys = [trk.ratingKey for trk in album_tracks]
    synced_keys = [entry.ratingKey for entry in media]
    assert expected_keys == synced_keys
|
|
|
|
|
|
def test_add_music_track_to_sync(clear_sync_device, track):
    """Sync a single track and check the sync item reports exactly that track."""
    requested = track.sync(AUDIO_BITRATE_192_KBPS, client=clear_sync_device)
    track._server.refreshSync()
    synced = ensure_sync_item(clear_sync_device, requested)

    media = synced.getMedia()
    assert len(media) == 1
    only_entry = media[0]
    assert only_entry.ratingKey == track.ratingKey
|
|
|
|
|
|
def test_add_photo_to_sync(clear_sync_device, photos):
    """Sync the first photo of the section and check the sync item reports exactly it.

    Skipped when that photo has no ``librarySectionID`` attribute.
    """
    first_photo = photos.all()[0]
    has_section_id = hasattr(first_photo, 'librarySectionID')
    if not has_section_id:
        pytest.skip('Photos are not ready for individual synchronization yet')

    requested = first_photo.sync(PHOTO_QUALITY_MEDIUM, client=clear_sync_device)
    first_photo._server.refreshSync()
    synced = ensure_sync_item(clear_sync_device, requested)

    media = synced.getMedia()
    assert len(media) == 1
    only_entry = media[0]
    assert only_entry.ratingKey == first_photo.ratingKey
|
|
|
|
|
|
def test_sync_entire_library_movies(clear_sync_device, movies):
    """Sync the whole movies section and check the media match ``movies.all()``."""
    requested = movies.sync(VIDEO_QUALITY_3_MBPS_720p, client=clear_sync_device)
    movies._server.refreshSync()
    synced = ensure_sync_item(clear_sync_device, requested)

    library_items = movies.all()
    media = synced.getMedia()
    assert len(library_items) == len(media)

    # Same rating keys in the same order.
    expected_keys = [entry.ratingKey for entry in library_items]
    synced_keys = [entry.ratingKey for entry in media]
    assert expected_keys == synced_keys
|
|
|
|
|
|
def test_sync_entire_library_tvshows(clear_sync_device, tvshows):
    """Sync the whole TV section and check the media match ``tvshows.searchEpisodes()``."""
    requested = tvshows.sync(VIDEO_QUALITY_3_MBPS_720p, client=clear_sync_device)
    tvshows._server.refreshSync()
    synced = ensure_sync_item(clear_sync_device, requested)

    library_items = tvshows.searchEpisodes()
    media = synced.getMedia()
    assert len(library_items) == len(media)

    # Same rating keys in the same order.
    expected_keys = [entry.ratingKey for entry in library_items]
    synced_keys = [entry.ratingKey for entry in media]
    assert expected_keys == synced_keys
|
|
|
|
|
|
def test_sync_entire_library_music(clear_sync_device, music):
    """Sync the whole music section and check the media match ``music.searchTracks()``."""
    requested = music.sync(AUDIO_BITRATE_192_KBPS, client=clear_sync_device)
    music._server.refreshSync()
    synced = ensure_sync_item(clear_sync_device, requested)

    library_items = music.searchTracks()
    media = synced.getMedia()
    assert len(library_items) == len(media)

    # Same rating keys in the same order.
    expected_keys = [entry.ratingKey for entry in library_items]
    synced_keys = [entry.ratingKey for entry in media]
    assert expected_keys == synced_keys
|
|
|
|
|
|
def test_sync_entire_library_photos(clear_sync_device, photos):
    """Sync the whole photos section and check the media match ``photos.all()``."""
    requested = photos.sync(PHOTO_QUALITY_MEDIUM, client=clear_sync_device)
    photos._server.refreshSync()
    synced = ensure_sync_item(clear_sync_device, requested)

    library_items = photos.all()
    media = synced.getMedia()
    assert len(library_items) == len(media)

    # Same rating keys in the same order.
    expected_keys = [entry.ratingKey for entry in library_items]
    synced_keys = [entry.ratingKey for entry in media]
    assert expected_keys == synced_keys
|
|
|
|
|
|
def test_playlist_movie_sync(plex, clear_sync_device, movies):
    """Sync a playlist built from ``movies.all()`` and check the media match its items.

    The playlist created for the test is deleted afterwards, whether or not
    the checks pass.
    """
    items = movies.all()
    playlist = plex.createPlaylist('Sync: Movies', items)
    try:
        new_item = playlist.sync(videoQuality=VIDEO_QUALITY_3_MBPS_720p, client=clear_sync_device)
        playlist._server.refreshSync()
        item = ensure_sync_item(clear_sync_device, new_item)
        media_list = item.getMedia()
        assert len(items) == len(media_list)
        assert [e.ratingKey for e in items] == [m.ratingKey for m in media_list]
    finally:
        # Clean up even when a step above fails, so the playlist is not left behind.
        playlist.delete()
|
|
|
|
|
|
def test_playlist_tvshow_sync(plex, clear_sync_device, show):
    """Sync a playlist built from ``show.episodes()`` and check the media match its items.

    The playlist created for the test is deleted afterwards, whether or not
    the checks pass.
    """
    items = show.episodes()
    playlist = plex.createPlaylist('Sync: TV Show', items)
    try:
        new_item = playlist.sync(videoQuality=VIDEO_QUALITY_3_MBPS_720p, client=clear_sync_device)
        playlist._server.refreshSync()
        item = ensure_sync_item(clear_sync_device, new_item)
        media_list = item.getMedia()
        assert len(items) == len(media_list)
        assert [e.ratingKey for e in items] == [m.ratingKey for m in media_list]
    finally:
        # Clean up even when a step above fails, so the playlist is not left behind.
        playlist.delete()
|
|
|
|
|
|
def test_playlist_mixed_sync(plex, clear_sync_device, movie, episode):
    """Sync a playlist holding one movie and one episode and check the media match both.

    The playlist created for the test is deleted afterwards, whether or not
    the checks pass.
    """
    items = [movie, episode]
    playlist = plex.createPlaylist('Sync: Mixed', items)
    try:
        new_item = playlist.sync(videoQuality=VIDEO_QUALITY_3_MBPS_720p, client=clear_sync_device)
        playlist._server.refreshSync()
        item = ensure_sync_item(clear_sync_device, new_item)
        media_list = item.getMedia()
        assert len(items) == len(media_list)
        assert [e.ratingKey for e in items] == [m.ratingKey for m in media_list]
    finally:
        # Clean up even when a step above fails, so the playlist is not left behind.
        playlist.delete()
|
|
|
|
|
|
def test_playlist_music_sync(plex, clear_sync_device, artist):
    """Sync a playlist built from ``artist.tracks()`` and check the media match its items.

    The playlist created for the test is deleted afterwards, whether or not
    the checks pass.
    """
    items = artist.tracks()
    playlist = plex.createPlaylist('Sync: Music', items)
    try:
        new_item = playlist.sync(audioBitrate=AUDIO_BITRATE_192_KBPS, client=clear_sync_device)
        playlist._server.refreshSync()
        item = ensure_sync_item(clear_sync_device, new_item)
        media_list = item.getMedia()
        assert len(items) == len(media_list)
        assert [e.ratingKey for e in items] == [m.ratingKey for m in media_list]
    finally:
        # Clean up even when a step above fails, so the playlist is not left behind.
        playlist.delete()
|
|
|
|
|
|
def test_playlist_photos_sync(plex, clear_sync_device, photos):
    """Sync a playlist built from ``photos.all()`` and check the media match its items.

    Skipped (before any playlist is created) when the first photo has no
    ``librarySectionID`` attribute. The playlist created for the test is
    deleted afterwards, whether or not the checks pass.
    """
    items = photos.all()
    if not hasattr(items[0], 'librarySectionID'):
        pytest.skip('Photos are not ready for individual synchronization yet')
    playlist = plex.createPlaylist('Sync: Photos', items)
    try:
        new_item = playlist.sync(photoResolution=PHOTO_QUALITY_MEDIUM, client=clear_sync_device)
        playlist._server.refreshSync()
        item = ensure_sync_item(clear_sync_device, new_item)
        media_list = item.getMedia()
        assert len(items) == len(media_list)
        assert [e.ratingKey for e in items] == [m.ratingKey for m in media_list]
    finally:
        # Clean up even when a step above fails, so the playlist is not left behind.
        playlist.delete()
|