Update algorithm for parsing smart filters (#1276)

* Fix minor typo

* Update algorithm for parsing  #1274

* Match style guide of the module

* Fix Edge Cases

- `and` is now considered the default operation instead of raising error
- fix algorithm so that the reserved keys are parsed in the filters dictionary instead of filters group when located later in the feed.
- removed some code repetition

* Join multiple filters by default

If multiple filter groups are parsed they are joined by `and` instead of raising error.

* fix `==` operator parsing

transfers "=" from the value to key

* fix typehinting

* Add new test for smart Filters

this test would fail on old algorithm despite it being logically the exact same filter.

* add test for deeply nested filters

* combine filters test for playlist

* combine filters test for collections

* fix typo

* add test to check parsed fitlers as it is

* edited test to be independent of the server

* Apply suggestions from code review

adhere to style guide of the project

Co-authored-by: JonnyWong16 <9099342+JonnyWong16@users.noreply.github.com>

---------

Co-authored-by: JonnyWong16 <9099342+JonnyWong16@users.noreply.github.com>
This commit is contained in:
Dr.Blank 2023-11-05 19:23:57 -05:00 committed by GitHub
parent c03d515ade
commit 56a8df659d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 157 additions and 67 deletions

View file

@ -1,5 +1,7 @@
# -*- coding: utf-8 -*-
from collections import deque
from datetime import datetime
from typing import Tuple
from urllib.parse import parse_qsl, quote, quote_plus, unquote, urlencode, urlsplit
from plexapi import media, settings, utils
@ -61,63 +63,95 @@ class AdvancedSettingsMixin:
class SmartFilterMixin:
""" Mixing for Plex objects that can have smart filters. """
""" Mixin for Plex objects that can have smart filters. """
def _parseFilterGroups(
self, feed: "deque[Tuple[str, str]]", returnOn: "set[str]|None" = None
) -> dict:
""" Parse filter groups from input lines between push and pop. """
currentFiltersStack: list[dict] = []
operatorForStack = None
if returnOn is None:
returnOn = set("pop")
else:
returnOn.add("pop")
allowedLogicalOperators = ["and", "or"] # first is the default
while feed:
key, value = feed.popleft() # consume the first item
if key == "push":
# recurse and add the result to the current stack
currentFiltersStack.append(
self._parseFilterGroups(feed, returnOn)
)
elif key in returnOn:
# stop iterating and return the current stack
if not key == "pop":
feed.appendleft((key, value)) # put the item back
break
elif key in allowedLogicalOperators:
# set the operator
if operatorForStack and not operatorForStack == key:
raise ValueError(
"cannot have different logical operators for the same"
" filter group"
)
operatorForStack = key
else:
# add the key value pair to the current filter
currentFiltersStack.append({key: value})
if not operatorForStack and len(currentFiltersStack) > 1:
# consider 'and' as the default operator
operatorForStack = allowedLogicalOperators[0]
if operatorForStack:
return {operatorForStack: currentFiltersStack}
return currentFiltersStack.pop()
def _parseQueryFeed(self, feed: "deque[Tuple[str, str]]") -> dict:
""" Parse the query string into a dict. """
filtersDict = {}
special_keys = {"type", "sort"}
integer_keys = {"includeGuids", "limit"}
reserved_keys = special_keys | integer_keys
while feed:
key, value = feed.popleft()
if key in integer_keys:
filtersDict[key] = int(value)
elif key == "type":
filtersDict["libtype"] = utils.reverseSearchType(value)
elif key == "sort":
filtersDict["sort"] = value.split(",")
else:
feed.appendleft((key, value)) # put the item back
filter_group = self._parseFilterGroups(
feed, returnOn=reserved_keys
)
if "filters" in filtersDict:
filtersDict["filters"] = {
"and": [filtersDict["filters"], filter_group]
}
else:
filtersDict["filters"] = filter_group
return filtersDict
def _parseFilters(self, content):
""" Parse the content string and returns the filter dict. """
content = urlsplit(unquote(content))
filters = {}
filterOp = 'and'
filterGroups = [[]]
feed = deque()
for key, value in parse_qsl(content.query):
# Move = sign to key when operator is ==
if value.startswith('='):
key += '='
value = value[1:]
if value.startswith("="):
key, value = f"{key}=", value[1:]
if key == 'includeGuids':
filters['includeGuids'] = int(value)
elif key == 'type':
filters['libtype'] = utils.reverseSearchType(value)
elif key == 'sort':
filters['sort'] = value.split(',')
elif key == 'limit':
filters['limit'] = int(value)
elif key == 'push':
filterGroups[-1].append([])
filterGroups.append(filterGroups[-1][-1])
elif key == 'and':
filterOp = 'and'
elif key == 'or':
filterOp = 'or'
elif key == 'pop':
filterGroups[-1].insert(0, filterOp)
filterGroups.pop()
else:
filterGroups[-1].append({key: value})
feed.append((key, value))
if filterGroups:
filters['filters'] = self._formatFilterGroups(filterGroups.pop())
return filters
def _formatFilterGroups(self, groups):
""" Formats the filter groups into the advanced search rules. """
if len(groups) == 1 and isinstance(groups[0], list):
groups = groups.pop()
filterOp = 'and'
rules = []
for g in groups:
if isinstance(g, list):
rules.append(self._formatFilterGroups(g))
elif isinstance(g, dict):
rules.append(g)
elif g in {'and', 'or'}:
filterOp = g
return {filterOp: rules}
return self._parseQueryFeed(feed)
class SplitMergeMixin:

View file

@ -252,20 +252,38 @@ def test_Collection_createSmart(plex, tvshows):
collection.delete()
def test_Collection_smartFilters(plex, movies):
title = "test_Collection_smartFilters"
advancedFilters = {
'and': [
@pytest.mark.parametrize(
"advancedFilters",
[
{
'or': [
{'title': 'elephant'},
{'title=': 'Big Buck Bunny'}
"and": [
{"or": [{"title": "elephant"}, {"title=": "Big Buck Bunny"}]},
{"year>>": '1990'},
{"unwatched": '1'},
]
},
{'year>>': 1990},
{'unwatched': True}
{
"or": [
{
"and": [
{"title": "elephant"},
{"year>>": '1990'},
{"unwatched": '1'},
]
}
},
{
"and": [
{"title=": "Big Buck Bunny"},
{"year>>": '1990'},
{"unwatched": '1'},
]
},
]
},
],
)
def test_Collection_smartFilters(advancedFilters, plex, movies):
title = "test_Collection_smartFilters"
try:
collection = plex.createCollection(
title=title,
@ -273,9 +291,10 @@ def test_Collection_smartFilters(plex, movies):
smart=True,
limit=5,
sort="year",
filters=advancedFilters
filters=advancedFilters,
)
filters = collection.filters()
assert filters["filters"] == advancedFilters
assert movies.search(**filters) == collection.items()
finally:
collection.delete()

View file

@ -184,20 +184,57 @@ def test_Playlist_createSmart(plex, movies, movie):
playlist.delete()
def test_Playlist_smartFilters(plex, tvshows):
@pytest.mark.parametrize(
"smartFilter",
[
{"or": [{"show.title": "game"}, {"show.title": "100"}]},
{
"and": [
{
"or": [
{
"and": [
{"show.title": "game"},
{"show.title": "thrones"},
{
"or": [
{"show.year>>": "1999"},
{"show.viewCount<<": "3"},
]
},
]
},
{"show.title": "100"},
]
},
{"or": [{"show.contentRating": "TV-14"}, {"show.addedAt>>": "-10y"}]},
{"episode.hdr!": "1"},
]
},
],
)
def test_Playlist_smartFilters(smartFilter, plex, tvshows):
try:
playlist = plex.createPlaylist(
title="smart_playlist_filters",
smart=True,
section=tvshows,
limit=5,
libtype='show',
sort=["season.index:nullsLast", "episode.index:nullsLast", "show.titleSort"],
filters={"or": [{"show.title": "game"}, {'show.title': "100"}]}
libtype="show",
sort=[
"season.index:nullsLast",
"episode.index:nullsLast",
"show.titleSort",
],
filters=smartFilter,
)
filters = playlist.filters()
filters['libtype'] = tvshows.METADATA_TYPE # Override libtype to check playlist items
filters["libtype"] = (
tvshows.METADATA_TYPE
) # Override libtype to check playlist items
assert filters["filters"] == smartFilter
assert tvshows.search(**filters) == playlist.items()
finally:
playlist.delete()