from plexapi.exceptions import BadRequest from . import conftest as utils from plexapi.sync import VIDEO_QUALITY_3_MBPS_720p, AUDIO_BITRATE_192_KBPS, PHOTO_QUALITY_MEDIUM def get_sync_item_from_server(device, sync_item): sync_list = device.syncItems() for item in sync_list.items: if item.id == sync_item.id: return item def is_sync_item_missing(device, sync_item): return not get_sync_item_from_server(device, sync_item) def test_current_device_got_sync_target(clear_sync_device): assert 'sync-target' in clear_sync_device.provides def get_media(item, server): try: return item.getMedia() except BadRequest as e: if 'not_found' in str(e): server.refreshSync() return None else: raise def test_add_movie_to_sync(clear_sync_device, movie): new_item = movie.sync(VIDEO_QUALITY_3_MBPS_720p, client=clear_sync_device) movie._server.refreshSync() item = utils.wait_until(get_sync_item_from_server, delay=0.5, timeout=3, device=clear_sync_device, sync_item=new_item) media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=movie._server) assert len(media_list) == 1 assert media_list[0].ratingKey == movie.ratingKey def test_delete_sync_item(clear_sync_device, movie): new_item = movie.sync(VIDEO_QUALITY_3_MBPS_720p, client=clear_sync_device) movie._server.refreshSync() new_item_in_myplex = utils.wait_until(get_sync_item_from_server, delay=0.5, timeout=3, device=clear_sync_device, sync_item=new_item) sync_items = clear_sync_device.syncItems() for item in sync_items.items: item.delete() utils.wait_until(is_sync_item_missing, delay=0.5, timeout=3, device=clear_sync_device, sync_item=new_item_in_myplex) def test_add_show_to_sync(clear_sync_device, show): new_item = show.sync(VIDEO_QUALITY_3_MBPS_720p, client=clear_sync_device) show._server.refreshSync() item = utils.wait_until(get_sync_item_from_server, delay=0.5, timeout=3, device=clear_sync_device, sync_item=new_item) episodes = show.episodes() media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=show._server) assert len(episodes) == len(media_list) assert [e.ratingKey for e in episodes] == [m.ratingKey for m in media_list] def test_add_season_to_sync(clear_sync_device, show): season = show.season('Season 1') new_item = season.sync(VIDEO_QUALITY_3_MBPS_720p, client=clear_sync_device) season._server.refreshSync() item = utils.wait_until(get_sync_item_from_server, delay=0.5, timeout=3, device=clear_sync_device, sync_item=new_item) episodes = season.episodes() media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=season._server) assert len(episodes) == len(media_list) assert [e.ratingKey for e in episodes] == [m.ratingKey for m in media_list] def test_add_episode_to_sync(clear_sync_device, episode): new_item = episode.sync(VIDEO_QUALITY_3_MBPS_720p, client=clear_sync_device) episode._server.refreshSync() item = utils.wait_until(get_sync_item_from_server, delay=0.5, timeout=3, device=clear_sync_device, sync_item=new_item) media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=episode._server) assert 1 == len(media_list) assert episode.ratingKey == media_list[0].ratingKey def test_limited_watched(clear_sync_device, show): show.markUnwatched() new_item = show.sync(VIDEO_QUALITY_3_MBPS_720p, client=clear_sync_device, limit=5, unwatched=False) show._server.refreshSync() item = utils.wait_until(get_sync_item_from_server, delay=0.5, timeout=3, device=clear_sync_device, sync_item=new_item) episodes = show.episodes()[:5] media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=show._server) assert 5 == len(media_list) assert [e.ratingKey for e in episodes] == [m.ratingKey for m in media_list] episodes[0].markWatched() show._server.refreshSync() media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=show._server) assert 5 == len(media_list) assert [e.ratingKey for e in episodes] == [m.ratingKey for m in media_list] def test_limited_unwatched(clear_sync_device, show): show.markUnwatched() new_item = show.sync(VIDEO_QUALITY_3_MBPS_720p, client=clear_sync_device, limit=5, unwatched=True) show._server.refreshSync() item = utils.wait_until(get_sync_item_from_server, delay=0.5, timeout=3, device=clear_sync_device, sync_item=new_item) episodes = show.episodes(viewCount=0)[:5] media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=show._server) assert len(episodes) == len(media_list) assert [e.ratingKey for e in episodes] == [m.ratingKey for m in media_list] episodes[0].markWatched() show._server.refreshSync() episodes = show.episodes(viewCount=0)[:5] media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=show._server) assert len(episodes) == len(media_list) assert [e.ratingKey for e in episodes] == [m.ratingKey for m in media_list] def test_unlimited_and_watched(clear_sync_device, show): show.markUnwatched() new_item = show.sync(VIDEO_QUALITY_3_MBPS_720p, client=clear_sync_device, unwatched=False) show._server.refreshSync() item = utils.wait_until(get_sync_item_from_server, delay=0.5, timeout=3, device=clear_sync_device, sync_item=new_item) episodes = show.episodes() media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=show._server) assert len(episodes) == len(media_list) assert [e.ratingKey for e in episodes] == [m.ratingKey for m in media_list] episodes[0].markWatched() show._server.refreshSync() episodes = show.episodes() media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=show._server) assert len(episodes) == len(media_list) assert [e.ratingKey for e in episodes] == [m.ratingKey for m in media_list] def test_unlimited_and_unwatched(clear_sync_device, show): show.markUnwatched() new_item = show.sync(VIDEO_QUALITY_3_MBPS_720p, client=clear_sync_device, unwatched=True) show._server.refreshSync() item = utils.wait_until(get_sync_item_from_server, delay=0.5, timeout=3, device=clear_sync_device, sync_item=new_item) episodes = show.episodes(viewCount=0) media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=show._server) assert len(episodes) == len(media_list) assert [e.ratingKey for e in episodes] == [m.ratingKey for m in media_list] episodes[0].markWatched() show._server.refreshSync() episodes = show.episodes(viewCount=0) media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=show._server) assert len(episodes) == len(media_list) assert [e.ratingKey for e in episodes] == [m.ratingKey for m in media_list] def test_add_music_artist_to_sync(clear_sync_device, artist): new_item = artist.sync(AUDIO_BITRATE_192_KBPS, client=clear_sync_device) artist._server.refreshSync() item = utils.wait_until(get_sync_item_from_server, delay=0.5, timeout=3, device=clear_sync_device, sync_item=new_item) tracks = artist.tracks() media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=artist._server) assert len(tracks) == len(media_list) assert [t.ratingKey for t in tracks] == [m.ratingKey for m in media_list] def test_add_music_album_to_sync(clear_sync_device, album): new_item = album.sync(AUDIO_BITRATE_192_KBPS, client=clear_sync_device) album._server.refreshSync() item = utils.wait_until(get_sync_item_from_server, delay=0.5, timeout=3, device=clear_sync_device, sync_item=new_item) tracks = album.tracks() media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=album._server) assert len(tracks) == len(media_list) assert [t.ratingKey for t in tracks] == [m.ratingKey for m in media_list] def test_add_music_track_to_sync(clear_sync_device, track): new_item = track.sync(AUDIO_BITRATE_192_KBPS, client=clear_sync_device) track._server.refreshSync() item = utils.wait_until(get_sync_item_from_server, delay=0.5, timeout=3, device=clear_sync_device, sync_item=new_item) media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=track._server) assert 1 == len(media_list) assert track.ratingKey == media_list[0].ratingKey def test_add_photo_to_sync(clear_sync_device, photoalbum): photo = photoalbum.photo('photo1') new_item = photo.sync(PHOTO_QUALITY_MEDIUM, client=clear_sync_device) photo._server.refreshSync() item = utils.wait_until(get_sync_item_from_server, delay=0.5, timeout=3, device=clear_sync_device, sync_item=new_item) media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=photo._server) assert 1 == len(media_list) assert photo.ratingKey == media_list[0].ratingKey def test_sync_entire_library_movies(clear_sync_device, movies): new_item = movies.sync(VIDEO_QUALITY_3_MBPS_720p, client=clear_sync_device) movies._server.refreshSync() item = utils.wait_until(get_sync_item_from_server, delay=0.5, timeout=3, device=clear_sync_device, sync_item=new_item) section_content = movies.all() media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=movies._server) assert len(section_content) == len(media_list) assert [e.ratingKey for e in section_content] == [m.ratingKey for m in media_list] def test_sync_entire_library_tvshows(clear_sync_device, tvshows): new_item = tvshows.sync(VIDEO_QUALITY_3_MBPS_720p, client=clear_sync_device) tvshows._server.refreshSync() item = utils.wait_until(get_sync_item_from_server, delay=0.5, timeout=3, device=clear_sync_device, sync_item=new_item) section_content = tvshows.searchEpisodes() media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=tvshows._server) assert len(section_content) == len(media_list) assert [e.ratingKey for e in section_content] == [m.ratingKey for m in media_list] def test_sync_entire_library_music(clear_sync_device, music): new_item = music.sync(AUDIO_BITRATE_192_KBPS, client=clear_sync_device) music._server.refreshSync() item = utils.wait_until(get_sync_item_from_server, delay=0.5, timeout=3, device=clear_sync_device, sync_item=new_item) section_content = music.searchTracks() media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=music._server) assert len(section_content) == len(media_list) assert [e.ratingKey for e in section_content] == [m.ratingKey for m in media_list] def test_sync_entire_library_photos(clear_sync_device, photos): new_item = photos.sync(PHOTO_QUALITY_MEDIUM, client=clear_sync_device) photos._server.refreshSync() item = utils.wait_until(get_sync_item_from_server, delay=0.5, timeout=3, device=clear_sync_device, sync_item=new_item) # It's not that easy, to just get all the photos within the library, so let`s query for photos with device!=0x0 section_content = photos.search(libtype='photo', **{'device!': '0x0'}) media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=photos._server) assert len(section_content) == len(media_list) assert [e.ratingKey for e in section_content] == [m.ratingKey for m in media_list] def test_playlist_movie_sync(plex, clear_sync_device, movies): items = movies.all() playlist = plex.createPlaylist('Sync: Movies', items) new_item = playlist.sync(videoQuality=VIDEO_QUALITY_3_MBPS_720p, client=clear_sync_device) playlist._server.refreshSync() item = utils.wait_until(get_sync_item_from_server, delay=0.5, timeout=3, device=clear_sync_device, sync_item=new_item) media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=playlist._server) assert len(items) == len(media_list) assert [e.ratingKey for e in items] == [m.ratingKey for m in media_list] playlist.delete() def test_playlist_tvshow_sync(plex, clear_sync_device, show): items = show.episodes() playlist = plex.createPlaylist('Sync: TV Show', items) new_item = playlist.sync(videoQuality=VIDEO_QUALITY_3_MBPS_720p, client=clear_sync_device) playlist._server.refreshSync() item = utils.wait_until(get_sync_item_from_server, delay=0.5, timeout=3, device=clear_sync_device, sync_item=new_item) media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=playlist._server) assert len(items) == len(media_list) assert [e.ratingKey for e in items] == [m.ratingKey for m in media_list] playlist.delete() def test_playlist_mixed_sync(plex, clear_sync_device, movie, episode): items = [movie, episode] playlist = plex.createPlaylist('Sync: Mixed', items) new_item = playlist.sync(videoQuality=VIDEO_QUALITY_3_MBPS_720p, client=clear_sync_device) playlist._server.refreshSync() item = utils.wait_until(get_sync_item_from_server, delay=0.5, timeout=3, device=clear_sync_device, sync_item=new_item) media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=playlist._server) assert len(items) == len(media_list) assert [e.ratingKey for e in items] == [m.ratingKey for m in media_list] playlist.delete() def test_playlist_music_sync(plex, clear_sync_device, artist): items = artist.tracks() playlist = plex.createPlaylist('Sync: Music', items) new_item = playlist.sync(audioBitrate=AUDIO_BITRATE_192_KBPS, client=clear_sync_device) playlist._server.refreshSync() item = utils.wait_until(get_sync_item_from_server, delay=0.5, timeout=3, device=clear_sync_device, sync_item=new_item) media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=playlist._server) assert len(items) == len(media_list) assert [e.ratingKey for e in items] == [m.ratingKey for m in media_list] playlist.delete() def test_playlist_photos_sync(plex, clear_sync_device, photoalbum): items = photoalbum.photos() playlist = plex.createPlaylist('Sync: Photos', items) new_item = playlist.sync(photoResolution=PHOTO_QUALITY_MEDIUM, client=clear_sync_device) playlist._server.refreshSync() item = utils.wait_until(get_sync_item_from_server, delay=0.5, timeout=3, device=clear_sync_device, sync_item=new_item) media_list = utils.wait_until(get_media, delay=0.25, timeout=3, item=item, server=playlist._server) assert len(items) == len(media_list) assert [e.ratingKey for e in items] == [m.ratingKey for m in media_list] playlist.delete()