mirror of
https://github.com/paul-nameless/tg
synced 2024-11-29 07:10:19 +00:00
Add type hints
This commit is contained in:
parent
cf80ce41af
commit
3413d8fed8
3 changed files with 43 additions and 36 deletions
|
@ -6,7 +6,7 @@ import os
|
|||
DEFAULT_CONFIG = os.path.expanduser("~/.config/tg/tg.conf")
|
||||
DEFAULT_FILES = os.path.expanduser("~/.cache/tg/")
|
||||
max_download_size = "10MB"
|
||||
record_cmd = None
|
||||
record_cmd = "ffmpeg -f avfoundation -i ':0' -ar 22050 -b:a 32k '{file_path}'"
|
||||
long_msg_cmd = "vim -c 'startinsert' {file_path}"
|
||||
editor = os.environ.get("EDITOR", "vi")
|
||||
|
||||
|
|
|
@ -34,6 +34,8 @@ MSGS_LEFT_SCROLL_THRESHOLD = 10
|
|||
# cause blan areas on the msg display screen
|
||||
MSGS_LEFT_SCROLL_THRESHOLD = 2
|
||||
|
||||
key_bind_handler = Callable[[Any], Any]
|
||||
|
||||
|
||||
class Controller:
|
||||
"""
|
||||
|
@ -51,9 +53,7 @@ class Controller:
|
|||
self.chat_size = 0.5
|
||||
signal(SIGWINCH, self.resize_handler)
|
||||
|
||||
_key_handler_type = Callable[[Any], Any]
|
||||
|
||||
self.chat_bindings: Dict[str, _key_handler_type] = {
|
||||
self.chat_bindings: Dict[str, key_bind_handler] = {
|
||||
"q": lambda _: "QUIT",
|
||||
"l": self.handle_msgs,
|
||||
"j": self.next_chat,
|
||||
|
@ -70,7 +70,7 @@ class Controller:
|
|||
"r": self.read_msgs,
|
||||
}
|
||||
|
||||
self.msg_bindings: Dict[str, _key_handler_type] = {
|
||||
self.msg_bindings: Dict[str, key_bind_handler] = {
|
||||
"q": lambda _: "QUIT",
|
||||
"h": lambda _: "BACK",
|
||||
"^D": lambda _: "BACK",
|
||||
|
@ -99,45 +99,45 @@ class Controller:
|
|||
"bp": self.breakpoint,
|
||||
}
|
||||
|
||||
def jump_bottom(self, _):
|
||||
def jump_bottom(self, _: int):
|
||||
log.info("jump_bottom:")
|
||||
if self.model.jump_bottom():
|
||||
self.refresh_msgs()
|
||||
|
||||
def handle_msgs(self, _):
|
||||
def handle_msgs(self, _: int):
|
||||
rc = self.handle(self.msg_bindings, 0.2)
|
||||
if rc == "QUIT":
|
||||
return rc
|
||||
self.chat_size = 0.5
|
||||
self.resize()
|
||||
|
||||
def next_chat(self, rf):
|
||||
def next_chat(self, rf: int):
|
||||
if self.model.next_chat(rf):
|
||||
self.render()
|
||||
|
||||
def prev_chat(self, rf):
|
||||
def prev_chat(self, rf: int):
|
||||
if self.model.prev_chat(rf):
|
||||
self.render()
|
||||
|
||||
def first_chat(self, _):
|
||||
def first_chat(self, _: int):
|
||||
if self.model.first_chat():
|
||||
self.render()
|
||||
|
||||
def toggle_unread(self, _):
|
||||
def toggle_unread(self, _: int):
|
||||
chat = self.model.chats.chats[self.model.current_chat]
|
||||
chat_id = chat["id"]
|
||||
toggle = not chat["is_marked_as_unread"]
|
||||
self.tg.toggle_chat_is_marked_as_unread(chat_id, toggle)
|
||||
self.render()
|
||||
|
||||
def read_msgs(self, _):
|
||||
def read_msgs(self, _: int):
|
||||
chat = self.model.chats.chats[self.model.current_chat]
|
||||
chat_id = chat["id"]
|
||||
msg_id = chat["last_message"]["id"]
|
||||
self.tg.view_messages(chat_id, [msg_id])
|
||||
self.render()
|
||||
|
||||
def toggle_mute(self, _):
|
||||
def toggle_mute(self, _: int):
|
||||
# TODO: if it's msg to yourself, do not change its
|
||||
# notification setting, because we can't by documentation,
|
||||
# instead write about it in status
|
||||
|
@ -154,26 +154,26 @@ class Controller:
|
|||
self.tg.set_chat_nottification_settings(chat_id, notification_settings)
|
||||
self.render()
|
||||
|
||||
def toggle_pin(self, _):
|
||||
def toggle_pin(self, _: int):
|
||||
chat = self.model.chats.chats[self.model.current_chat]
|
||||
chat_id = chat["id"]
|
||||
toggle = not chat["is_pinned"]
|
||||
self.tg.toggle_chat_is_pinned(chat_id, toggle)
|
||||
self.render()
|
||||
|
||||
def next_msg(self, rf):
|
||||
def next_msg(self, rf: int):
|
||||
if self.model.next_msg(rf):
|
||||
self.refresh_msgs()
|
||||
|
||||
def prev_msg(self, rf):
|
||||
def prev_msg(self, rf: int):
|
||||
if self.model.prev_msg(rf):
|
||||
self.refresh_msgs()
|
||||
|
||||
def breakpoint(self, _):
|
||||
def breakpoint(self, _: int):
|
||||
with suspend(self.view):
|
||||
breakpoint()
|
||||
|
||||
def write_short_msg(self, _):
|
||||
def write_short_msg(self, _: int):
|
||||
# write new message
|
||||
if msg := self.view.status.get_input():
|
||||
self.model.send_message(text=msg)
|
||||
|
@ -181,7 +181,7 @@ class Controller:
|
|||
else:
|
||||
self.present_info("Message wasn't sent")
|
||||
|
||||
def send_video(self, _):
|
||||
def send_video(self, _: int):
|
||||
file_path = self.view.status.get_input()
|
||||
if not file_path or not os.path.isfile(file_path):
|
||||
return
|
||||
|
@ -192,7 +192,7 @@ class Controller:
|
|||
duration = get_duration(file_path)
|
||||
self.tg.send_video(file_path, chat_id, width, height, duration)
|
||||
|
||||
def delete_msg(self, _):
|
||||
def delete_msg(self, _: int):
|
||||
if self.model.delete_msg():
|
||||
self.refresh_msgs()
|
||||
self.present_info("Message deleted")
|
||||
|
@ -204,7 +204,7 @@ class Controller:
|
|||
send_file_fun(file_path, chat_id, *args, **kwargs)
|
||||
self.present_info("File sent")
|
||||
|
||||
def send_voice(self, _):
|
||||
def send_voice(self, _: int):
|
||||
file_path = f"/tmp/voice-{datetime.now()}.oga"
|
||||
with suspend(self.view) as s:
|
||||
s.call(config.record_cmd.format(file_path=file_path))
|
||||
|
@ -213,10 +213,15 @@ class Controller:
|
|||
)
|
||||
if not is_yes(resp):
|
||||
self.present_info("Voice message discarded")
|
||||
elif not os.path.isfile(file_path):
|
||||
return
|
||||
|
||||
if not os.path.isfile(file_path):
|
||||
self.present_info(f"Can't load recording file {file_path}")
|
||||
else:
|
||||
return
|
||||
|
||||
chat_id = self.model.chats.id_by_index(self.model.current_chat)
|
||||
if not chat_id:
|
||||
return
|
||||
duration = get_duration(file_path)
|
||||
waveform = get_waveform(file_path)
|
||||
self.tg.send_voice(file_path, chat_id, duration, waveform)
|
||||
|
@ -228,7 +233,7 @@ class Controller:
|
|||
except Exception:
|
||||
log.exception("Error happened in main loop")
|
||||
|
||||
def download_current_file(self, _):
|
||||
def download_current_file(self, _: int):
|
||||
msg = MsgProxy(self.model.current_msg)
|
||||
log.debug("Downloading msg: %s", msg.msg)
|
||||
file_id = msg.file_id
|
||||
|
@ -244,7 +249,7 @@ class Controller:
|
|||
self.tg.download_file(file_id=file_id)
|
||||
log.info("Downloaded: file_id=%s", file_id)
|
||||
|
||||
def open_current_msg(self, _):
|
||||
def open_current_msg(self, _: int):
|
||||
msg = MsgProxy(self.model.current_msg)
|
||||
if msg.is_text:
|
||||
with NamedTemporaryFile("w", suffix=".txt") as f:
|
||||
|
@ -259,6 +264,8 @@ class Controller:
|
|||
self.present_info("File should be downloaded first")
|
||||
return
|
||||
chat_id = self.model.chats.id_by_index(self.model.current_chat)
|
||||
if not chat_id:
|
||||
return
|
||||
self.tg.open_message_content(chat_id, msg.msg_id)
|
||||
with suspend(self.view) as s:
|
||||
s.open_file(path)
|
||||
|
@ -273,7 +280,7 @@ class Controller:
|
|||
with self.lock:
|
||||
self.view.status.draw(f"{level}: {msg}")
|
||||
|
||||
def edit_msg(self, _):
|
||||
def edit_msg(self, _: int):
|
||||
msg = MsgProxy(self.model.current_msg)
|
||||
log.info("Editing msg: %s", msg.msg)
|
||||
if not self.model.is_me(msg.sender_id):
|
||||
|
@ -290,11 +297,11 @@ class Controller:
|
|||
f.flush()
|
||||
s.call(f"{config.editor} {f.name}")
|
||||
with open(f.name) as f:
|
||||
if msg := f.read().strip():
|
||||
self.model.edit_message(text=msg)
|
||||
if text := f.read().strip():
|
||||
self.model.edit_message(text=text)
|
||||
self.present_info("Message edited")
|
||||
|
||||
def write_long_msg(self, _):
|
||||
def write_long_msg(self, _: int):
|
||||
with NamedTemporaryFile("r+", suffix=".txt") as f, suspend(
|
||||
self.view
|
||||
) as s:
|
||||
|
@ -322,7 +329,7 @@ class Controller:
|
|||
self.view.status.resize(rows, cols)
|
||||
self.render()
|
||||
|
||||
def handle(self, key_bindings, size):
|
||||
def handle(self, key_bindings: Dict[str, key_bind_handler], size: float):
|
||||
self.chat_size = size
|
||||
self.resize()
|
||||
|
||||
|
|
|
@ -51,7 +51,7 @@ def main():
|
|||
config.max_download_size = utils.parse_size(
|
||||
cfg.get("max_download_size", config.max_download_size)
|
||||
)
|
||||
config.record_cmd = cfg.get("record_cmd")
|
||||
config.record_cmd = cfg.get("record_cmd", config.record_cmd)
|
||||
config.long_msg_cmd = cfg.get("long_msg_cmd", config.long_msg_cmd)
|
||||
tg.login()
|
||||
|
||||
|
|
Loading…
Reference in a new issue