Merge pull request #93 from paul-nameless/status-panel

Introduce top status panel
This commit is contained in:
Nameless 2020-06-30 15:00:12 +08:00 committed by GitHub
commit 1034d4b952
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 271 additions and 53 deletions

View file

@ -2,7 +2,6 @@ import curses
import logging
import os
import shlex
import threading
from datetime import datetime
from functools import partial, wraps
from queue import Queue
@ -12,12 +11,11 @@ from typing import Any, Callable, Dict, List, Optional
from tg import config
from tg.models import Model
from tg.msg import MsgProxy
from tg.tdlib import Tdlib
from tg.tdlib import ChatAction, Tdlib
from tg.utils import (
get_duration,
get_video_resolution,
get_waveform,
handle_exception,
is_yes,
notify,
suspend,
@ -238,10 +236,13 @@ class Controller:
if not self.can_send_msg():
self.present_info("Can't send msg in this chat")
return
chat_id = self.model.chats.id_by_index(self.model.current_chat)
self.tg.send_chat_action(chat_id, ChatAction.chatActionTyping)
if msg := self.view.status.get_input():
self.model.send_message(text=msg)
self.present_info("Message sent")
else:
self.tg.send_chat_action(chat_id, ChatAction.chatActionCancel)
self.present_info("Message wasn't sent")
@bind(msg_handler, ["A", "I"])
@ -252,11 +253,18 @@ class Controller:
with NamedTemporaryFile("r+", suffix=".txt") as f, suspend(
self.view
) as s:
chat_id = self.model.chats.id_by_index(self.model.current_chat)
self.tg.send_chat_action(chat_id, ChatAction.chatActionTyping)
s.call(config.LONG_MSG_CMD.format(file_path=shlex.quote(f.name)))
with open(f.name) as f:
if msg := f.read().strip():
self.model.send_message(text=msg)
self.present_info("Message sent")
else:
self.tg.send_chat_action(
chat_id, ChatAction.chatActionCancel
)
self.present_info("Message wasn't sent")
@bind(msg_handler, ["sv"])
def send_video(self):
@ -543,14 +551,14 @@ class Controller:
self.queue.put(self._render_chats)
def _render_chats(self) -> None:
page_size = self.view.chats.h
page_size = self.view.chats.h - 1
chats = self.model.get_chats(
self.model.current_chat, page_size, MSGS_LEFT_SCROLL_THRESHOLD
)
selected_chat = min(
self.model.current_chat, page_size - MSGS_LEFT_SCROLL_THRESHOLD
)
self.view.chats.draw(selected_chat, chats)
self.view.chats.draw(selected_chat, chats, self.model.chats.title)
def render_msgs(self) -> None:
self.queue.put(self._render_msgs)
@ -561,10 +569,13 @@ class Controller:
return
msgs = self.model.fetch_msgs(
current_position=current_msg_idx,
page_size=self.view.msgs.h,
page_size=self.view.msgs.h - 1,
msgs_left_scroll_threshold=MSGS_LEFT_SCROLL_THRESHOLD,
)
self.view.msgs.draw(current_msg_idx, msgs, MSGS_LEFT_SCROLL_THRESHOLD)
chat = self.model.chats.chats[self.model.current_chat]
self.view.msgs.draw(
current_msg_idx, msgs, MSGS_LEFT_SCROLL_THRESHOLD, chat
)
def notify_for_message(self, chat_id: int, msg: MsgProxy):
# do not notify, if muted

View file

@ -4,8 +4,8 @@ from collections import defaultdict
from typing import Any, Dict, List, Optional, Set, Tuple
from tg.msg import MsgProxy
from tg.tdlib import Tdlib
from tg.utils import copy_to_clipboard
from tg.tdlib import ChatAction, Tdlib, UserStatus
from tg.utils import copy_to_clipboard, pretty_ts
log = logging.getLogger(__name__)
@ -206,6 +206,7 @@ class ChatModel:
self.chats: List[Dict[str, Any]] = []
self.chat_ids: List[int] = []
self.have_full_chat_list = False
self.title: str = "Chats"
def id_by_index(self, index: int) -> Optional[int]:
if index >= len(self.chats):
@ -454,15 +455,6 @@ class MsgModel:
class UserModel:
statuses = {
"userStatusEmpty": "",
"userStatusOnline": "online",
"userStatusOffline": "offline",
"userStatusRecently": "recently",
"userStatusLastWeek": "last week",
"userStatusLastMonth": "last month",
}
types = {
"userTypeUnknown": "unknown",
"userTypeBot": "bot",
@ -474,6 +466,9 @@ class UserModel:
self.tg = tg
self.me = None
self.users: Dict[int, Dict] = {}
self.groups: Dict[int, Dict] = {}
self.supergroups: Dict[int, Dict] = {}
self.actions: Dict[int, Dict] = {}
self.not_found: Set[int] = set()
def get_me(self):
@ -487,11 +482,46 @@ class UserModel:
self.me = result.update
return self.me
def get_action(self, chat_id: int) -> Optional[str]:
action = self.actions.get(chat_id)
if action is None:
return None
action_type = action["action"]["@type"]
try:
return ChatAction[action_type].value + "..."
except KeyError:
log.error(f"ChatAction type {action_type} not implemented")
return None
def set_status(self, user_id: int, status: Dict[str, Any]):
if user_id not in self.users:
self.get_user(user_id)
self.users[user_id]["status"] = status
def get_status(self, user_id: int) -> str:
if user_id not in self.users:
return ""
user_status = self.users[user_id]["status"]
try:
status = UserStatus[user_status["@type"]]
except KeyError:
log.error(f"UserStatus type {user_status} not implemented")
return ""
if status == UserStatus.userStatusEmpty:
return ""
elif status == UserStatus.userStatusOnline:
expires = user_status["expires"]
if expires < time.time():
return ""
return status.value
elif status == UserStatus.userStatusOffline:
was_online = user_status["was_online"]
ago = pretty_ts(was_online)
return f"last seen {ago}"
return f"last seen {status.value}"
def is_online(self, user_id: int):
user = self.get_user(user_id)
if (
@ -516,3 +546,17 @@ class UserModel:
return {}
self.users[user_id] = result.update
return result.update
def get_group_info(self, group_id: int) -> Optional[Dict[str, Any]]:
if group_id in self.groups:
return self.groups[group_id]
self.tg.get_basic_group(group_id)
return None
def get_supergroup_info(
self, supergroup_id: int
) -> Optional[Dict[str, Any]]:
if supergroup_id in self.supergroups:
return self.supergroups[supergroup_id]
self.tg.get_supergroup(supergroup_id)
return None

View file

@ -1,8 +1,42 @@
from enum import Enum
from typing import Any, Dict, List
from telegram.client import AsyncResult, Telegram
class ChatAction(Enum):
chatActionTyping = "typing"
chatActionCancel = "cancel"
chatActionRecordingVideo = "recording video"
chatActionUploadingVideo = "uploading video"
chatActionRecordingVoiceNote = "recording voice"
chatActionUploadingVoiceNote = "uploading voice"
chatActionUploadingPhoto = "uploading photo"
chatActionUploadingDocument = "uploading document"
chatActionChoosingLocation = "choosing location"
chatActionChoosingContact = "choosing contact"
chatActionStartPlayingGame = "start playing game"
chatActionRecordingVideoNote = "recording video"
chatActionUploadingVideoNote = "uploading video"
class ChatType(Enum):
chatTypePrivate = "private"
chatTypeBasicGroup = "group"
chatTypeSupergroup = "supergroup"
channel = "channel"
chatTypeSecret = "secret"
class UserStatus(Enum):
userStatusEmpty = ""
userStatusOnline = "online"
userStatusOffline = "offline"
userStatusRecently = "recently"
userStatusLastWeek = "last week"
userStatusLastMonth = "last month"
class Tdlib(Telegram):
def download_file(
self, file_id, priority=16, offset=0, limit=0, synchronous=False,
@ -188,3 +222,27 @@ class Tdlib(Telegram):
"options": options,
}
return self._send_data(data)
def get_basic_group(self, basic_group_id: int,) -> AsyncResult:
data = {
"@type": "getBasicGroup",
"basic_group_id": basic_group_id,
}
return self._send_data(data)
def get_supergroup(self, supergroup_id: int,) -> AsyncResult:
data = {
"@type": "getSupergroup",
"supergroup_id": supergroup_id,
}
return self._send_data(data)
def send_chat_action(
self, chat_id: int, action: ChatAction, progress: int = None
) -> AsyncResult:
data = {
"@type": "sendChatAction",
"chat_id": chat_id,
"action": {"@type": action.name, "progress": progress},
}
return self._send_data(data)

View file

@ -76,7 +76,6 @@ def update_new_message(controller: Controller, update: Dict[str, Any]):
@update_handler("updateChatOrder")
def update_chat_order(controller: Controller, update: Dict[str, Any]):
log.info("Proccessing updateChatOrder")
current_chat_id = controller.model.current_chat_id
chat_id = update["chat_id"]
order = update["order"]
@ -87,7 +86,6 @@ def update_chat_order(controller: Controller, update: Dict[str, Any]):
@update_handler("updateChatTitle")
def update_chat_title(controller: Controller, update: Dict[str, Any]):
log.info("Proccessing updateChatTitle")
chat_id = update["chat_id"]
title = update["title"]
@ -100,7 +98,6 @@ def update_chat_title(controller: Controller, update: Dict[str, Any]):
def update_chat_is_marked_as_unread(
controller: Controller, update: Dict[str, Any]
):
log.info("Proccessing updateChatIsMarkedAsUnread")
chat_id = update["chat_id"]
is_marked_as_unread = update["is_marked_as_unread"]
@ -113,7 +110,6 @@ def update_chat_is_marked_as_unread(
@update_handler("updateChatIsPinned")
def update_chat_is_pinned(controller: Controller, update: Dict[str, Any]):
log.info("Proccessing updateChatIsPinned")
chat_id = update["chat_id"]
is_pinned = update["is_pinned"]
order = update["order"]
@ -127,7 +123,6 @@ def update_chat_is_pinned(controller: Controller, update: Dict[str, Any]):
@update_handler("updateChatReadOutbox")
def update_chat_read_outbox(controller: Controller, update: Dict[str, Any]):
log.info("Proccessing updateChatReadOutbox")
chat_id = update["chat_id"]
last_read_outbox_message_id = update["last_read_outbox_message_id"]
@ -140,7 +135,6 @@ def update_chat_read_outbox(controller: Controller, update: Dict[str, Any]):
@update_handler("updateChatReadInbox")
def update_chat_read_inbox(controller: Controller, update: Dict[str, Any]):
log.info("Proccessing updateChatReadInbox")
chat_id = update["chat_id"]
last_read_inbox_message_id = update["last_read_inbox_message_id"]
unread_count = update["unread_count"]
@ -156,7 +150,6 @@ def update_chat_read_inbox(controller: Controller, update: Dict[str, Any]):
@update_handler("updateChatDraftMessage")
def update_chat_draft_message(controller: Controller, update: Dict[str, Any]):
log.info("Proccessing updateChatDraftMessage")
chat_id = update["chat_id"]
# FIXME: ignoring draft message itself for now because UI can't show it
# draft_message = update["draft_message"]
@ -169,7 +162,6 @@ def update_chat_draft_message(controller: Controller, update: Dict[str, Any]):
@update_handler("updateChatLastMessage")
def update_chat_last_message(controller: Controller, update: Dict[str, Any]):
log.info("Proccessing updateChatLastMessage")
chat_id = update["chat_id"]
last_message = update.get("last_message")
if not last_message:
@ -187,7 +179,6 @@ def update_chat_last_message(controller: Controller, update: Dict[str, Any]):
@update_handler("updateChatNotificationSettings")
def update_chat_notification_settings(controller: Controller, update):
log.info("Proccessing update_chat_notification_settings")
chat_id = update["chat_id"]
notification_settings = update["notification_settings"]
if controller.model.chats.update_chat(
@ -210,7 +201,6 @@ def update_message_send_succeeded(controller: Controller, update):
@update_handler("updateFile")
def update_file(controller: Controller, update):
log.info("update_file: %s", update)
file_id = update["file"]["id"]
local = update["file"]["local"]
chat_id, msg_id = controller.model.downloads.get(file_id, (None, None))
@ -251,20 +241,44 @@ def update_delete_messages(controller: Controller, update: Dict[str, Any]):
@update_handler("updateConnectionState")
def update_connection_state(controller: Controller, update: Dict[str, Any]):
log.info("state:: %s", update)
state = update["state"]["@type"]
states = {
"connectionStateWaitingForNetwork": "Waiting for network...",
"connectionStateConnectingToProxy": "Connecting to proxy...",
"connectionStateConnecting": "Connecting...",
"connectionStateUpdating": "Updating...",
"connectionStateReady": "Ready",
# state exists, but when it's "Ready" we want to show "Chats"
# "connectionStateReady": "Ready",
}
msg = states.get(state, "Unknown state")
controller.present_info(msg)
controller.model.chats.title = states.get(state, "Chats")
controller.render_chats()
@update_handler("updateUserStatus")
def update_user_status(controller: Controller, update: Dict[str, Any]):
controller.model.users.set_status(update["user_id"], update["status"])
controller.render_chats()
controller.render()
@update_handler("updateBasicGroup")
def update_basic_group(controller: Controller, update: Dict[str, Any]):
basic_group = update["basic_group"]
controller.model.users.groups[basic_group["id"]] = basic_group
controller.render_msgs()
@update_handler("updateSupergroup")
def update_supergroup(controller: Controller, update: Dict[str, Any]):
supergroup = update["supergroup"]
controller.model.users.supergroups[supergroup["id"]] = supergroup
controller.render_msgs()
@update_handler("updateUserChatAction")
def update_user_chat_action(controller: Controller, update: Dict[str, Any]):
chat_id = update["chat_id"]
if update["action"]["@type"] == "chatActionCancel":
controller.model.users.actions.pop(chat_id, None)
else:
controller.model.users.actions[chat_id] = update
controller.render()

View file

@ -216,3 +216,37 @@ class suspend:
def set_shorter_esc_delay(delay=25):
os.environ.setdefault("ESCDELAY", str(delay))
def pretty_ts(ts: int) -> str:
now = datetime.utcnow()
diff = now - datetime.utcfromtimestamp(ts)
second_diff = diff.seconds
day_diff = diff.days
log.info("diff:: %s, %s, %s", ts, second_diff, day_diff)
if day_diff < 0:
return ""
if day_diff == 0:
if second_diff < 10:
return "just now"
if second_diff < 60:
return f"{second_diff} seconds ago"
if second_diff < 120:
return "a minute ago"
if second_diff < 3600:
return f"{int(second_diff / 60)} minutes ago"
if second_diff < 7200:
return "an hour ago"
if second_diff < 86400:
return f"{int(second_diff / 3600)} hours ago"
if day_diff == 1:
return "Yesterday"
if day_diff < 7:
return f"{day_diff} days ago"
if day_diff < 31:
return f"{int(day_diff / 7)} weeks ago"
if day_diff < 365:
return f"{int(day_diff / 30)} months ago"
return f"{int(day_diff / 365)} years ago"

View file

@ -5,9 +5,19 @@ from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple, cast
from tg import config
from tg.colors import blue, cyan, get_color, magenta, reverse, white, yellow
from tg.models import Model, MsgModel, UserModel
from tg.colors import (
blue,
bold,
cyan,
get_color,
magenta,
reverse,
white,
yellow,
)
from tg.models import Model
from tg.msg import MsgProxy
from tg.tdlib import ChatType
from tg.utils import emoji_pattern, num, truncate_to_len
log = logging.getLogger(__name__)
@ -161,19 +171,21 @@ class ChatView:
return tuple(attr | reverse for attr in attrs)
return attrs
def draw(self, current: int, chats: List[Dict[str, Any]]) -> None:
def draw(
self, current: int, chats: List[Dict[str, Any]], title: str = "Chats"
) -> None:
self.win.erase()
line = curses.ACS_VLINE # type: ignore
self.win.vline(0, self.w - 1, line, self.h)
for i, chat in enumerate(chats):
is_selected = i == current
unread_count = chat["unread_count"]
if chat["is_marked_as_unread"]:
unread_count = "unread"
width = self.w - 1
self.win.vline(0, width, line, self.h)
self.win.addstr(
0, 0, title.center(width)[:width], get_color(cyan, -1) | bold
)
for i, chat in enumerate(chats, 1):
is_selected = i == current + 1
date = get_date(chat)
title = chat["title"]
is_pinned = chat["is_pinned"]
last_msg = get_last_msg(chat)
offset = 0
for attr, elem in zip(
@ -182,7 +194,7 @@ class ChatView:
self.win.addstr(
i,
offset,
truncate_to_len(elem, max(0, self.w - offset - 1)),
truncate_to_len(elem, max(0, width - offset)),
attr,
)
offset += len(elem) + sum(
@ -196,22 +208,20 @@ class ChatView:
i, offset, last_msg, self._msg_color(is_selected)
)
if flags := self._get_flags(unread_count, is_pinned, chat):
if flags := self._get_flags(chat):
flags_len = len(flags) + sum(
map(len, emoji_pattern.findall(flags))
)
self.win.addstr(
i,
self.w - flags_len - 1,
flags,
max(0, width - flags_len),
flags[-width:],
self._unread_color(is_selected),
)
self._refresh()
def _get_flags(
self, unread_count: int, is_pinned: bool, chat: Dict[str, Any]
) -> str:
def _get_flags(self, chat: Dict[str, Any]) -> str:
flags = []
msg = chat.get("last_message")
@ -224,17 +234,22 @@ class ChatView:
# last msg haven't been seen by recipient
flags.append("unseen")
if action := self.model.users.get_action(chat["id"]):
flags.append(action)
if self.model.users.is_online(chat["id"]):
flags.append("online")
if is_pinned:
if chat["is_pinned"]:
flags.append("pinned")
if chat["notification_settings"]["mute_for"]:
flags.append("muted")
if unread_count:
flags.append(str(unread_count))
if chat["is_marked_as_unread"]:
flags.append("unread")
elif chat["unread_count"]:
flags.append(str(chat["unread_count"]))
label = " ".join(config.CHAT_FLAGS.get(flag, flag) for flag in flags)
if label:
@ -425,6 +440,7 @@ class MsgView:
current_msg_idx: int,
msgs: List[Tuple[int, Dict[str, Any]]],
min_msg_padding: int,
chat: Dict[str, Any],
) -> None:
self.win.erase()
msgs_to_draw = self._collect_msgs_to_draw(
@ -459,8 +475,49 @@ class MsgView:
self.win.addstr(line_num, column, elem, attr)
column += len(elem)
self.win.addstr(
0, 0, self._msg_title(chat), get_color(cyan, -1) | bold
)
self._refresh()
def _get_chat_type(self, chat: Dict[str, Any]) -> Optional[ChatType]:
try:
chat_type = ChatType[chat["type"]["@type"]]
if (
chat_type == ChatType.chatTypeSupergroup
and chat["type"]["is_channel"]
):
chat_type = ChatType.channel
return chat_type
except KeyError:
log.error(f"ChatType {chat['type']} not implemented")
return None
def _msg_title(self, chat: Dict[str, Any]):
chat_type = self._get_chat_type(chat)
status = ""
if action := self.model.users.get_action(chat["id"]):
status = action
elif chat_type == ChatType.chatTypePrivate:
status = self.model.users.get_status(chat["id"])
elif chat_type == ChatType.chatTypeBasicGroup:
if group := self.model.users.get_group_info(
chat["type"]["basic_group_id"]
):
status = f"{group['member_count']} members"
elif chat_type == ChatType.chatTypeSupergroup:
if supergroup := self.model.users.get_supergroup_info(
chat["type"]["supergroup_id"]
):
status = f"{supergroup['member_count']} members"
elif chat_type == ChatType.channel:
if supergroup := self.model.users.get_supergroup_info(
chat["type"]["supergroup_id"]
):
status = f"{supergroup['member_count']} subscribers"
return f"{chat['title']}: {status}".center(self.w)[: self.w]
def _msg_attributes(self, is_selected: bool) -> Tuple[int, ...]:
attrs = (
get_color(cyan, -1),