mirror of
https://github.com/paul-nameless/tg
synced 2024-11-22 03:43:19 +00:00
Show last msg sender user name in group chat preview (#104)
* wip * preview last msgs sender for group chats * user name for actions * refactor * another refactor * remove :
This commit is contained in:
parent
4460c5f94b
commit
5b660f47eb
3 changed files with 93 additions and 65 deletions
11
tg/models.py
11
tg/models.py
|
@ -472,16 +472,19 @@ class UserModel:
|
|||
self.me = result.update
|
||||
return self.me
|
||||
|
||||
def get_action(self, chat_id: int) -> Optional[str]:
|
||||
def get_user_action(
|
||||
self, chat_id: int
|
||||
) -> Tuple[Optional[int], Optional[str]]:
|
||||
action = self.actions.get(chat_id)
|
||||
if action is None:
|
||||
return None
|
||||
return None, None
|
||||
action_type = action["action"]["@type"]
|
||||
user_id = action["user_id"]
|
||||
try:
|
||||
return ChatAction[action_type].value + "..."
|
||||
return user_id, ChatAction[action_type].value
|
||||
except KeyError:
|
||||
log.error(f"ChatAction type {action_type} not implemented")
|
||||
return None
|
||||
return None, None
|
||||
|
||||
def set_status(self, user_id: int, status: Dict[str, Any]) -> None:
|
||||
if user_id not in self.users:
|
||||
|
|
19
tg/tdlib.py
19
tg/tdlib.py
|
@ -1,5 +1,5 @@
|
|||
from enum import Enum
|
||||
from typing import Any, Dict, List
|
||||
from typing import Any, Dict, List, Optional, Union
|
||||
|
||||
from telegram.client import AsyncResult, Telegram
|
||||
|
||||
|
@ -27,6 +27,9 @@ class ChatType(Enum):
|
|||
channel = "channel"
|
||||
chatTypeSecret = "secret"
|
||||
|
||||
def is_group(self, chat_type: "Union[str, ChatType]") -> bool:
|
||||
return chat_type in (self.chatTypeSupergroup, self.chatTypeBasicGroup)
|
||||
|
||||
|
||||
class UserStatus(Enum):
|
||||
userStatusEmpty = ""
|
||||
|
@ -253,3 +256,17 @@ class Tdlib(Telegram):
|
|||
"action": {"@type": action.name, "progress": progress},
|
||||
}
|
||||
return self._send_data(data)
|
||||
|
||||
|
||||
def get_chat_type(chat: Dict[str, Any]) -> Optional[ChatType]:
|
||||
try:
|
||||
chat_type = ChatType[chat["type"]["@type"]]
|
||||
if (
|
||||
chat_type == ChatType.chatTypeSupergroup
|
||||
and chat["type"]["is_channel"]
|
||||
):
|
||||
chat_type = ChatType.channel
|
||||
return chat_type
|
||||
except KeyError:
|
||||
pass
|
||||
return None
|
||||
|
|
128
tg/views.py
128
tg/views.py
|
@ -15,9 +15,9 @@ from tg.colors import (
|
|||
white,
|
||||
yellow,
|
||||
)
|
||||
from tg.models import Model
|
||||
from tg.models import Model, UserModel
|
||||
from tg.msg import MsgProxy
|
||||
from tg.tdlib import ChatType
|
||||
from tg.tdlib import ChatType, get_chat_type
|
||||
from tg.utils import emoji_pattern, get_color_by_str, num, truncate_to_len
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
@ -161,12 +161,12 @@ class ChatView:
|
|||
return color
|
||||
|
||||
def _chat_attributes(
|
||||
self, is_selected: bool, title: str
|
||||
self, is_selected: bool, title: str, user: Optional[str]
|
||||
) -> Tuple[int, ...]:
|
||||
attrs = (
|
||||
get_color(cyan, -1),
|
||||
get_color(get_color_by_str(title), -1),
|
||||
get_color(blue, -1),
|
||||
get_color(get_color_by_str(user or ""), -1),
|
||||
self._msg_color(is_selected),
|
||||
)
|
||||
if is_selected:
|
||||
|
@ -188,27 +188,23 @@ class ChatView:
|
|||
is_selected = i == current + 1
|
||||
date = get_date(chat)
|
||||
title = chat["title"]
|
||||
last_msg = get_last_msg(chat)
|
||||
offset = 0
|
||||
for attr, elem in zip(
|
||||
self._chat_attributes(is_selected, title), [f"{date} ", title]
|
||||
):
|
||||
self.win.addstr(
|
||||
i,
|
||||
offset,
|
||||
truncate_to_len(elem, max(0, width - offset)),
|
||||
attr,
|
||||
)
|
||||
offset += len(elem) + sum(
|
||||
map(len, emoji_pattern.findall(elem))
|
||||
)
|
||||
|
||||
last_msg = " " + last_msg.replace("\n", " ")
|
||||
last_msg = truncate_to_len(last_msg, max(0, self.w - offset))
|
||||
if last_msg.strip():
|
||||
self.win.addstr(
|
||||
i, offset, last_msg, self._msg_color(is_selected)
|
||||
)
|
||||
last_msg_sender, last_msg = self._get_last_msg_data(chat)
|
||||
sender_label = f" {last_msg_sender}" if last_msg_sender else ""
|
||||
|
||||
for attr, elem in zip(
|
||||
self._chat_attributes(is_selected, title, last_msg_sender),
|
||||
[f"{date} ", title, sender_label, f" {last_msg}"],
|
||||
):
|
||||
if not elem:
|
||||
continue
|
||||
item = truncate_to_len(elem, max(0, width - offset))
|
||||
if len(item) > 1:
|
||||
self.win.addstr(i, offset, item, attr)
|
||||
offset += len(elem) + sum(
|
||||
map(len, emoji_pattern.findall(elem))
|
||||
)
|
||||
|
||||
if flags := self._get_flags(chat):
|
||||
flags_len = len(flags) + sum(
|
||||
|
@ -223,6 +219,19 @@ class ChatView:
|
|||
|
||||
self._refresh()
|
||||
|
||||
def _get_last_msg_data(
|
||||
self, chat: Dict[str, Any]
|
||||
) -> Tuple[Optional[str], Optional[str]]:
|
||||
user, last_msg = get_last_msg(chat)
|
||||
last_msg = last_msg.replace("\n", " ")
|
||||
if user:
|
||||
last_msg_sender = _get_user_label(self.model.users, user)
|
||||
chat_type = get_chat_type(chat)
|
||||
if chat_type and chat_type.is_group(chat_type):
|
||||
return last_msg_sender, last_msg
|
||||
|
||||
return None, last_msg
|
||||
|
||||
def _get_flags(self, chat: Dict[str, Any]) -> str:
|
||||
flags = []
|
||||
|
||||
|
@ -236,8 +245,8 @@ class ChatView:
|
|||
# last msg haven't been seen by recipient
|
||||
flags.append("unseen")
|
||||
|
||||
if action := self.model.users.get_action(chat["id"]):
|
||||
flags.append(action)
|
||||
if action_label := _get_action_label(self.model.users, chat["id"]):
|
||||
flags.append(action_label)
|
||||
|
||||
if self.model.users.is_online(chat["id"]):
|
||||
flags.append("online")
|
||||
|
@ -321,7 +330,9 @@ class MsgView:
|
|||
reply_msg = MsgProxy(_msg)
|
||||
if reply_msg_content := self._parse_msg(reply_msg):
|
||||
reply_msg_content = reply_msg_content.replace("\n", " ")
|
||||
reply_sender = self._get_user_by_id(reply_msg.sender_id)
|
||||
reply_sender = _get_user_label(
|
||||
self.model.users, reply_msg.sender_id
|
||||
)
|
||||
sender_name = f" {reply_sender}:" if reply_sender else ""
|
||||
reply_line = f">{sender_name} {reply_msg_content}"
|
||||
if len(reply_line) >= width_limit:
|
||||
|
@ -382,7 +393,7 @@ class MsgView:
|
|||
dt = msg_proxy.date.strftime("%H:%M:%S")
|
||||
user_id_item = msg_proxy.sender_id
|
||||
|
||||
user_id = self._get_user_by_id(user_id_item)
|
||||
user_id = _get_user_label(self.model.users, user_id_item)
|
||||
flags = self._get_flags(msg_proxy)
|
||||
if user_id and flags:
|
||||
# if not channel add space between name and flags
|
||||
|
@ -485,24 +496,12 @@ class MsgView:
|
|||
)
|
||||
self._refresh()
|
||||
|
||||
def _get_chat_type(self, chat: Dict[str, Any]) -> Optional[ChatType]:
|
||||
try:
|
||||
chat_type = ChatType[chat["type"]["@type"]]
|
||||
if (
|
||||
chat_type == ChatType.chatTypeSupergroup
|
||||
and chat["type"]["is_channel"]
|
||||
):
|
||||
chat_type = ChatType.channel
|
||||
return chat_type
|
||||
except KeyError:
|
||||
log.error(f"ChatType {chat['type']} not implemented")
|
||||
return None
|
||||
|
||||
def _msg_title(self, chat: Dict[str, Any]) -> str:
|
||||
chat_type = self._get_chat_type(chat)
|
||||
chat_type = get_chat_type(chat)
|
||||
status = ""
|
||||
if action := self.model.users.get_action(chat["id"]):
|
||||
status = action
|
||||
|
||||
if action_label := _get_action_label(self.model.users, chat["id"]):
|
||||
status = action_label
|
||||
elif chat_type == ChatType.chatTypePrivate:
|
||||
status = self.model.users.get_status(chat["id"])
|
||||
elif chat_type == ChatType.chatTypeBasicGroup:
|
||||
|
@ -535,20 +534,6 @@ class MsgView:
|
|||
return tuple(attr | reverse for attr in attrs)
|
||||
return attrs
|
||||
|
||||
def _get_user_by_id(self, user_id: int) -> str:
|
||||
if user_id == 0:
|
||||
return ""
|
||||
user = self.model.users.get_user(user_id)
|
||||
if user["first_name"] and user["last_name"]:
|
||||
return f'{user["first_name"]} {user["last_name"]}'[:20]
|
||||
|
||||
if user["first_name"]:
|
||||
return f'{user["first_name"]}'[:20]
|
||||
|
||||
if user.get("username"):
|
||||
return "@" + user["username"]
|
||||
return "Unknown?"
|
||||
|
||||
def _parse_msg(self, msg: MsgProxy) -> str:
|
||||
if msg.is_message:
|
||||
return parse_content(msg["content"])
|
||||
|
@ -556,12 +541,12 @@ class MsgView:
|
|||
return "unknown msg type: " + str(msg["content"])
|
||||
|
||||
|
||||
def get_last_msg(chat: Dict[str, Any]) -> str:
|
||||
def get_last_msg(chat: Dict[str, Any]) -> Tuple[Optional[int], str]:
|
||||
last_msg = chat.get("last_message")
|
||||
if not last_msg:
|
||||
return "<No messages yet>"
|
||||
return None, "<No messages yet>"
|
||||
content = last_msg["content"]
|
||||
return parse_content(content)
|
||||
return last_msg["sender_user_id"], parse_content(content)
|
||||
|
||||
|
||||
def get_date(chat: Dict[str, Any]) -> str:
|
||||
|
@ -613,3 +598,26 @@ def get_download(local: Dict[str, Union[str, bool, int]], size: int) -> str:
|
|||
percent = int(d * 100 / size)
|
||||
return f"{percent}%"
|
||||
return "no"
|
||||
|
||||
|
||||
def _get_user_label(users: UserModel, user_id: int) -> str:
|
||||
if user_id == 0:
|
||||
return ""
|
||||
user = users.get_user(user_id)
|
||||
if user["first_name"] and user["last_name"]:
|
||||
return f'{user["first_name"]} {user["last_name"]}'[:20]
|
||||
|
||||
if user["first_name"]:
|
||||
return f'{user["first_name"]}'[:20]
|
||||
|
||||
if user.get("username"):
|
||||
return "@" + user["username"]
|
||||
return "Unknown?"
|
||||
|
||||
|
||||
def _get_action_label(users: UserModel, chat_id: int) -> Optional[str]:
|
||||
actioner, action = users.get_user_action(chat_id)
|
||||
if actioner and action:
|
||||
user_label = _get_user_label(users, actioner)
|
||||
return f"{user_label} is {action}..."
|
||||
return None
|
||||
|
|
Loading…
Reference in a new issue