Rewrite key handler

Show help with key bindings
Move tg.png icon to resources directory
This commit is contained in:
Paul Nameless 2020-06-11 21:46:09 +08:00
parent c5daf1b5bb
commit 45a4e2cc74
5 changed files with 207 additions and 162 deletions

View file

@ -33,6 +33,8 @@ MAX_DOWNLOAD_SIZE = "10MB"
# TODO: check platform
NOTIFY_CMD = "/usr/local/bin/terminal-notifier -title '{title}' -subtitle '{subtitle}' -message '{msg}' -appIcon '{icon_path}'"
HELP_CMD = "less"
if _os_name == _linux:
VOICE_RECORD_CMD = (
"ffmpeg -f alsa -i default -c:a libopus -b:a 32k '{file_path}'"
@ -60,6 +62,8 @@ CHAT_FLAGS: Dict[str, str] = {}
MSG_FLAGS: Dict[str, str] = {}
ICON_PATH = os.path.join(os.path.dirname(__file__), "resources", "tg.png")
if os.path.isfile(CONFIG_FILE):
config_params = runpy.run_path(CONFIG_FILE)
for param, value in config_params.items():

View file

@ -3,10 +3,10 @@ import logging
import os
import threading
from datetime import datetime
from functools import partial
from functools import partial, wraps
from queue import Queue
from tempfile import NamedTemporaryFile
from typing import Any, Callable, Dict, Optional
from typing import Any, Callable, Dict, List, Optional
from tg import config
from tg.models import Model
@ -31,17 +31,37 @@ log = logging.getLogger(__name__)
# cause blan areas on the msg display screen
MSGS_LEFT_SCROLL_THRESHOLD = 2
REPLY_MSG_PREFIX = "# >"
key_bind_handler_type = Callable[[Any], Any]
handler_type = Callable[[Any], Any]
chat_handler: Dict[str, handler_type] = {}
msg_handler: Dict[str, handler_type] = {}
def bind(
binding: Dict[str, handler_type],
keys: List[str],
repeat_factor: bool = False,
):
"""bind handlers to given keys"""
def decorator(fun):
@wraps(fun)
def wrapper(*args, **kwargs):
return fun(*args, **kwargs)
@wraps(fun)
def _no_repeat_factor(self, repeat_factor):
return fun(self)
for key in keys:
binding[key] = fun if repeat_factor else _no_repeat_factor
return wrapper
return decorator
class Controller:
"""
# MVC
# Model is data from telegram
# Controller handles keyboad events
# View is terminal vindow
"""
def __init__(self, model: Model, view: View, tg: Tdlib) -> None:
self.model = model
self.view = view
@ -50,77 +70,50 @@ class Controller:
self.tg = tg
self.chat_size = 0.5
self.chat_bindings: Dict[str, key_bind_handler_type] = {
"q": lambda _: "QUIT",
"l": self.handle_msgs,
"^J": self.handle_msgs, # enter
"^E": self.handle_msgs, # arrow right
"j": self.next_chat,
"^B": self.next_chat, # arrow down
"^N": self.next_chat,
"k": self.prev_chat,
"^C": self.prev_chat, # arrow up
"^P": self.prev_chat,
"J": lambda _: self.next_chat(10),
"K": lambda _: self.prev_chat(10),
"gg": lambda _: self.first_chat(),
"bp": lambda _: self.breakpoint(),
"u": lambda _: self.toggle_unread(),
"p": lambda _: self.toggle_pin(),
"m": lambda _: self.toggle_mute(),
"r": lambda _: self.read_msgs(),
}
def format_help(self, bindings):
return "\n".join(
f"{key}\t{fun.__name__}\t{fun.__doc__ or ''}"
for key, fun in sorted(bindings.items())
)
self.msg_bindings: Dict[str, key_bind_handler_type] = {
"q": lambda _: "QUIT",
"h": lambda _: "BACK",
"bp": lambda _: self.breakpoint(),
"^D": lambda _: "BACK", # arrow left
# navigate msgs
"]": self.next_chat,
"[": self.prev_chat,
"J": lambda _: self.next_msg(10),
"K": lambda _: self.prev_msg(10),
"j": self.next_msg,
"^B": self.next_msg, # arrow down
"^N": self.next_msg,
"k": self.prev_msg,
"^C": self.prev_msg, # arrow left
"^P": self.prev_msg,
"G": lambda _: self.jump_bottom(),
# send files
"sd": lambda _: self.send_file(self.tg.send_doc),
"sp": lambda _: self.send_file(self.tg.send_photo),
"sa": lambda _: self.send_file(self.tg.send_audio),
"sv": lambda _: self.send_video(),
"v": lambda _: self.send_voice(),
# manipulate msgs
"dd": lambda _: self.delete_msgs(),
"D": lambda _: self.download_current_file(),
"l": lambda _: self.open_current_msg(),
"^J": lambda _: self.open_current_msg(), # enter
"e": lambda _: self.edit_msg(),
"i": lambda _: self.write_short_msg(),
"a": lambda _: self.write_short_msg(),
"I": lambda _: self.write_long_msg(),
"A": lambda _: self.write_long_msg(),
"p": lambda _: self.forward_msgs(),
"y": lambda _: self.copy_msgs(),
"r": lambda _: self.reply_message(),
"R": lambda _: self.reply_with_long_message(),
# message selection
" ": lambda _: self.toggle_select_msg(), # space
"^G": lambda _: self.discard_selected_msgs(),
"^[": lambda _: self.discard_selected_msgs(), # esc
}
@bind(chat_handler, ["?"])
def show_chat_help(self):
_help = self.format_help(chat)
with suspend(self.view) as s:
s.run_with_input(config.HELP_CMD, _help)
@bind(msg_handler, ["?"])
def show_msg_help(self):
_help = self.format_help(msg)
with suspend(self.view) as s:
s.run_with_input(config.HELP_CMD, _help)
@bind(chat_handler, ["bp"])
@bind(msg_handler, ["bp"])
def breakpoint(self):
with suspend(self.view):
breakpoint()
@bind(chat_handler, ["q"])
@bind(msg_handler, ["q"])
def quit(self):
return "QUIT"
@bind(msg_handler, ["h", "^D"])
def back(self):
return "BACK"
@bind(msg_handler, ["p"])
def forward_msgs(self):
"""Paste yanked msgs"""
if not self.model.forward_msgs():
self.present_error("Can't forward msg(s)")
return
self.present_info("Forwarded msg(s)")
def copy_msgs(self):
@bind(msg_handler, ["y"])
def yank_msgs(self):
"""Copy msgs to clipboard and internal buffer to forward"""
chat_id = self.model.chats.id_by_index(self.model.current_chat)
if not chat_id:
return
@ -133,6 +126,7 @@ class Controller:
self.model.copy_msgs_text()
self.present_info(f"Copied {len(msg_ids)} msg(s)")
@bind(msg_handler, [" "])
def toggle_select_msg(self):
chat_id = self.model.chats.id_by_index(self.model.current_chat)
if not chat_id:
@ -146,6 +140,7 @@ class Controller:
self.model.next_msg()
self.render_msgs()
@bind(msg_handler, ["^G", "^["])
def discard_selected_msgs(self):
chat_id = self.model.chats.id_by_index(self.model.current_chat)
if not chat_id:
@ -154,76 +149,30 @@ class Controller:
self.render_msgs()
self.present_info("Discarded selected messages")
def jump_bottom(self):
@bind(msg_handler, ["G"])
def bottom_msg(self):
if self.model.jump_bottom():
self.render_msgs()
def next_chat(self, repeat_factor: int):
if self.model.next_chat(repeat_factor):
self.render()
def prev_chat(self, repeat_factor: int):
if self.model.prev_chat(repeat_factor):
self.render()
def first_chat(self):
if self.model.first_chat():
self.render()
def toggle_unread(self):
chat = self.model.chats.chats[self.model.current_chat]
chat_id = chat["id"]
toggle = not chat["is_marked_as_unread"]
self.tg.toggle_chat_is_marked_as_unread(chat_id, toggle)
self.render()
def read_msgs(self):
chat = self.model.chats.chats[self.model.current_chat]
chat_id = chat["id"]
msg_id = chat["last_message"]["id"]
self.tg.view_messages(chat_id, [msg_id])
self.render()
def toggle_mute(self):
# TODO: if it's msg to yourself, do not change its
# notification setting, because we can't by documentation,
# instead write about it in status
chat = self.model.chats.chats[self.model.current_chat]
chat_id = chat["id"]
if self.model.is_me(chat_id):
self.present_error("You can't mute Saved Messages")
return
notification_settings = chat["notification_settings"]
if notification_settings["mute_for"]:
notification_settings["mute_for"] = 0
else:
notification_settings["mute_for"] = 2147483647
self.tg.set_chat_nottification_settings(chat_id, notification_settings)
self.render()
def toggle_pin(self):
chat = self.model.chats.chats[self.model.current_chat]
chat_id = chat["id"]
toggle = not chat["is_pinned"]
self.tg.toggle_chat_is_pinned(chat_id, toggle)
self.render()
def next_msg(self, repeat_factor: int):
@bind(msg_handler, ["j", "^B", "^N"], repeat_factor=True)
def next_msg(self, repeat_factor: int = 1):
if self.model.next_msg(repeat_factor):
self.render_msgs()
def prev_msg(self, repeat_factor: int):
@bind(msg_handler, ["J"])
def jump_10_msgs_down(self):
self.next_msg(10)
@bind(msg_handler, ["k", "^C", "^P"], repeat_factor=True)
def prev_msg(self, repeat_factor: int = 1):
if self.model.prev_msg(repeat_factor):
self.render_msgs()
def breakpoint(self):
with suspend(self.view):
breakpoint()
def can_send_msg(self) -> bool:
chat = self.model.chats.chats[self.model.current_chat]
return chat["permissions"]["can_send_messages"]
@bind(msg_handler, ["K"])
def jump_10_msgs_up(self):
self.prev_msg(10)
@bind(msg_handler, ["r"])
def reply_message(self):
if not self.can_send_msg():
self.present_info("Can't send msg in this chat")
@ -236,6 +185,7 @@ class Controller:
else:
self.present_info("Message reply wasn't sent")
@bind(msg_handler, ["R"])
def reply_with_long_message(self):
if not self.can_send_msg():
self.present_info("Can't send msg in this chat")
@ -256,6 +206,7 @@ class Controller:
else:
self.present_info("Message wasn't sent")
@bind(msg_handler, ["a", "i"])
def write_short_msg(self):
if not self.can_send_msg():
self.present_info("Can't send msg in this chat")
@ -266,6 +217,7 @@ class Controller:
else:
self.present_info("Message wasn't sent")
@bind(msg_handler, ["A", "I"])
def write_long_msg(self):
if not self.can_send_msg():
self.present_info("Can't send msg in this chat")
@ -279,6 +231,7 @@ class Controller:
self.model.send_message(text=msg)
self.present_info("Message sent")
@bind(msg_handler, ["sv"])
def send_video(self):
file_path = self.view.status.get_input()
if not file_path or not os.path.isfile(file_path):
@ -290,6 +243,7 @@ class Controller:
duration = get_duration(file_path)
self.tg.send_video(file_path, chat_id, width, height, duration)
@bind(msg_handler, ["dd"])
def delete_msgs(self):
is_deleted = self.model.delete_msgs()
self.discard_selected_msgs()
@ -298,6 +252,18 @@ class Controller:
return
self.present_info("Message deleted")
@bind(msg_handler, ["sd"])
def send_document(self):
send_file(self.tg.send_doc)
@bind(msg_handler, ["sp"])
def send_picture(self):
send_file(self.tg.send_photo)
@bind(msg_handler, ["sa"])
def send_audio(self):
send_file(self.tg.send_audio)
def send_file(self, send_file_fun, *args, **kwargs):
file_path = self.view.status.get_input()
if file_path and os.path.isfile(file_path):
@ -305,7 +271,8 @@ class Controller:
send_file_fun(file_path, chat_id, *args, **kwargs)
self.present_info("File sent")
def send_voice(self):
@bind(msg_handler, ["v"])
def record_voice(self):
file_path = f"/tmp/voice-{datetime.now()}.oga"
with suspend(self.view) as s:
s.call(config.VOICE_RECORD_CMD.format(file_path=file_path))
@ -328,6 +295,7 @@ class Controller:
self.tg.send_voice(file_path, chat_id, duration, waveform)
self.present_info(f"Sent voice msg: {file_path}")
@bind(msg_handler, ["D"])
def download_current_file(self):
msg = MsgProxy(self.model.current_msg)
log.debug("Downloading msg: %s", msg.msg)
@ -344,6 +312,11 @@ class Controller:
self.tg.download_file(file_id=file_id)
log.info("Downloaded: file_id=%s", file_id)
def can_send_msg(self) -> bool:
chat = self.model.chats.chats[self.model.current_chat]
return chat["permissions"]["can_send_messages"]
@bind(msg_handler, ["l", "^J"])
def open_current_msg(self):
msg = MsgProxy(self.model.current_msg)
if msg.is_text:
@ -365,18 +338,7 @@ class Controller:
with suspend(self.view) as s:
s.open_file(path)
def present_error(self, msg: str):
return self.update_status("Error", msg)
def present_info(self, msg: str):
return self.update_status("Info", msg)
def update_status(self, level: str, msg: str):
self.queue.put(partial(self._update_status, level, msg))
def _update_status(self, level: str, msg: str):
self.view.status.draw(f"{level}: {msg}")
@bind(msg_handler, ["e"])
def edit_msg(self):
msg = MsgProxy(self.model.current_msg)
log.info("Editing msg: %s", msg.msg)
@ -398,9 +360,84 @@ class Controller:
self.model.edit_message(text=text)
self.present_info("Message edited")
@bind(chat_handler, ["l", "^J", "^E"])
def handle_msgs(self):
rc = self.handle(msg, 0.2)
if rc == "QUIT":
return rc
self.chat_size = 0.5
self.resize()
@bind(chat_handler, ["g"])
def top_chat(self):
if self.model.first_chat():
self.render()
@bind(chat_handler, ["j", "^B", "^N"], repeat_factor=True)
@bind(msg_handler, ["]"])
def next_chat(self, repeat_factor: int = 1):
if self.model.next_chat(repeat_factor):
self.render()
@bind(chat_handler, ["k", "^C", "^P"], repeat_factor=True)
@bind(msg_handler, ["["])
def prev_chat(self, repeat_factor: int = 1):
if self.model.prev_chat(repeat_factor):
self.render()
@bind(chat_handler, ["J"])
def jump_10_chats_down(self):
self.next_chat(10)
@bind(chat_handler, ["K"])
def jump_10_chats_up(self):
self.prev_chat(10)
@bind(chat_handler, ["u"])
def toggle_unread(self):
chat = self.model.chats.chats[self.model.current_chat]
chat_id = chat["id"]
toggle = not chat["is_marked_as_unread"]
self.tg.toggle_chat_is_marked_as_unread(chat_id, toggle)
self.render()
@bind(chat_handler, ["r"])
def read_msgs(self):
chat = self.model.chats.chats[self.model.current_chat]
chat_id = chat["id"]
msg_id = chat["last_message"]["id"]
self.tg.view_messages(chat_id, [msg_id])
self.render()
@bind(chat_handler, ["m"])
def toggle_mute(self):
# TODO: if it's msg to yourself, do not change its
# notification setting, because we can't by documentation,
# instead write about it in status
chat = self.model.chats.chats[self.model.current_chat]
chat_id = chat["id"]
if self.model.is_me(chat_id):
self.present_error("You can't mute Saved Messages")
return
notification_settings = chat["notification_settings"]
if notification_settings["mute_for"]:
notification_settings["mute_for"] = 0
else:
notification_settings["mute_for"] = 2147483647
self.tg.set_chat_nottification_settings(chat_id, notification_settings)
self.render()
@bind(chat_handler, ["p"])
def toggle_pin(self):
chat = self.model.chats.chats[self.model.current_chat]
chat_id = chat["id"]
toggle = not chat["is_pinned"]
self.tg.toggle_chat_is_pinned(chat_id, toggle)
self.render()
def run(self) -> None:
try:
self.handle(self.chat_bindings, 0.5)
self.handle(chat, 0.5)
self.queue.put(self.close)
except Exception:
log.exception("Error happened in main loop")
@ -408,23 +445,14 @@ class Controller:
def close(self):
self.is_running = False
def handle_msgs(self, _: int):
rc = self.handle(self.msg_bindings, 0.2)
if rc == "QUIT":
return rc
self.chat_size = 0.5
self.resize()
def handle(
self, key_bindings: Dict[str, key_bind_handler_type], size: float
):
def handle(self, handlers: Dict[str, handler_type], size: float):
self.chat_size = size
self.resize()
while True:
repeat_factor, keys = self.view.get_keys()
handler = key_bindings.get(keys, lambda _: None)
res = handler(repeat_factor)
fun = handlers.get(keys, lambda *_: None)
res = fun(self, repeat_factor)
if res == "QUIT":
return res
elif res == "BACK":
@ -460,6 +488,18 @@ class Controller:
except Exception:
log.exception("Error happened in draw loop")
def present_error(self, msg: str):
return self.update_status("Error", msg)
def present_info(self, msg: str):
return self.update_status("Info", msg)
def update_status(self, level: str, msg: str):
self.queue.put(partial(self._update_status, level, msg))
def _update_status(self, level: str, msg: str):
self.view.status.draw(f"{level}: {msg}")
def render(self) -> None:
self.queue.put(self._render)

View file

Before

Width:  |  Height:  |  Size: 42 KiB

After

Width:  |  Height:  |  Size: 42 KiB

View file

@ -1,5 +1,4 @@
import logging
from datetime import datetime
from functools import wraps
from typing import Any, Callable, Dict

View file

@ -147,9 +147,8 @@ def notify(
):
if not cmd:
return
icon_path = os.path.join(os.path.dirname(__file__), "tg.png")
notify_cmd = cmd.format(
icon_path=icon_path, title=title, subtitle=subtitle, msg=msg
icon_path=config.ICON_PATH, title=title, subtitle=subtitle, msg=msg
)
log.info("notify-cmd: %s", notify_cmd)
os.system(notify_cmd)
@ -184,6 +183,9 @@ class suspend:
def call(self, cmd):
subprocess.call(cmd, shell=True)
def run_with_input(self, cmd, text):
subprocess.run(cmd, universal_newlines=True, input=text, shell=True)
def open_file(self, file_path):
cmd = get_file_handler(file_path)
if not cmd: