Merge pull request #77 from paul-nameless/key-bind

Rewrite key handler
This commit is contained in:
Nameless 2020-06-12 10:51:41 +08:00 committed by GitHub
commit e92eb979fa
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 211 additions and 165 deletions

View file

@ -143,6 +143,7 @@ For navigation arrow keys also can be used.
- `p`: pin/unpin current chat - `p`: pin/unpin current chat
- `u`: mark read/unread - `u`: mark read/unread
- `r`: read current chat - `r`: read current chat
- `?`: show help
## Msgs: ## Msgs:
@ -155,10 +156,10 @@ For navigation arrow keys also can be used.
image/png; icat %s && read image/png; icat %s && read
audio/*; mpv %s audio/*; mpv %s
``` ```
if text, open in less (to view multiline msgs) if text, open in `less` (to view multiline msgs)
- `e`: edit current msg - `e`: edit current msg
- `<space>`: space can be used to select multiple msgs for deletion or forwarding - `<space>`: space can be used to select multiple msgs for deletion or forwarding
- `y`: yank (copy) selected msgs with <space> to internal buffer - `y`: yank (copy) selected msgs with <space> to internal buffer (for forwarding) and copy current msg text or path to file to clipboard
- `p`: forward (paste) yanked (copied) msgs to current chat - `p`: forward (paste) yanked (copied) msgs to current chat
- `dd`: delete msg for everybody (multiple messages will be deleted if selected) - `dd`: delete msg for everybody (multiple messages will be deleted if selected)
- `i or a`: insert mode, type new message - `i or a`: insert mode, type new message
@ -169,6 +170,6 @@ For navigation arrow keys also can be used.
- `sa`: send audio - `sa`: send audio
- `sp`: send picture - `sp`: send picture
- `sd`: send document - `sd`: send document
- `c`: copy current msg text or path to file if this is document, photo or video
- `]`: next chat - `]`: next chat
- `[`: prev chat - `[`: prev chat
- `?`: show help

View file

@ -33,6 +33,8 @@ MAX_DOWNLOAD_SIZE = "10MB"
# TODO: check platform # TODO: check platform
NOTIFY_CMD = "/usr/local/bin/terminal-notifier -title '{title}' -subtitle '{subtitle}' -message '{msg}' -appIcon '{icon_path}'" NOTIFY_CMD = "/usr/local/bin/terminal-notifier -title '{title}' -subtitle '{subtitle}' -message '{msg}' -appIcon '{icon_path}'"
HELP_CMD = "less"
if _os_name == _linux: if _os_name == _linux:
VOICE_RECORD_CMD = ( VOICE_RECORD_CMD = (
"ffmpeg -f alsa -i default -c:a libopus -b:a 32k '{file_path}'" "ffmpeg -f alsa -i default -c:a libopus -b:a 32k '{file_path}'"
@ -60,6 +62,8 @@ CHAT_FLAGS: Dict[str, str] = {}
MSG_FLAGS: Dict[str, str] = {} MSG_FLAGS: Dict[str, str] = {}
ICON_PATH = os.path.join(os.path.dirname(__file__), "resources", "tg.png")
if os.path.isfile(CONFIG_FILE): if os.path.isfile(CONFIG_FILE):
config_params = runpy.run_path(CONFIG_FILE) config_params = runpy.run_path(CONFIG_FILE)
for param, value in config_params.items(): for param, value in config_params.items():

View file

@ -3,10 +3,10 @@ import logging
import os import os
import threading import threading
from datetime import datetime from datetime import datetime
from functools import partial from functools import partial, wraps
from queue import Queue from queue import Queue
from tempfile import NamedTemporaryFile from tempfile import NamedTemporaryFile
from typing import Any, Callable, Dict, Optional from typing import Any, Callable, Dict, List, Optional
from tg import config from tg import config
from tg.models import Model from tg.models import Model
@ -31,17 +31,37 @@ log = logging.getLogger(__name__)
# cause blan areas on the msg display screen # cause blan areas on the msg display screen
MSGS_LEFT_SCROLL_THRESHOLD = 2 MSGS_LEFT_SCROLL_THRESHOLD = 2
REPLY_MSG_PREFIX = "# >" REPLY_MSG_PREFIX = "# >"
key_bind_handler_type = Callable[[Any], Any] handler_type = Callable[[Any], Any]
chat_handler: Dict[str, handler_type] = {}
msg_handler: Dict[str, handler_type] = {}
def bind(
binding: Dict[str, handler_type],
keys: List[str],
repeat_factor: bool = False,
):
"""bind handlers to given keys"""
def decorator(fun):
@wraps(fun)
def wrapper(*args, **kwargs):
return fun(*args, **kwargs)
@wraps(fun)
def _no_repeat_factor(self, repeat_factor):
return fun(self)
for key in keys:
binding[key] = fun if repeat_factor else _no_repeat_factor
return wrapper
return decorator
class Controller: class Controller:
"""
# MVC
# Model is data from telegram
# Controller handles keyboad events
# View is terminal vindow
"""
def __init__(self, model: Model, view: View, tg: Tdlib) -> None: def __init__(self, model: Model, view: View, tg: Tdlib) -> None:
self.model = model self.model = model
self.view = view self.view = view
@ -50,77 +70,50 @@ class Controller:
self.tg = tg self.tg = tg
self.chat_size = 0.5 self.chat_size = 0.5
self.chat_bindings: Dict[str, key_bind_handler_type] = { def format_help(self, bindings):
"q": lambda _: "QUIT", return "\n".join(
"l": self.handle_msgs, f"{key}\t{fun.__name__}\t{fun.__doc__ or ''}"
"^J": self.handle_msgs, # enter for key, fun in sorted(bindings.items())
"^E": self.handle_msgs, # arrow right )
"j": self.next_chat,
"^B": self.next_chat, # arrow down
"^N": self.next_chat,
"k": self.prev_chat,
"^C": self.prev_chat, # arrow up
"^P": self.prev_chat,
"J": lambda _: self.next_chat(10),
"K": lambda _: self.prev_chat(10),
"gg": lambda _: self.first_chat(),
"bp": lambda _: self.breakpoint(),
"u": lambda _: self.toggle_unread(),
"p": lambda _: self.toggle_pin(),
"m": lambda _: self.toggle_mute(),
"r": lambda _: self.read_msgs(),
}
self.msg_bindings: Dict[str, key_bind_handler_type] = { @bind(chat_handler, ["?"])
"q": lambda _: "QUIT", def show_chat_help(self):
"h": lambda _: "BACK", _help = self.format_help(chat_handler)
"bp": lambda _: self.breakpoint(), with suspend(self.view) as s:
"^D": lambda _: "BACK", # arrow left s.run_with_input(config.HELP_CMD, _help)
# navigate msgs
"]": self.next_chat,
"[": self.prev_chat,
"J": lambda _: self.next_msg(10),
"K": lambda _: self.prev_msg(10),
"j": self.next_msg,
"^B": self.next_msg, # arrow down
"^N": self.next_msg,
"k": self.prev_msg,
"^C": self.prev_msg, # arrow left
"^P": self.prev_msg,
"G": lambda _: self.jump_bottom(),
# send files
"sd": lambda _: self.send_file(self.tg.send_doc),
"sp": lambda _: self.send_file(self.tg.send_photo),
"sa": lambda _: self.send_file(self.tg.send_audio),
"sv": lambda _: self.send_video(),
"v": lambda _: self.send_voice(),
# manipulate msgs
"dd": lambda _: self.delete_msgs(),
"D": lambda _: self.download_current_file(),
"l": lambda _: self.open_current_msg(),
"^J": lambda _: self.open_current_msg(), # enter
"e": lambda _: self.edit_msg(),
"i": lambda _: self.write_short_msg(),
"a": lambda _: self.write_short_msg(),
"I": lambda _: self.write_long_msg(),
"A": lambda _: self.write_long_msg(),
"p": lambda _: self.forward_msgs(),
"y": lambda _: self.copy_msgs(),
"r": lambda _: self.reply_message(),
"R": lambda _: self.reply_with_long_message(),
# message selection
" ": lambda _: self.toggle_select_msg(), # space
"^G": lambda _: self.discard_selected_msgs(),
"^[": lambda _: self.discard_selected_msgs(), # esc
}
@bind(msg_handler, ["?"])
def show_msg_help(self):
_help = self.format_help(msg_handler)
with suspend(self.view) as s:
s.run_with_input(config.HELP_CMD, _help)
@bind(chat_handler, ["bp"])
@bind(msg_handler, ["bp"])
def breakpoint(self):
with suspend(self.view):
breakpoint()
@bind(chat_handler, ["q"])
@bind(msg_handler, ["q"])
def quit(self):
return "QUIT"
@bind(msg_handler, ["h", "^D"])
def back(self):
return "BACK"
@bind(msg_handler, ["p"])
def forward_msgs(self): def forward_msgs(self):
"""Paste yanked msgs"""
if not self.model.forward_msgs(): if not self.model.forward_msgs():
self.present_error("Can't forward msg(s)") self.present_error("Can't forward msg(s)")
return return
self.present_info("Forwarded msg(s)") self.present_info("Forwarded msg(s)")
def copy_msgs(self): @bind(msg_handler, ["y"])
def yank_msgs(self):
"""Copy msgs to clipboard and internal buffer to forward"""
chat_id = self.model.chats.id_by_index(self.model.current_chat) chat_id = self.model.chats.id_by_index(self.model.current_chat)
if not chat_id: if not chat_id:
return return
@ -133,6 +126,7 @@ class Controller:
self.model.copy_msgs_text() self.model.copy_msgs_text()
self.present_info(f"Copied {len(msg_ids)} msg(s)") self.present_info(f"Copied {len(msg_ids)} msg(s)")
@bind(msg_handler, [" "])
def toggle_select_msg(self): def toggle_select_msg(self):
chat_id = self.model.chats.id_by_index(self.model.current_chat) chat_id = self.model.chats.id_by_index(self.model.current_chat)
if not chat_id: if not chat_id:
@ -146,6 +140,7 @@ class Controller:
self.model.next_msg() self.model.next_msg()
self.render_msgs() self.render_msgs()
@bind(msg_handler, ["^G", "^["])
def discard_selected_msgs(self): def discard_selected_msgs(self):
chat_id = self.model.chats.id_by_index(self.model.current_chat) chat_id = self.model.chats.id_by_index(self.model.current_chat)
if not chat_id: if not chat_id:
@ -154,76 +149,30 @@ class Controller:
self.render_msgs() self.render_msgs()
self.present_info("Discarded selected messages") self.present_info("Discarded selected messages")
def jump_bottom(self): @bind(msg_handler, ["G"])
def bottom_msg(self):
if self.model.jump_bottom(): if self.model.jump_bottom():
self.render_msgs() self.render_msgs()
def next_chat(self, repeat_factor: int): @bind(msg_handler, ["j", "^B", "^N"], repeat_factor=True)
if self.model.next_chat(repeat_factor): def next_msg(self, repeat_factor: int = 1):
self.render()
def prev_chat(self, repeat_factor: int):
if self.model.prev_chat(repeat_factor):
self.render()
def first_chat(self):
if self.model.first_chat():
self.render()
def toggle_unread(self):
chat = self.model.chats.chats[self.model.current_chat]
chat_id = chat["id"]
toggle = not chat["is_marked_as_unread"]
self.tg.toggle_chat_is_marked_as_unread(chat_id, toggle)
self.render()
def read_msgs(self):
chat = self.model.chats.chats[self.model.current_chat]
chat_id = chat["id"]
msg_id = chat["last_message"]["id"]
self.tg.view_messages(chat_id, [msg_id])
self.render()
def toggle_mute(self):
# TODO: if it's msg to yourself, do not change its
# notification setting, because we can't by documentation,
# instead write about it in status
chat = self.model.chats.chats[self.model.current_chat]
chat_id = chat["id"]
if self.model.is_me(chat_id):
self.present_error("You can't mute Saved Messages")
return
notification_settings = chat["notification_settings"]
if notification_settings["mute_for"]:
notification_settings["mute_for"] = 0
else:
notification_settings["mute_for"] = 2147483647
self.tg.set_chat_nottification_settings(chat_id, notification_settings)
self.render()
def toggle_pin(self):
chat = self.model.chats.chats[self.model.current_chat]
chat_id = chat["id"]
toggle = not chat["is_pinned"]
self.tg.toggle_chat_is_pinned(chat_id, toggle)
self.render()
def next_msg(self, repeat_factor: int):
if self.model.next_msg(repeat_factor): if self.model.next_msg(repeat_factor):
self.render_msgs() self.render_msgs()
def prev_msg(self, repeat_factor: int): @bind(msg_handler, ["J"])
def jump_10_msgs_down(self):
self.next_msg(10)
@bind(msg_handler, ["k", "^C", "^P"], repeat_factor=True)
def prev_msg(self, repeat_factor: int = 1):
if self.model.prev_msg(repeat_factor): if self.model.prev_msg(repeat_factor):
self.render_msgs() self.render_msgs()
def breakpoint(self): @bind(msg_handler, ["K"])
with suspend(self.view): def jump_10_msgs_up(self):
breakpoint() self.prev_msg(10)
def can_send_msg(self) -> bool:
chat = self.model.chats.chats[self.model.current_chat]
return chat["permissions"]["can_send_messages"]
@bind(msg_handler, ["r"])
def reply_message(self): def reply_message(self):
if not self.can_send_msg(): if not self.can_send_msg():
self.present_info("Can't send msg in this chat") self.present_info("Can't send msg in this chat")
@ -236,6 +185,7 @@ class Controller:
else: else:
self.present_info("Message reply wasn't sent") self.present_info("Message reply wasn't sent")
@bind(msg_handler, ["R"])
def reply_with_long_message(self): def reply_with_long_message(self):
if not self.can_send_msg(): if not self.can_send_msg():
self.present_info("Can't send msg in this chat") self.present_info("Can't send msg in this chat")
@ -256,6 +206,7 @@ class Controller:
else: else:
self.present_info("Message wasn't sent") self.present_info("Message wasn't sent")
@bind(msg_handler, ["a", "i"])
def write_short_msg(self): def write_short_msg(self):
if not self.can_send_msg(): if not self.can_send_msg():
self.present_info("Can't send msg in this chat") self.present_info("Can't send msg in this chat")
@ -266,6 +217,7 @@ class Controller:
else: else:
self.present_info("Message wasn't sent") self.present_info("Message wasn't sent")
@bind(msg_handler, ["A", "I"])
def write_long_msg(self): def write_long_msg(self):
if not self.can_send_msg(): if not self.can_send_msg():
self.present_info("Can't send msg in this chat") self.present_info("Can't send msg in this chat")
@ -279,6 +231,7 @@ class Controller:
self.model.send_message(text=msg) self.model.send_message(text=msg)
self.present_info("Message sent") self.present_info("Message sent")
@bind(msg_handler, ["sv"])
def send_video(self): def send_video(self):
file_path = self.view.status.get_input() file_path = self.view.status.get_input()
if not file_path or not os.path.isfile(file_path): if not file_path or not os.path.isfile(file_path):
@ -290,6 +243,7 @@ class Controller:
duration = get_duration(file_path) duration = get_duration(file_path)
self.tg.send_video(file_path, chat_id, width, height, duration) self.tg.send_video(file_path, chat_id, width, height, duration)
@bind(msg_handler, ["dd"])
def delete_msgs(self): def delete_msgs(self):
is_deleted = self.model.delete_msgs() is_deleted = self.model.delete_msgs()
self.discard_selected_msgs() self.discard_selected_msgs()
@ -298,6 +252,18 @@ class Controller:
return return
self.present_info("Message deleted") self.present_info("Message deleted")
@bind(msg_handler, ["sd"])
def send_document(self):
send_file(self.tg.send_doc)
@bind(msg_handler, ["sp"])
def send_picture(self):
send_file(self.tg.send_photo)
@bind(msg_handler, ["sa"])
def send_audio(self):
send_file(self.tg.send_audio)
def send_file(self, send_file_fun, *args, **kwargs): def send_file(self, send_file_fun, *args, **kwargs):
file_path = self.view.status.get_input() file_path = self.view.status.get_input()
if file_path and os.path.isfile(file_path): if file_path and os.path.isfile(file_path):
@ -305,7 +271,8 @@ class Controller:
send_file_fun(file_path, chat_id, *args, **kwargs) send_file_fun(file_path, chat_id, *args, **kwargs)
self.present_info("File sent") self.present_info("File sent")
def send_voice(self): @bind(msg_handler, ["v"])
def record_voice(self):
file_path = f"/tmp/voice-{datetime.now()}.oga" file_path = f"/tmp/voice-{datetime.now()}.oga"
with suspend(self.view) as s: with suspend(self.view) as s:
s.call(config.VOICE_RECORD_CMD.format(file_path=file_path)) s.call(config.VOICE_RECORD_CMD.format(file_path=file_path))
@ -328,6 +295,7 @@ class Controller:
self.tg.send_voice(file_path, chat_id, duration, waveform) self.tg.send_voice(file_path, chat_id, duration, waveform)
self.present_info(f"Sent voice msg: {file_path}") self.present_info(f"Sent voice msg: {file_path}")
@bind(msg_handler, ["D"])
def download_current_file(self): def download_current_file(self):
msg = MsgProxy(self.model.current_msg) msg = MsgProxy(self.model.current_msg)
log.debug("Downloading msg: %s", msg.msg) log.debug("Downloading msg: %s", msg.msg)
@ -344,6 +312,11 @@ class Controller:
self.tg.download_file(file_id=file_id) self.tg.download_file(file_id=file_id)
log.info("Downloaded: file_id=%s", file_id) log.info("Downloaded: file_id=%s", file_id)
def can_send_msg(self) -> bool:
chat = self.model.chats.chats[self.model.current_chat]
return chat["permissions"]["can_send_messages"]
@bind(msg_handler, ["l", "^J"])
def open_current_msg(self): def open_current_msg(self):
msg = MsgProxy(self.model.current_msg) msg = MsgProxy(self.model.current_msg)
if msg.is_text: if msg.is_text:
@ -365,18 +338,7 @@ class Controller:
with suspend(self.view) as s: with suspend(self.view) as s:
s.open_file(path) s.open_file(path)
def present_error(self, msg: str): @bind(msg_handler, ["e"])
return self.update_status("Error", msg)
def present_info(self, msg: str):
return self.update_status("Info", msg)
def update_status(self, level: str, msg: str):
self.queue.put(partial(self._update_status, level, msg))
def _update_status(self, level: str, msg: str):
self.view.status.draw(f"{level}: {msg}")
def edit_msg(self): def edit_msg(self):
msg = MsgProxy(self.model.current_msg) msg = MsgProxy(self.model.current_msg)
log.info("Editing msg: %s", msg.msg) log.info("Editing msg: %s", msg.msg)
@ -398,9 +360,84 @@ class Controller:
self.model.edit_message(text=text) self.model.edit_message(text=text)
self.present_info("Message edited") self.present_info("Message edited")
@bind(chat_handler, ["l", "^J", "^E"])
def handle_msgs(self):
rc = self.handle(msg_handler, 0.2)
if rc == "QUIT":
return rc
self.chat_size = 0.5
self.resize()
@bind(chat_handler, ["g"])
def top_chat(self):
if self.model.first_chat():
self.render()
@bind(chat_handler, ["j", "^B", "^N"], repeat_factor=True)
@bind(msg_handler, ["]"])
def next_chat(self, repeat_factor: int = 1):
if self.model.next_chat(repeat_factor):
self.render()
@bind(chat_handler, ["k", "^C", "^P"], repeat_factor=True)
@bind(msg_handler, ["["])
def prev_chat(self, repeat_factor: int = 1):
if self.model.prev_chat(repeat_factor):
self.render()
@bind(chat_handler, ["J"])
def jump_10_chats_down(self):
self.next_chat(10)
@bind(chat_handler, ["K"])
def jump_10_chats_up(self):
self.prev_chat(10)
@bind(chat_handler, ["u"])
def toggle_unread(self):
chat = self.model.chats.chats[self.model.current_chat]
chat_id = chat["id"]
toggle = not chat["is_marked_as_unread"]
self.tg.toggle_chat_is_marked_as_unread(chat_id, toggle)
self.render()
@bind(chat_handler, ["r"])
def read_msgs(self):
chat = self.model.chats.chats[self.model.current_chat]
chat_id = chat["id"]
msg_id = chat["last_message"]["id"]
self.tg.view_messages(chat_id, [msg_id])
self.render()
@bind(chat_handler, ["m"])
def toggle_mute(self):
# TODO: if it's msg to yourself, do not change its
# notification setting, because we can't by documentation,
# instead write about it in status
chat = self.model.chats.chats[self.model.current_chat]
chat_id = chat["id"]
if self.model.is_me(chat_id):
self.present_error("You can't mute Saved Messages")
return
notification_settings = chat["notification_settings"]
if notification_settings["mute_for"]:
notification_settings["mute_for"] = 0
else:
notification_settings["mute_for"] = 2147483647
self.tg.set_chat_nottification_settings(chat_id, notification_settings)
self.render()
@bind(chat_handler, ["p"])
def toggle_pin(self):
chat = self.model.chats.chats[self.model.current_chat]
chat_id = chat["id"]
toggle = not chat["is_pinned"]
self.tg.toggle_chat_is_pinned(chat_id, toggle)
self.render()
def run(self) -> None: def run(self) -> None:
try: try:
self.handle(self.chat_bindings, 0.5) self.handle(chat_handler, 0.5)
self.queue.put(self.close) self.queue.put(self.close)
except Exception: except Exception:
log.exception("Error happened in main loop") log.exception("Error happened in main loop")
@ -408,23 +445,14 @@ class Controller:
def close(self): def close(self):
self.is_running = False self.is_running = False
def handle_msgs(self, _: int): def handle(self, handlers: Dict[str, handler_type], size: float):
rc = self.handle(self.msg_bindings, 0.2)
if rc == "QUIT":
return rc
self.chat_size = 0.5
self.resize()
def handle(
self, key_bindings: Dict[str, key_bind_handler_type], size: float
):
self.chat_size = size self.chat_size = size
self.resize() self.resize()
while True: while True:
repeat_factor, keys = self.view.get_keys() repeat_factor, keys = self.view.get_keys()
handler = key_bindings.get(keys, lambda _: None) fun = handlers.get(keys, lambda *_: None)
res = handler(repeat_factor) res = fun(self, repeat_factor) # type: ignore
if res == "QUIT": if res == "QUIT":
return res return res
elif res == "BACK": elif res == "BACK":
@ -460,6 +488,18 @@ class Controller:
except Exception: except Exception:
log.exception("Error happened in draw loop") log.exception("Error happened in draw loop")
def present_error(self, msg: str):
return self.update_status("Error", msg)
def present_info(self, msg: str):
return self.update_status("Info", msg)
def update_status(self, level: str, msg: str):
self.queue.put(partial(self._update_status, level, msg))
def _update_status(self, level: str, msg: str):
self.view.status.draw(f"{level}: {msg}")
def render(self) -> None: def render(self) -> None:
self.queue.put(self._render) self.queue.put(self._render)

View file

Before

Width:  |  Height:  |  Size: 42 KiB

After

Width:  |  Height:  |  Size: 42 KiB

View file

@ -1,5 +1,4 @@
import logging import logging
from datetime import datetime
from functools import wraps from functools import wraps
from typing import Any, Callable, Dict from typing import Any, Callable, Dict

View file

@ -147,9 +147,8 @@ def notify(
): ):
if not cmd: if not cmd:
return return
icon_path = os.path.join(os.path.dirname(__file__), "tg.png")
notify_cmd = cmd.format( notify_cmd = cmd.format(
icon_path=icon_path, title=title, subtitle=subtitle, msg=msg icon_path=config.ICON_PATH, title=title, subtitle=subtitle, msg=msg
) )
log.info("notify-cmd: %s", notify_cmd) log.info("notify-cmd: %s", notify_cmd)
os.system(notify_cmd) os.system(notify_cmd)
@ -184,6 +183,9 @@ class suspend:
def call(self, cmd): def call(self, cmd):
subprocess.call(cmd, shell=True) subprocess.call(cmd, shell=True)
def run_with_input(self, cmd, text):
subprocess.run(cmd, universal_newlines=True, input=text, shell=True)
def open_file(self, file_path): def open_file(self, file_path):
cmd = get_file_handler(file_path) cmd = get_file_handler(file_path)
if not cmd: if not cmd: