mirror of
https://github.com/paul-nameless/tg
synced 2024-11-29 07:10:19 +00:00
commit
ddd57865a9
5 changed files with 110 additions and 75 deletions
86
tg/config.py
86
tg/config.py
|
@ -1,36 +1,60 @@
|
|||
import configparser
|
||||
import mailcap
|
||||
import mimetypes
|
||||
"""
|
||||
Every parameter (except for DEFAULT_CONFIG) can be
|
||||
overwritten by external config file
|
||||
"""
|
||||
import os
|
||||
import platform
|
||||
import runpy
|
||||
|
||||
DEFAULT_CONFIG = os.path.expanduser("~/.config/tg/tg.conf")
|
||||
_os_name = platform.system()
|
||||
_darwin = "Darwin"
|
||||
_linux = "Linux"
|
||||
|
||||
|
||||
DEFAULT_CONFIG = os.path.expanduser("~/.config/tg/conf.py")
|
||||
DEFAULT_FILES = os.path.expanduser("~/.cache/tg/")
|
||||
max_download_size = "10MB"
|
||||
record_cmd = "ffmpeg -f avfoundation -i ':0' -ar 22050 -b:a 32k '{file_path}'"
|
||||
long_msg_cmd = "vim + -c 'startinsert' {file_path}"
|
||||
editor = os.environ.get("EDITOR", "vi")
|
||||
LOG_LEVEL = "INFO"
|
||||
|
||||
API_ID = "559815"
|
||||
API_HASH = "fd121358f59d764c57c55871aa0807ca"
|
||||
|
||||
PHONE = None
|
||||
ENC_KEY = None
|
||||
|
||||
TDLIB_PATH = None
|
||||
TDLIB_VERBOSITY = 0
|
||||
|
||||
MAX_DOWNLOAD_SIZE = "10MB"
|
||||
|
||||
# TODO: check platform
|
||||
NOTIFY_CMD = "/usr/local/bin/terminal-notifier -title '{title}' -subtitle '{subtitle}' -message '{msg}' -appIcon '{icon_path}'"
|
||||
# TODO: check platform
|
||||
if _os_name == _linux:
|
||||
VOICE_RECORD_CMD = (
|
||||
"ffmpeg -f alsa -i default -ar 22050 -b:a 32k '{file_path}'"
|
||||
)
|
||||
else:
|
||||
VOICE_RECORD_CMD = (
|
||||
"ffmpeg -f avfoundation -i default -ar 22050 -b:a 32k '{file_path}'"
|
||||
)
|
||||
|
||||
# TODO: use mailcap instead of editor
|
||||
LONG_MSG_CMD = "vim + -c 'startinsert' {file_path}"
|
||||
EDITOR = os.environ.get("EDITOR", "vi")
|
||||
|
||||
if _os_name == _linux:
|
||||
DEFAULT_OPEN = "xdg-open '{file_path}'"
|
||||
else:
|
||||
DEFAULT_OPEN = "open '{file_path}'"
|
||||
|
||||
if _os_name == _linux:
|
||||
DEFAULT_COPY = "xclip -selection c"
|
||||
else:
|
||||
DEFAULT_COPY = "pbcopy"
|
||||
|
||||
|
||||
def get_cfg(config=DEFAULT_CONFIG):
|
||||
cfg = configparser.ConfigParser()
|
||||
cfg.read(config)
|
||||
return cfg
|
||||
|
||||
|
||||
def save_cfg(cfg, config=DEFAULT_CONFIG):
|
||||
config_dir = os.path.dirname(config)
|
||||
if not os.path.isdir(config_dir):
|
||||
os.makedirs(config_dir)
|
||||
with open(config, "w") as f:
|
||||
cfg.write(f)
|
||||
|
||||
|
||||
def get_file_handler(file_name, default=None):
|
||||
mtype, _ = mimetypes.guess_type(file_name)
|
||||
if not mtype:
|
||||
return default
|
||||
caps = mailcap.getcaps()
|
||||
handler, view = mailcap.findmatch(caps, mtype, filename=file_name)
|
||||
if not handler:
|
||||
return None
|
||||
return handler
|
||||
if os.path.isfile(DEFAULT_CONFIG):
|
||||
config_params = runpy.run_path(DEFAULT_CONFIG)
|
||||
for param, value in config_params.items():
|
||||
if param.isupper():
|
||||
globals()[param] = value
|
||||
|
|
|
@ -4,7 +4,6 @@ import os
|
|||
import threading
|
||||
from datetime import datetime
|
||||
from functools import partial
|
||||
from signal import SIGWINCH, signal
|
||||
from tempfile import NamedTemporaryFile
|
||||
from typing import Any, Callable, Dict, Optional
|
||||
|
||||
|
@ -25,15 +24,12 @@ from tg.views import View
|
|||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
MSGS_LEFT_SCROLL_THRESHOLD = 10
|
||||
REPLY_MSG_PREFIX = "# >"
|
||||
|
||||
# start scrolling to next page when number of the msgs left is less than value.
|
||||
# note, that setting high values could lead to situations when long msgs will
|
||||
# be removed from the display in order to achive scroll threshold. this could
|
||||
# cause blan areas on the msg display screen
|
||||
MSGS_LEFT_SCROLL_THRESHOLD = 2
|
||||
|
||||
REPLY_MSG_PREFIX = "# >"
|
||||
key_bind_handler_type = Callable[[Any], Any]
|
||||
|
||||
|
||||
|
@ -51,7 +47,6 @@ class Controller:
|
|||
self.lock = threading.Lock()
|
||||
self.tg = tg
|
||||
self.chat_size = 0.5
|
||||
signal(SIGWINCH, self.resize_handler)
|
||||
|
||||
self.chat_bindings: Dict[str, key_bind_handler_type] = {
|
||||
"q": lambda _: "QUIT",
|
||||
|
@ -73,7 +68,9 @@ class Controller:
|
|||
self.msg_bindings: Dict[str, key_bind_handler_type] = {
|
||||
"q": lambda _: "QUIT",
|
||||
"h": lambda _: "BACK",
|
||||
"bp": lambda _: self.breakpoint(),
|
||||
"^D": lambda _: "BACK",
|
||||
# navigate msgs
|
||||
"]": self.next_chat,
|
||||
"[": self.prev_chat,
|
||||
"J": lambda _: self.next_msg(10),
|
||||
|
@ -83,26 +80,28 @@ class Controller:
|
|||
"k": self.prev_msg,
|
||||
"^P": self.prev_msg,
|
||||
"G": lambda _: self.jump_bottom(),
|
||||
"dd": lambda _: self.delete_msg(),
|
||||
"D": lambda _: self.download_current_file(),
|
||||
"l": lambda _: self.open_current_msg(),
|
||||
# send files
|
||||
"sd": lambda _: self.send_file(self.tg.send_doc),
|
||||
"sp": lambda _: self.send_file(self.tg.send_photo),
|
||||
"sa": lambda _: self.send_file(self.tg.send_audio),
|
||||
"sv": lambda _: self.send_video(),
|
||||
"v": lambda _: self.send_voice(),
|
||||
# manipulate msgs
|
||||
"dd": lambda _: self.delete_msg(),
|
||||
"D": lambda _: self.download_current_file(),
|
||||
"l": lambda _: self.open_current_msg(),
|
||||
"e": lambda _: self.edit_msg(),
|
||||
"i": lambda _: self.write_short_msg(),
|
||||
"a": lambda _: self.write_short_msg(),
|
||||
"I": lambda _: self.write_long_msg(),
|
||||
"A": lambda _: self.write_long_msg(),
|
||||
"p": lambda _: self.forward_msgs(),
|
||||
"y": lambda _: self.copy_msgs(),
|
||||
"r": lambda _: self.reply_message(),
|
||||
"R": lambda _: self.reply_with_long_message(),
|
||||
"bp": lambda _: self.breakpoint(),
|
||||
# message selection
|
||||
" ": lambda _: self.toggle_select_msg(),
|
||||
"^[": lambda _: self.discard_selected_msgs(), # esc
|
||||
"y": lambda _: self.copy_msgs(),
|
||||
"p": lambda _: self.forward_msgs(),
|
||||
}
|
||||
|
||||
def forward_msgs(self):
|
||||
|
@ -230,7 +229,7 @@ class Controller:
|
|||
) as s:
|
||||
f.write(insert_replied_msg(msg))
|
||||
f.seek(0)
|
||||
s.call(config.long_msg_cmd.format(file_path=f.name))
|
||||
s.call(config.LONG_MSG_CMD.format(file_path=f.name))
|
||||
with open(f.name) as f:
|
||||
if msg := strip_replied_msg(f.read().strip()):
|
||||
self.model.reply_message(text=msg)
|
||||
|
@ -272,7 +271,7 @@ class Controller:
|
|||
def send_voice(self):
|
||||
file_path = f"/tmp/voice-{datetime.now()}.oga"
|
||||
with suspend(self.view) as s:
|
||||
s.call(config.record_cmd.format(file_path=file_path))
|
||||
s.call(config.VOICE_RECORD_CMD.format(file_path=file_path))
|
||||
resp = self.view.status.get_input(
|
||||
f"Do you want to send recording: {file_path}? [Y/n]"
|
||||
)
|
||||
|
@ -354,7 +353,7 @@ class Controller:
|
|||
) as s:
|
||||
f.write(msg.text_content)
|
||||
f.flush()
|
||||
s.call(f"{config.editor} {f.name}")
|
||||
s.call(f"{config.EDITOR} {f.name}")
|
||||
with open(f.name) as f:
|
||||
if text := f.read().strip():
|
||||
self.model.edit_message(text=text)
|
||||
|
@ -364,7 +363,7 @@ class Controller:
|
|||
with NamedTemporaryFile("r+", suffix=".txt") as f, suspend(
|
||||
self.view
|
||||
) as s:
|
||||
s.call(config.long_msg_cmd.format(file_path=f.name))
|
||||
s.call(config.LONG_MSG_CMD.format(file_path=f.name))
|
||||
with open(f.name) as f:
|
||||
if msg := f.read().strip():
|
||||
self.model.send_message(text=msg)
|
||||
|
|
40
tg/main.py
40
tg/main.py
|
@ -15,13 +15,24 @@ log = logging.getLogger(__name__)
|
|||
|
||||
|
||||
def run(tg: Tdlib, stdscr: window) -> None:
|
||||
# run this function in thread?
|
||||
|
||||
# handle ctrl+c, to avoid interrupting tg when subprocess is called
|
||||
def interrupt_signal_handler(sig, frame):
|
||||
# TODO: draw on status pane: to quite press <q>
|
||||
log.info("Interrupt signal is handled and ignored on purpose.")
|
||||
|
||||
signal.signal(signal.SIGINT, interrupt_signal_handler)
|
||||
|
||||
model = Model(tg)
|
||||
status_view = StatusView(stdscr)
|
||||
msg_view = MsgView(stdscr, model.msgs, model, model.users)
|
||||
chat_view = ChatView(stdscr)
|
||||
view = View(stdscr, chat_view, msg_view, status_view)
|
||||
controller = Controller(model, view, tg)
|
||||
|
||||
# hanlde resize of terminal correctly
|
||||
signal.signal(signal.SIGWINCH, controller.resize_handler)
|
||||
|
||||
for msg_type, handler in update_handlers.handlers.items():
|
||||
tg.add_update_handler(msg_type, partial(handler, controller))
|
||||
|
||||
|
@ -31,28 +42,17 @@ def run(tg: Tdlib, stdscr: window) -> None:
|
|||
|
||||
|
||||
def main():
|
||||
def signal_handler(sig, frame):
|
||||
log.info("You pressed Ctrl+C!")
|
||||
utils.setup_log(config.LOG_LEVEL)
|
||||
|
||||
signal.signal(signal.SIGINT, signal_handler)
|
||||
|
||||
cfg = config.get_cfg()["DEFAULT"]
|
||||
utils.setup_log(cfg.get("level", "DEBUG"))
|
||||
log.debug("#" * 64)
|
||||
tg = Tdlib(
|
||||
api_id=cfg["api_id"],
|
||||
api_hash=cfg["api_hash"],
|
||||
phone=cfg["phone"],
|
||||
database_encryption_key=cfg["enc_key"],
|
||||
files_directory=cfg.get("files", config.DEFAULT_FILES),
|
||||
tdlib_verbosity=cfg.get("tdlib_verbosity", 0),
|
||||
library_path=cfg.get("library_path"),
|
||||
api_id=config.API_ID,
|
||||
api_hash=config.API_HASH,
|
||||
phone=config.PHONE,
|
||||
database_encryption_key=config.ENC_KEY,
|
||||
files_directory=config.DEFAULT_FILES,
|
||||
tdlib_verbosity=config.TDLIB_VERBOSITY,
|
||||
library_path=config.TDLIB_PATH,
|
||||
)
|
||||
config.max_download_size = utils.parse_size(
|
||||
cfg.get("max_download_size", config.max_download_size)
|
||||
)
|
||||
config.record_cmd = cfg.get("record_cmd", config.record_cmd)
|
||||
config.long_msg_cmd = cfg.get("long_msg_cmd", config.long_msg_cmd)
|
||||
tg.login()
|
||||
|
||||
wrapper(partial(run, tg))
|
||||
|
|
|
@ -2,7 +2,7 @@ import logging
|
|||
from functools import wraps
|
||||
from typing import Any, Callable, Dict, Optional
|
||||
|
||||
from tg import config
|
||||
from tg import config, utils
|
||||
from tg.controllers import Controller
|
||||
from tg.msg import MsgProxy
|
||||
|
||||
|
@ -12,6 +12,8 @@ _update_handler_type = Callable[[Controller, Dict[str, Any]], None]
|
|||
|
||||
handlers: Dict[str, _update_handler_type] = {}
|
||||
|
||||
max_download_size: int = utils.parse_size(config.MAX_DOWNLOAD_SIZE)
|
||||
|
||||
|
||||
def update_handler(update_type):
|
||||
def decorator(fun):
|
||||
|
@ -57,7 +59,7 @@ def update_new_msg(controller: Controller, update: Dict[str, Any]):
|
|||
)
|
||||
if current_chat_id == msg.chat_id:
|
||||
controller.refresh_msgs()
|
||||
if msg.file_id and msg.size <= config.max_download_size:
|
||||
if msg.file_id and msg.size <= max_download_size:
|
||||
controller.download(msg.file_id, msg.chat_id, msg["id"])
|
||||
|
||||
controller._notify_for_message(msg.chat_id, msg)
|
||||
|
|
22
tg/utils.py
22
tg/utils.py
|
@ -1,7 +1,9 @@
|
|||
import base64
|
||||
import curses
|
||||
import logging
|
||||
import mailcap
|
||||
import math
|
||||
import mimetypes
|
||||
import os
|
||||
import random
|
||||
import re
|
||||
|
@ -30,7 +32,18 @@ emoji_pattern = re.compile(
|
|||
units = {"B": 1, "KB": 10 ** 3, "MB": 10 ** 6, "GB": 10 ** 9, "TB": 10 ** 12}
|
||||
|
||||
|
||||
def parse_size(size):
|
||||
def get_file_handler(file_name, default=None):
|
||||
mtype, _ = mimetypes.guess_type(file_name)
|
||||
if not mtype:
|
||||
return default
|
||||
caps = mailcap.getcaps()
|
||||
handler, view = mailcap.findmatch(caps, mtype, filename=file_name)
|
||||
if not handler:
|
||||
return None
|
||||
return handler
|
||||
|
||||
|
||||
def parse_size(size: str) -> int:
|
||||
if size[-2].isalpha():
|
||||
number, unit = size[:-2], size[-2:]
|
||||
else:
|
||||
|
@ -107,10 +120,7 @@ def setup_log(level="DEBUG"):
|
|||
|
||||
|
||||
def notify(
|
||||
msg,
|
||||
subtitle="",
|
||||
title="tg",
|
||||
cmd=config.get_cfg()["DEFAULT"].get("notify_cmd"),
|
||||
msg, subtitle="", title="tg", cmd=config.NOTIFY_CMD,
|
||||
):
|
||||
if not cmd:
|
||||
return
|
||||
|
@ -146,7 +156,7 @@ class suspend:
|
|||
subprocess.call(cmd, shell=True)
|
||||
|
||||
def open_file(self, file_path):
|
||||
cmd = config.get_file_handler(file_path)
|
||||
cmd = get_file_handler(file_path)
|
||||
if not cmd:
|
||||
return
|
||||
self.call(cmd)
|
||||
|
|
Loading…
Reference in a new issue