Draft: introduce top status panel

This commit is contained in:
Paul Nameless 2020-06-27 22:39:44 +08:00
parent f24e71f5c0
commit 9d27f6c3d9
5 changed files with 222 additions and 26 deletions

View file

@ -2,7 +2,6 @@ import curses
import logging
import os
import shlex
import threading
from datetime import datetime
from functools import partial, wraps
from queue import Queue
@ -12,12 +11,11 @@ from typing import Any, Callable, Dict, List, Optional
from tg import config
from tg.models import Model
from tg.msg import MsgProxy
from tg.tdlib import Tdlib
from tg.tdlib import Action, Tdlib
from tg.utils import (
get_duration,
get_video_resolution,
get_waveform,
handle_exception,
is_yes,
notify,
suspend,
@ -238,10 +236,13 @@ class Controller:
if not self.can_send_msg():
self.present_info("Can't send msg in this chat")
return
chat_id = self.model.chats.id_by_index(self.model.current_chat)
self.tg.send_chat_action(chat_id, Action.chatActionTyping)
if msg := self.view.status.get_input():
self.model.send_message(text=msg)
self.present_info("Message sent")
else:
self.tg.send_chat_action(chat_id, Action.chatActionCancel)
self.present_info("Message wasn't sent")
@bind(msg_handler, ["A", "I"])
@ -252,11 +253,16 @@ class Controller:
with NamedTemporaryFile("r+", suffix=".txt") as f, suspend(
self.view
) as s:
chat_id = self.model.chats.id_by_index(self.model.current_chat)
self.tg.send_chat_action(chat_id, Action.chatActionTyping)
s.call(config.LONG_MSG_CMD.format(file_path=shlex.quote(f.name)))
with open(f.name) as f:
if msg := f.read().strip():
self.model.send_message(text=msg)
self.present_info("Message sent")
else:
self.tg.send_chat_action(chat_id, Action.chatActionCancel)
self.present_info("Message wasn't sent")
@bind(msg_handler, ["sv"])
def send_video(self):
@ -543,14 +549,14 @@ class Controller:
self.queue.put(self._render_chats)
def _render_chats(self) -> None:
page_size = self.view.chats.h
page_size = self.view.chats.h - 1
chats = self.model.get_chats(
self.model.current_chat, page_size, MSGS_LEFT_SCROLL_THRESHOLD
)
selected_chat = min(
self.model.current_chat, page_size - MSGS_LEFT_SCROLL_THRESHOLD
)
self.view.chats.draw(selected_chat, chats)
self.view.chats.draw(selected_chat, chats, self.model.chats.title)
def render_msgs(self) -> None:
self.queue.put(self._render_msgs)
@ -561,10 +567,13 @@ class Controller:
return
msgs = self.model.fetch_msgs(
current_position=current_msg_idx,
page_size=self.view.msgs.h,
page_size=self.view.msgs.h - 1,
msgs_left_scroll_threshold=MSGS_LEFT_SCROLL_THRESHOLD,
)
self.view.msgs.draw(current_msg_idx, msgs, MSGS_LEFT_SCROLL_THRESHOLD)
chat = self.model.chats.chats[self.model.current_chat]
self.view.msgs.draw(
current_msg_idx, msgs, MSGS_LEFT_SCROLL_THRESHOLD, chat
)
def notify_for_message(self, chat_id: int, msg: MsgProxy):
# do not notify, if muted

View file

@ -4,7 +4,7 @@ from collections import defaultdict
from typing import Any, Dict, List, Optional, Set, Tuple
from tg.msg import MsgProxy
from tg.tdlib import Tdlib
from tg.tdlib import Action, Tdlib
from tg.utils import copy_to_clipboard
log = logging.getLogger(__name__)
@ -126,6 +126,8 @@ class Model:
limit = offset + page_size
return self.chats.fetch_chats(offset=offset, limit=limit)
# def send_action(self, action: Action)
def send_message(self, text: str) -> bool:
chat_id = self.chats.id_by_index(self.current_chat)
if chat_id is None:
@ -206,6 +208,7 @@ class ChatModel:
self.chats: List[Dict[str, Any]] = []
self.chat_ids: List[int] = []
self.have_full_chat_list = False
self.title: str = "Chats"
def id_by_index(self, index: int) -> Optional[int]:
if index >= len(self.chats):
@ -474,6 +477,9 @@ class UserModel:
self.tg = tg
self.me = None
self.users: Dict[int, Dict] = {}
self.groups: Dict[int, Dict] = {}
self.supergroups: Dict[int, Dict] = {}
self.actions: Dict[int, Dict] = {}
self.not_found: Set[int] = set()
def get_me(self):
@ -487,11 +493,39 @@ class UserModel:
self.me = result.update
return self.me
def get_action(self, chat_id: int) -> Optional[str]:
action = self.actions.get(chat_id)
if action is None:
return None
action_type = action["action"]["@type"]
try:
return Action[action_type].value + "..."
except KeyError:
log.error(f"Action type {action_type} not implemented")
return None
def set_status(self, user_id: int, status: Dict[str, Any]):
if user_id not in self.users:
self.get_user(user_id)
self.users[user_id]["status"] = status
def get_status(self, user_id: int):
if user_id not in self.users:
return None
user_status = self.users[user_id]["status"]
log.info(f"user_status:: {user_status}")
status = self.statuses.get(user_status["@type"])
if status == "online":
expires = user_status["expires"]
if expires < time.time():
return None
return status
elif status == "offline":
was_online = user_status["was_online"]
ago = pretty_ts(was_online)
return f"last seen {ago}"
return f"last seen {status}"
def is_online(self, user_id: int):
user = self.get_user(user_id)
if (
@ -516,3 +550,48 @@ class UserModel:
return {}
self.users[user_id] = result.update
return result.update
def get_group_info(self, group_id):
if group_id in self.groups:
return self.groups[group_id]
self.tg.get_basic_group(group_id)
def get_supergroup_info(self, supergroup_id):
if supergroup_id in self.supergroups:
return self.supergroups[supergroup_id]
self.tg.get_supergroup(supergroup_id)
def pretty_ts(ts):
from datetime import datetime
now = datetime.now()
diff = now - datetime.fromtimestamp(ts)
second_diff = diff.seconds
day_diff = diff.days
if day_diff < 0:
return ""
if day_diff == 0:
if second_diff < 10:
return "just now"
if second_diff < 60:
return f"{second_diff} seconds ago"
if second_diff < 120:
return "a minute ago"
if second_diff < 3600:
return f"{int(second_diff / 60)} minutes ago"
if second_diff < 7200:
return "an hour ago"
if second_diff < 86400:
return f"{int(second_diff / 3600)} hours ago"
if day_diff == 1:
return "Yesterday"
if day_diff < 7:
return f"{day_diff} days ago"
if day_diff < 31:
return f"{int(day_diff / 7)} weeks ago"
if day_diff < 365:
return f"{int(day_diff / 30)} months ago"
return f"{int(day_diff / 365)} years ago"

View file

@ -1,8 +1,25 @@
from enum import Enum
from typing import Any, Dict, List
from telegram.client import AsyncResult, Telegram
class Action(Enum):
chatActionTyping = "typing"
chatActionCancel = "cancel"
chatActionRecordingVideo = "recording video"
chatActionUploadingVideo = "uploading video"
chatActionRecordingVoiceNote = "recording voice note"
chatActionUploadingVoiceNote = "uploading voice note"
chatActionUploadingPhoto = "uploading photo"
chatActionUploadingDocument = "uploading document"
chatActionChoosingLocation = "choosing location"
chatActionChoosingContact = "choosing contact"
chatActionStartPlayingGame = "start playing game"
chatActionRecordingVideoNote = "recording video note"
chatActionUploadingVideoNote = "uploading video note"
class Tdlib(Telegram):
def download_file(
self, file_id, priority=16, offset=0, limit=0, synchronous=False,
@ -188,3 +205,27 @@ class Tdlib(Telegram):
"options": options,
}
return self._send_data(data)
def get_basic_group(self, basic_group_id: int,) -> AsyncResult:
data = {
"@type": "getBasicGroup",
"basic_group_id": basic_group_id,
}
return self._send_data(data)
def get_supergroup(self, supergroup_id: int,) -> AsyncResult:
data = {
"@type": "getSupergroup",
"supergroup_id": supergroup_id,
}
return self._send_data(data)
def send_chat_action(
self, chat_id: int, action: Action, progress: int = None
) -> AsyncResult:
data = {
"@type": "sendChatAction",
"chat_id": chat_id,
"action": {"@type": action.name, "progress": progress},
}
return self._send_data(data)

View file

@ -1,4 +1,5 @@
import logging
import time
from functools import wraps
from typing import Any, Callable, Dict
@ -258,13 +259,38 @@ def update_connection_state(controller: Controller, update: Dict[str, Any]):
"connectionStateConnectingToProxy": "Connecting to proxy...",
"connectionStateConnecting": "Connecting...",
"connectionStateUpdating": "Updating...",
"connectionStateReady": "Ready",
# "connectionStateReady": "Ready",
}
msg = states.get(state, "Unknown state")
controller.present_info(msg)
controller.model.chats.title = states.get(state, "Chats")
controller.render_chats()
@update_handler("updateUserStatus")
def update_user_status(controller: Controller, update: Dict[str, Any]):
controller.model.users.set_status(update["user_id"], update["status"])
controller.render_chats()
controller.render()
@update_handler("updateBasicGroup")
def update_basic_group(controller: Controller, update: Dict[str, Any]):
basic_group = update["basic_group"]
controller.model.users.groups[basic_group["id"]] = basic_group
controller.render_msgs()
@update_handler("updateSupergroup")
def update_supergroup(controller: Controller, update: Dict[str, Any]):
supergroup = update["supergroup"]
controller.model.users.supergroups[supergroup["id"]] = supergroup
controller.render_msgs()
@update_handler("updateUserChatAction")
def update_user_chat_action(controller: Controller, update: Dict[str, Any]):
log.info("typing:: %s", update)
chat_id = update["chat_id"]
if update["action"]["@type"] == "chatActionCancel":
controller.model.users.actions.pop(chat_id, None)
else:
controller.model.users.actions[chat_id] = update
controller.render()

View file

@ -161,19 +161,19 @@ class ChatView:
return tuple(attr | reverse for attr in attrs)
return attrs
def draw(self, current: int, chats: List[Dict[str, Any]]) -> None:
def draw(
self, current: int, chats: List[Dict[str, Any]], title: str = "Chats"
) -> None:
self.win.erase()
line = curses.ACS_VLINE # type: ignore
self.win.vline(0, self.w - 1, line, self.h)
for i, chat in enumerate(chats):
is_selected = i == current
unread_count = chat["unread_count"]
if chat["is_marked_as_unread"]:
unread_count = "unread"
self.win.addstr(0, 0, title.center(self.w - 1), get_color(cyan, -1))
for i, chat in enumerate(chats, 1):
is_selected = i == current + 1
date = get_date(chat)
title = chat["title"]
is_pinned = chat["is_pinned"]
last_msg = get_last_msg(chat)
offset = 0
for attr, elem in zip(
@ -196,7 +196,7 @@ class ChatView:
i, offset, last_msg, self._msg_color(is_selected)
)
if flags := self._get_flags(unread_count, is_pinned, chat):
if flags := self._get_flags(chat):
flags_len = len(flags) + sum(
map(len, emoji_pattern.findall(flags))
)
@ -209,9 +209,7 @@ class ChatView:
self._refresh()
def _get_flags(
self, unread_count: int, is_pinned: bool, chat: Dict[str, Any]
) -> str:
def _get_flags(self, chat: Dict[str, Any]) -> str:
flags = []
msg = chat.get("last_message")
@ -224,17 +222,22 @@ class ChatView:
# last msg haven't been seen by recipient
flags.append("unseen")
if action := self.model.users.get_action(chat["id"]):
flags.append(action)
if self.model.users.is_online(chat["id"]):
flags.append("online")
if is_pinned:
if chat["is_pinned"]:
flags.append("pinned")
if chat["notification_settings"]["mute_for"]:
flags.append("muted")
if unread_count:
flags.append(str(unread_count))
if chat["is_marked_as_unread"]:
flags.append("unread")
elif chat["unread_count"]:
flags.append(str(chat["unread_count"]))
label = " ".join(config.CHAT_FLAGS.get(flag, flag) for flag in flags)
if label:
@ -425,6 +428,7 @@ class MsgView:
current_msg_idx: int,
msgs: List[Tuple[int, Dict[str, Any]]],
min_msg_padding: int,
chat: Dict[str, Any],
) -> None:
self.win.erase()
msgs_to_draw = self._collect_msgs_to_draw(
@ -459,8 +463,45 @@ class MsgView:
self.win.addstr(line_num, column, elem, attr)
column += len(elem)
self.win.addstr(0, 0, self._msg_title(chat), get_color(cyan, -1))
self._refresh()
def _msg_title(self, chat: Dict[str, Any]):
chat_type = chat["type"]["@type"]
info = ""
_type = "unknown"
if chat_type == "chatTypePrivate":
_type = "private"
info = self.model.users.get_status(chat["id"]) or ""
elif chat_type == "chatTypeBasicGroup":
_type = "group"
group = self.model.users.get_group_info(
chat["type"]["basic_group_id"]
)
log.info(f"group:: {group}")
if group:
info = f"{group['member_count']} members"
elif chat_type == "chatTypeSupergroup":
if chat["type"]["is_channel"]:
_type = "channel"
else:
_type = "supergroup"
supergroup = self.model.users.get_supergroup_info(
chat["type"]["supergroup_id"]
)
log.info(f"supergroup:: {supergroup}")
if supergroup:
info = f"{supergroup['member_count']} members"
elif chat_type == "chatTypeSecret":
_type = "secret"
# return f" {chat['title']} [{_type}] {info}".center(self.w)
if action := self.model.users.get_action(chat["id"]):
info = action
return f" {chat['title']}: {info}".center(self.w)
def _msg_attributes(self, is_selected: bool) -> Tuple[int, ...]:
attrs = (
get_color(cyan, -1),