mirror of
https://github.com/LazoCoder/Pokemon-Terminal
synced 2024-11-22 20:03:14 +00:00
Cleanup and reorganize platform specific stuff, use unlink for close on POSIX, fix ConEmu adapter
This commit is contained in:
parent
b562a5c89a
commit
87a9e449cb
8 changed files with 143 additions and 123 deletions
|
@ -11,7 +11,7 @@ from . import scripter, slideshow
|
|||
from pokemonterminal.command_flags import parser, is_slideshow
|
||||
from pokemonterminal.database import Database
|
||||
from pokemonterminal.filters import Filter
|
||||
from pokemonterminal.platform.named_event import create_named_event
|
||||
from pokemonterminal.platform import PlatformNamedEvent
|
||||
|
||||
|
||||
|
||||
|
@ -69,40 +69,39 @@ def main(argv=None):
|
|||
print("Dry run, exiting.")
|
||||
return
|
||||
|
||||
e = create_named_event("Pokemon-Terminal_Wallpaper" if options.wallpaper else "Pokemon-Terminal_Terminal")
|
||||
try:
|
||||
if options.clear:
|
||||
if e.is_duplicate():
|
||||
e.set()
|
||||
scripter.clear_terminal()
|
||||
return
|
||||
event_name = "Pokemon-Terminal_Wallpaper" if options.wallpaper else "Pokemon-Terminal_Terminal"
|
||||
event_exists = PlatformNamedEvent.exists(event_name)
|
||||
|
||||
if is_slideshow and options.id <= 0 and size > 1:
|
||||
if e.is_duplicate():
|
||||
print("One or more slideshows is already running.\n")
|
||||
while True:
|
||||
print("[S]top the previous slideshow(s) / ", end='')
|
||||
if not options.wallpaper:
|
||||
print("[I]gnore and continue / ", end='')
|
||||
print("[A]bort")
|
||||
inp = input("Pick one: ").lower() # FIXME weird bug: inputting s here doesn't actually close the older process but "pokemon -c" does
|
||||
if inp == 's':
|
||||
e.set()
|
||||
break
|
||||
elif inp == 'i' and not options.wallpaper:
|
||||
break
|
||||
elif inp == 'a':
|
||||
return
|
||||
else:
|
||||
print("Not a valid option!\n")
|
||||
if options.slideshow <= 0:
|
||||
print("Time has to be greater then 0. You can use decimal values.")
|
||||
return
|
||||
e.clear()
|
||||
target_func = scripter.change_wallpaper if options.wallpaper else scripter.change_terminal
|
||||
slideshow.start(Filter.filtered_list, options.slideshow, target_func, e.name())
|
||||
finally:
|
||||
e.close()
|
||||
if options.clear:
|
||||
if event_exists:
|
||||
slideshow.stop(event_name)
|
||||
if not options.wallpaper:
|
||||
scripter.clear_terminal()
|
||||
return
|
||||
|
||||
if is_slideshow and options.id <= 0 and size > 1:
|
||||
if event_exists:
|
||||
print("One or more slideshows is already running.\n")
|
||||
while True:
|
||||
print("[S]top the previous slideshow(s) / ", end='')
|
||||
if not options.wallpaper:
|
||||
print("[I]gnore and continue / ", end='')
|
||||
print("[A]bort")
|
||||
inp = input("Pick one: ").lower()
|
||||
if inp == 's':
|
||||
slideshow.stop(event_name)
|
||||
break
|
||||
elif inp == 'i' and not options.wallpaper:
|
||||
break
|
||||
elif inp == 'a':
|
||||
return
|
||||
else:
|
||||
print("Not a valid option!\n")
|
||||
if options.slideshow <= 0:
|
||||
print("Time has to be greater then 0. You can use decimal values.")
|
||||
return
|
||||
target_func = scripter.change_wallpaper if options.wallpaper else scripter.change_terminal
|
||||
slideshow.start(Filter.filtered_list, options.slideshow, target_func, event_name)
|
||||
|
||||
if options.wallpaper:
|
||||
scripter.change_wallpaper(target.get_path())
|
||||
|
|
|
@ -0,0 +1,8 @@
|
|||
import sys
|
||||
|
||||
if sys.platform == 'win32':
|
||||
from .named_event.win import WindowsNamedEvent as platform_event
|
||||
else:
|
||||
from .named_event.posix import PosixNamedEvent as platform_event
|
||||
|
||||
PlatformNamedEvent = platform_event
|
|
@ -1,11 +1,67 @@
|
|||
import sys
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
from .adapters import NamedEvent
|
||||
class NamedEvent(ABC):
|
||||
"""
|
||||
Interface representing an operating system event object with a name.
|
||||
"""
|
||||
|
||||
def create_named_event(name: str) -> NamedEvent:
|
||||
if sys.platform == 'win32':
|
||||
from .adapters.win import WindowsNamedEvent
|
||||
return WindowsNamedEvent(name)
|
||||
else:
|
||||
from .adapters.posix import PosixNamedEvent
|
||||
return PosixNamedEvent(name)
|
||||
@staticmethod
|
||||
@abstractmethod
|
||||
def exists(name: str) -> bool:
|
||||
"""
|
||||
check if event exists
|
||||
does not returns event, use the constructor for that.
|
||||
:return a boolean indicating wether the event exists.
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def is_set(self) -> bool:
|
||||
"""
|
||||
check if event set
|
||||
:return a boolean indicating wether the event has been set.
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def set(self):
|
||||
"""
|
||||
sets the event
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def clear(self):
|
||||
"""
|
||||
resets the event
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def wait(self, timeout=None):
|
||||
"""
|
||||
block until event is set
|
||||
:param timeout Optional timeout for wait in seconds.
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def name(self) -> str:
|
||||
"""
|
||||
gets the name set at creation of the event
|
||||
:return Name of the event
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def close(self):
|
||||
"""
|
||||
release native resources
|
||||
"""
|
||||
pass
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_value, traceback):
|
||||
self.close()
|
||||
|
|
|
@ -1,59 +0,0 @@
|
|||
from abc import ABC, abstractmethod
|
||||
|
||||
class NamedEvent(ABC):
|
||||
"""
|
||||
Interface representing an operating system event object with a name.
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def is_set(self) -> bool:
|
||||
"""
|
||||
check if event set
|
||||
:return a boolean indicating wether the event has been set.
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def set(self):
|
||||
"""
|
||||
sets the event
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def clear(self):
|
||||
"""
|
||||
resets the event
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def wait(self, timeout=None):
|
||||
"""
|
||||
block until event is set
|
||||
:param timeout Optional timeout for wait in seconds.
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def is_duplicate(self) -> bool:
|
||||
"""
|
||||
determines if the event was created or opened as a result of creating this class
|
||||
:return True if it was opened. False if it was created.
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def name(self) -> str:
|
||||
"""
|
||||
gets the name set at creation of the event
|
||||
:return Name of the event
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def close(self):
|
||||
"""
|
||||
release native resources
|
||||
"""
|
||||
pass
|
|
@ -7,14 +7,19 @@ class PosixNamedEvent(NamedEvent):
|
|||
A wrapper for named events using a named semaphore
|
||||
"""
|
||||
|
||||
def __init__(self, name: str):
|
||||
def exists(name: str) -> bool:
|
||||
semaphore_name = '/' + name
|
||||
try:
|
||||
self.__semaphore = posix_ipc.Semaphore(semaphore_name, flags=posix_ipc.O_CREX)
|
||||
self.__duplicate = False
|
||||
semaphore = posix_ipc.Semaphore(semaphore_name)
|
||||
semaphore.close()
|
||||
return True
|
||||
except posix_ipc.ExistentialError: # Semaphores are reconsidering their life choices
|
||||
self.__semaphore = posix_ipc.Semaphore(semaphore_name)
|
||||
self.__duplicate = True
|
||||
return False
|
||||
|
||||
|
||||
def __init__(self, name: str):
|
||||
semaphore_name = '/' + name
|
||||
self.__semaphore = posix_ipc.Semaphore(semaphore_name, flags=posix_ipc.O_CREAT)
|
||||
|
||||
# NOTE this doesn't works on macOS, see http://semanchuk.com/philip/posix_ipc/#platforms
|
||||
def is_set(self) -> bool:
|
||||
|
@ -37,11 +42,9 @@ class PosixNamedEvent(NamedEvent):
|
|||
except posix_ipc.BusyError:
|
||||
return
|
||||
|
||||
def is_duplicate(self) -> bool:
|
||||
return self.__duplicate
|
||||
|
||||
def name(self) -> str:
|
||||
return self.__semaphore.name
|
||||
|
||||
def close(self):
|
||||
self.__semaphore.close()
|
||||
# FIXME do we also need to close it here?
|
||||
self.__semaphore.unlink()
|
|
@ -7,21 +7,36 @@ class WindowsNamedEvent(NamedEvent):
|
|||
Named events using the native Windows APIs
|
||||
"""
|
||||
|
||||
__SYNCHRONIZE = 0x00100000
|
||||
__EVENT_MODIFY_STATE = 2
|
||||
|
||||
__WAIT_OBJECT_0 = 0
|
||||
__WAIT_FAILED = 0xFFFFFFFF
|
||||
|
||||
__ERROR_ALREADY_EXISTS = 183
|
||||
__ERROR_FILE_NOT_FOUND = 2
|
||||
|
||||
def __raise_last_error():
|
||||
err_no = ctypes.GetLastError()
|
||||
raise WindowsError(err_no, ctypes.FormatError(err_no))
|
||||
|
||||
def exists(name: str) -> bool:
|
||||
event = ctypes.windll.kernel32.OpenEventW(WindowsNamedEvent.__SYNCHRONIZE | WindowsNamedEvent.__EVENT_MODIFY_STATE, False, name)
|
||||
if event == 0:
|
||||
if ctypes.GetLastError() == WindowsNamedEvent.__ERROR_FILE_NOT_FOUND:
|
||||
return False
|
||||
else:
|
||||
WindowsNamedEvent.__raise_last_error()
|
||||
else:
|
||||
result = ctypes.windll.kernel32.CloseHandle(event)
|
||||
if result == 0:
|
||||
WindowsNamedEvent.__raise_last_error()
|
||||
return True
|
||||
|
||||
def __init__(self, name: str):
|
||||
event = ctypes.windll.kernel32.CreateEventW(ctypes.c_void_p(), True, False, name)
|
||||
if event == 0:
|
||||
WindowsNamedEvent.__raise_last_error()
|
||||
|
||||
self.__duplicate = ctypes.GetLastError() == WindowsNamedEvent.__ERROR_ALREADY_EXISTS
|
||||
self.__event = event
|
||||
self.__name = name
|
||||
|
||||
|
@ -47,9 +62,6 @@ class WindowsNamedEvent(NamedEvent):
|
|||
if result == WindowsNamedEvent.__WAIT_FAILED:
|
||||
WindowsNamedEvent.__raise_last_error()
|
||||
|
||||
def is_duplicate(self) -> bool:
|
||||
return self.__duplicate
|
||||
|
||||
def name(self) -> str:
|
||||
return self.__name
|
||||
|
|
@ -2,7 +2,7 @@ import atexit
|
|||
import random
|
||||
import sys
|
||||
|
||||
from .platform.named_event import create_named_event
|
||||
from .platform import PlatformNamedEvent
|
||||
|
||||
def __print_fork(pid, length, delay):
|
||||
print(f"Starting slideshow with {length} Pokemons and a delay of {delay} minutes.")
|
||||
|
@ -27,8 +27,7 @@ def __get_sleeper_and_listener(event):
|
|||
return sleeper, listener
|
||||
|
||||
def __slideshow_worker(filtered, delay, changer_func, event_name):
|
||||
e = create_named_event(event_name)
|
||||
try:
|
||||
with PlatformNamedEvent(event_name) as e:
|
||||
sleeper, listener = __get_sleeper_and_listener(e)
|
||||
random.shuffle(filtered)
|
||||
queque = iter(filtered)
|
||||
|
@ -40,8 +39,6 @@ def __slideshow_worker(filtered, delay, changer_func, event_name):
|
|||
continue
|
||||
changer_func(next_pkmn.get_path())
|
||||
sleeper(delay * 60)
|
||||
finally:
|
||||
e.close()
|
||||
|
||||
def start(filtered, delay, changer_func, event_name):
|
||||
if sys.platform == 'win32':
|
||||
|
@ -58,4 +55,8 @@ def start(filtered, delay, changer_func, event_name):
|
|||
if pid > 0:
|
||||
__print_fork(pid, len(filtered), delay)
|
||||
sys.exit(0)
|
||||
__slideshow_worker(filtered, delay, changer_func, event_name)
|
||||
__slideshow_worker(filtered, delay, changer_func, event_name)
|
||||
|
||||
def stop(event_name):
|
||||
with PlatformNamedEvent(event_name) as e:
|
||||
e.set()
|
||||
|
|
|
@ -11,7 +11,7 @@ class ConEmuProvider(_TProv):
|
|||
return "CONEMUPID" in os.environ
|
||||
|
||||
def __run_command(command: str):
|
||||
output = subprocess.check_output(["ConEmuC", "-GuiMacro", command]).decode(sys.stdout.encoding)
|
||||
output = subprocess.check_output(f"ConEmuC -GuiMacro {command}", shell=True).decode(sys.stdout.encoding)
|
||||
if output != 'OK':
|
||||
print(output)
|
||||
|
||||
|
|
Loading…
Reference in a new issue