mirror of
https://github.com/paul-nameless/tg
synced 2025-02-16 10:38:23 +00:00
Use enum instead of dict and refactor code
This commit is contained in:
parent
9d27f6c3d9
commit
ba38cd207c
5 changed files with 141 additions and 111 deletions
|
@ -11,7 +11,7 @@ from typing import Any, Callable, Dict, List, Optional
|
|||
from tg import config
|
||||
from tg.models import Model
|
||||
from tg.msg import MsgProxy
|
||||
from tg.tdlib import Action, Tdlib
|
||||
from tg.tdlib import ChatAction, Tdlib
|
||||
from tg.utils import (
|
||||
get_duration,
|
||||
get_video_resolution,
|
||||
|
@ -237,12 +237,12 @@ class Controller:
|
|||
self.present_info("Can't send msg in this chat")
|
||||
return
|
||||
chat_id = self.model.chats.id_by_index(self.model.current_chat)
|
||||
self.tg.send_chat_action(chat_id, Action.chatActionTyping)
|
||||
self.tg.send_chat_action(chat_id, ChatAction.chatActionTyping)
|
||||
if msg := self.view.status.get_input():
|
||||
self.model.send_message(text=msg)
|
||||
self.present_info("Message sent")
|
||||
else:
|
||||
self.tg.send_chat_action(chat_id, Action.chatActionCancel)
|
||||
self.tg.send_chat_action(chat_id, ChatAction.chatActionCancel)
|
||||
self.present_info("Message wasn't sent")
|
||||
|
||||
@bind(msg_handler, ["A", "I"])
|
||||
|
@ -254,14 +254,16 @@ class Controller:
|
|||
self.view
|
||||
) as s:
|
||||
chat_id = self.model.chats.id_by_index(self.model.current_chat)
|
||||
self.tg.send_chat_action(chat_id, Action.chatActionTyping)
|
||||
self.tg.send_chat_action(chat_id, ChatAction.chatActionTyping)
|
||||
s.call(config.LONG_MSG_CMD.format(file_path=shlex.quote(f.name)))
|
||||
with open(f.name) as f:
|
||||
if msg := f.read().strip():
|
||||
self.model.send_message(text=msg)
|
||||
self.present_info("Message sent")
|
||||
else:
|
||||
self.tg.send_chat_action(chat_id, Action.chatActionCancel)
|
||||
self.tg.send_chat_action(
|
||||
chat_id, ChatAction.chatActionCancel
|
||||
)
|
||||
self.present_info("Message wasn't sent")
|
||||
|
||||
@bind(msg_handler, ["sv"])
|
||||
|
|
79
tg/models.py
79
tg/models.py
|
@ -4,8 +4,8 @@ from collections import defaultdict
|
|||
from typing import Any, Dict, List, Optional, Set, Tuple
|
||||
|
||||
from tg.msg import MsgProxy
|
||||
from tg.tdlib import Action, Tdlib
|
||||
from tg.utils import copy_to_clipboard
|
||||
from tg.tdlib import ChatAction, Tdlib, UserStatus
|
||||
from tg.utils import copy_to_clipboard, pretty_ts
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
@ -126,8 +126,6 @@ class Model:
|
|||
limit = offset + page_size
|
||||
return self.chats.fetch_chats(offset=offset, limit=limit)
|
||||
|
||||
# def send_action(self, action: Action)
|
||||
|
||||
def send_message(self, text: str) -> bool:
|
||||
chat_id = self.chats.id_by_index(self.current_chat)
|
||||
if chat_id is None:
|
||||
|
@ -457,15 +455,6 @@ class MsgModel:
|
|||
|
||||
class UserModel:
|
||||
|
||||
statuses = {
|
||||
"userStatusEmpty": "",
|
||||
"userStatusOnline": "online",
|
||||
"userStatusOffline": "offline",
|
||||
"userStatusRecently": "recently",
|
||||
"userStatusLastWeek": "last week",
|
||||
"userStatusLastMonth": "last month",
|
||||
}
|
||||
|
||||
types = {
|
||||
"userTypeUnknown": "unknown",
|
||||
"userTypeBot": "bot",
|
||||
|
@ -499,9 +488,9 @@ class UserModel:
|
|||
return None
|
||||
action_type = action["action"]["@type"]
|
||||
try:
|
||||
return Action[action_type].value + "..."
|
||||
return ChatAction[action_type].value + "..."
|
||||
except KeyError:
|
||||
log.error(f"Action type {action_type} not implemented")
|
||||
log.error(f"ChatAction type {action_type} not implemented")
|
||||
return None
|
||||
|
||||
def set_status(self, user_id: int, status: Dict[str, Any]):
|
||||
|
@ -509,22 +498,29 @@ class UserModel:
|
|||
self.get_user(user_id)
|
||||
self.users[user_id]["status"] = status
|
||||
|
||||
def get_status(self, user_id: int):
|
||||
def get_status(self, user_id: int) -> str:
|
||||
if user_id not in self.users:
|
||||
return None
|
||||
return ""
|
||||
user_status = self.users[user_id]["status"]
|
||||
log.info(f"user_status:: {user_status}")
|
||||
status = self.statuses.get(user_status["@type"])
|
||||
if status == "online":
|
||||
|
||||
try:
|
||||
status = UserStatus[user_status["@type"]]
|
||||
except KeyError:
|
||||
log.error(f"UserStatus type {user_status} not implemented")
|
||||
return ""
|
||||
|
||||
if status == UserStatus.userStatusEmpty:
|
||||
return ""
|
||||
elif status == UserStatus.userStatusOnline:
|
||||
expires = user_status["expires"]
|
||||
if expires < time.time():
|
||||
return None
|
||||
return status
|
||||
elif status == "offline":
|
||||
return ""
|
||||
return status.value
|
||||
elif status == UserStatus.userStatusOffline:
|
||||
was_online = user_status["was_online"]
|
||||
ago = pretty_ts(was_online)
|
||||
return f"last seen {ago}"
|
||||
return f"last seen {status}"
|
||||
return f"last seen {status.value}"
|
||||
|
||||
def is_online(self, user_id: int):
|
||||
user = self.get_user(user_id)
|
||||
|
@ -560,38 +556,3 @@ class UserModel:
|
|||
if supergroup_id in self.supergroups:
|
||||
return self.supergroups[supergroup_id]
|
||||
self.tg.get_supergroup(supergroup_id)
|
||||
|
||||
|
||||
def pretty_ts(ts):
|
||||
from datetime import datetime
|
||||
|
||||
now = datetime.now()
|
||||
diff = now - datetime.fromtimestamp(ts)
|
||||
second_diff = diff.seconds
|
||||
day_diff = diff.days
|
||||
|
||||
if day_diff < 0:
|
||||
return ""
|
||||
|
||||
if day_diff == 0:
|
||||
if second_diff < 10:
|
||||
return "just now"
|
||||
if second_diff < 60:
|
||||
return f"{second_diff} seconds ago"
|
||||
if second_diff < 120:
|
||||
return "a minute ago"
|
||||
if second_diff < 3600:
|
||||
return f"{int(second_diff / 60)} minutes ago"
|
||||
if second_diff < 7200:
|
||||
return "an hour ago"
|
||||
if second_diff < 86400:
|
||||
return f"{int(second_diff / 3600)} hours ago"
|
||||
if day_diff == 1:
|
||||
return "Yesterday"
|
||||
if day_diff < 7:
|
||||
return f"{day_diff} days ago"
|
||||
if day_diff < 31:
|
||||
return f"{int(day_diff / 7)} weeks ago"
|
||||
if day_diff < 365:
|
||||
return f"{int(day_diff / 30)} months ago"
|
||||
return f"{int(day_diff / 365)} years ago"
|
||||
|
|
29
tg/tdlib.py
29
tg/tdlib.py
|
@ -4,20 +4,37 @@ from typing import Any, Dict, List
|
|||
from telegram.client import AsyncResult, Telegram
|
||||
|
||||
|
||||
class Action(Enum):
|
||||
class ChatAction(Enum):
|
||||
chatActionTyping = "typing"
|
||||
chatActionCancel = "cancel"
|
||||
chatActionRecordingVideo = "recording video"
|
||||
chatActionUploadingVideo = "uploading video"
|
||||
chatActionRecordingVoiceNote = "recording voice note"
|
||||
chatActionUploadingVoiceNote = "uploading voice note"
|
||||
chatActionRecordingVoiceNote = "recording voice"
|
||||
chatActionUploadingVoiceNote = "uploading voice"
|
||||
chatActionUploadingPhoto = "uploading photo"
|
||||
chatActionUploadingDocument = "uploading document"
|
||||
chatActionChoosingLocation = "choosing location"
|
||||
chatActionChoosingContact = "choosing contact"
|
||||
chatActionStartPlayingGame = "start playing game"
|
||||
chatActionRecordingVideoNote = "recording video note"
|
||||
chatActionUploadingVideoNote = "uploading video note"
|
||||
chatActionRecordingVideoNote = "recording video"
|
||||
chatActionUploadingVideoNote = "uploading video"
|
||||
|
||||
|
||||
class ChatType(Enum):
|
||||
chatTypePrivate = "private"
|
||||
chatTypeBasicGroup = "group"
|
||||
chatTypeSupergroup = "supergroup"
|
||||
channel = "channel"
|
||||
chatTypeSecret = "secret"
|
||||
|
||||
|
||||
class UserStatus(Enum):
|
||||
userStatusEmpty = ""
|
||||
userStatusOnline = "online"
|
||||
userStatusOffline = "offline"
|
||||
userStatusRecently = "recently"
|
||||
userStatusLastWeek = "last week"
|
||||
userStatusLastMonth = "last month"
|
||||
|
||||
|
||||
class Tdlib(Telegram):
|
||||
|
@ -221,7 +238,7 @@ class Tdlib(Telegram):
|
|||
return self._send_data(data)
|
||||
|
||||
def send_chat_action(
|
||||
self, chat_id: int, action: Action, progress: int = None
|
||||
self, chat_id: int, action: ChatAction, progress: int = None
|
||||
) -> AsyncResult:
|
||||
data = {
|
||||
"@type": "sendChatAction",
|
||||
|
|
34
tg/utils.py
34
tg/utils.py
|
@ -216,3 +216,37 @@ class suspend:
|
|||
|
||||
def set_shorter_esc_delay(delay=25):
|
||||
os.environ.setdefault("ESCDELAY", str(delay))
|
||||
|
||||
|
||||
def pretty_ts(ts):
|
||||
now = datetime.utcnow()
|
||||
diff = now - datetime.utcfromtimestamp(ts)
|
||||
second_diff = diff.seconds
|
||||
day_diff = diff.days
|
||||
log.info("diff:: %s, %s, %s", ts, second_diff, day_diff)
|
||||
|
||||
if day_diff < 0:
|
||||
return ""
|
||||
|
||||
if day_diff == 0:
|
||||
if second_diff < 10:
|
||||
return "just now"
|
||||
if second_diff < 60:
|
||||
return f"{second_diff} seconds ago"
|
||||
if second_diff < 120:
|
||||
return "a minute ago"
|
||||
if second_diff < 3600:
|
||||
return f"{int(second_diff / 60)} minutes ago"
|
||||
if second_diff < 7200:
|
||||
return "an hour ago"
|
||||
if second_diff < 86400:
|
||||
return f"{int(second_diff / 3600)} hours ago"
|
||||
if day_diff == 1:
|
||||
return "Yesterday"
|
||||
if day_diff < 7:
|
||||
return f"{day_diff} days ago"
|
||||
if day_diff < 31:
|
||||
return f"{int(day_diff / 7)} weeks ago"
|
||||
if day_diff < 365:
|
||||
return f"{int(day_diff / 30)} months ago"
|
||||
return f"{int(day_diff / 365)} years ago"
|
||||
|
|
98
tg/views.py
98
tg/views.py
|
@ -5,9 +5,19 @@ from datetime import datetime
|
|||
from typing import Any, Dict, List, Optional, Tuple, cast
|
||||
|
||||
from tg import config
|
||||
from tg.colors import blue, cyan, get_color, magenta, reverse, white, yellow
|
||||
from tg.models import Model, MsgModel, UserModel
|
||||
from tg.colors import (
|
||||
blue,
|
||||
bold,
|
||||
cyan,
|
||||
get_color,
|
||||
magenta,
|
||||
reverse,
|
||||
white,
|
||||
yellow,
|
||||
)
|
||||
from tg.models import Model
|
||||
from tg.msg import MsgProxy
|
||||
from tg.tdlib import ChatType
|
||||
from tg.utils import emoji_pattern, num, truncate_to_len
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
@ -166,9 +176,11 @@ class ChatView:
|
|||
) -> None:
|
||||
self.win.erase()
|
||||
line = curses.ACS_VLINE # type: ignore
|
||||
self.win.vline(0, self.w - 1, line, self.h)
|
||||
|
||||
self.win.addstr(0, 0, title.center(self.w - 1), get_color(cyan, -1))
|
||||
width = self.w - 1
|
||||
self.win.vline(0, width, line, self.h)
|
||||
self.win.addstr(
|
||||
0, 0, title.center(width)[:width], get_color(cyan, -1) | bold
|
||||
)
|
||||
|
||||
for i, chat in enumerate(chats, 1):
|
||||
is_selected = i == current + 1
|
||||
|
@ -182,7 +194,7 @@ class ChatView:
|
|||
self.win.addstr(
|
||||
i,
|
||||
offset,
|
||||
truncate_to_len(elem, max(0, self.w - offset - 1)),
|
||||
truncate_to_len(elem, max(0, width - offset)),
|
||||
attr,
|
||||
)
|
||||
offset += len(elem) + sum(
|
||||
|
@ -202,8 +214,8 @@ class ChatView:
|
|||
)
|
||||
self.win.addstr(
|
||||
i,
|
||||
self.w - flags_len - 1,
|
||||
flags,
|
||||
max(0, width - flags_len),
|
||||
flags[-width:],
|
||||
self._unread_color(is_selected),
|
||||
)
|
||||
|
||||
|
@ -463,44 +475,48 @@ class MsgView:
|
|||
self.win.addstr(line_num, column, elem, attr)
|
||||
column += len(elem)
|
||||
|
||||
self.win.addstr(0, 0, self._msg_title(chat), get_color(cyan, -1))
|
||||
self.win.addstr(
|
||||
0, 0, self._msg_title(chat), get_color(cyan, -1) | bold
|
||||
)
|
||||
self._refresh()
|
||||
|
||||
def _get_chat_type(self, chat: Dict[str, Any]) -> Optional[ChatType]:
|
||||
try:
|
||||
chat_type = ChatType[chat["type"]["@type"]]
|
||||
if (
|
||||
chat_type == ChatType.chatTypeSupergroup
|
||||
and chat["type"]["is_channel"]
|
||||
):
|
||||
chat_type = ChatType.channel
|
||||
return chat_type
|
||||
except KeyError:
|
||||
log.error(f"ChatType {chat['type']} not implemented")
|
||||
return None
|
||||
|
||||
def _msg_title(self, chat: Dict[str, Any]):
|
||||
chat_type = chat["type"]["@type"]
|
||||
info = ""
|
||||
_type = "unknown"
|
||||
if chat_type == "chatTypePrivate":
|
||||
_type = "private"
|
||||
info = self.model.users.get_status(chat["id"]) or ""
|
||||
elif chat_type == "chatTypeBasicGroup":
|
||||
_type = "group"
|
||||
group = self.model.users.get_group_info(
|
||||
chat["type"]["basic_group_id"]
|
||||
)
|
||||
log.info(f"group:: {group}")
|
||||
if group:
|
||||
info = f"{group['member_count']} members"
|
||||
elif chat_type == "chatTypeSupergroup":
|
||||
if chat["type"]["is_channel"]:
|
||||
_type = "channel"
|
||||
else:
|
||||
_type = "supergroup"
|
||||
supergroup = self.model.users.get_supergroup_info(
|
||||
chat["type"]["supergroup_id"]
|
||||
)
|
||||
log.info(f"supergroup:: {supergroup}")
|
||||
if supergroup:
|
||||
info = f"{supergroup['member_count']} members"
|
||||
|
||||
elif chat_type == "chatTypeSecret":
|
||||
_type = "secret"
|
||||
|
||||
# return f" {chat['title']} [{_type}] {info}".center(self.w)
|
||||
chat_type = self._get_chat_type(chat)
|
||||
status = ""
|
||||
if action := self.model.users.get_action(chat["id"]):
|
||||
info = action
|
||||
status = action
|
||||
elif chat_type == ChatType.chatTypePrivate:
|
||||
status = self.model.users.get_status(chat["id"])
|
||||
elif chat_type == ChatType.chatTypeBasicGroup:
|
||||
if group := self.model.users.get_group_info(
|
||||
chat["type"]["basic_group_id"]
|
||||
):
|
||||
status = f"{group['member_count']} members"
|
||||
elif chat_type == ChatType.chatTypeSupergroup:
|
||||
if supergroup := self.model.users.get_supergroup_info(
|
||||
chat["type"]["supergroup_id"]
|
||||
):
|
||||
status = f"{supergroup['member_count']} members"
|
||||
elif chat_type == ChatType.channel:
|
||||
if supergroup := self.model.users.get_supergroup_info(
|
||||
chat["type"]["supergroup_id"]
|
||||
):
|
||||
status = f"{supergroup['member_count']} subscribers"
|
||||
|
||||
return f" {chat['title']}: {info}".center(self.w)
|
||||
return f"{chat['title']}: {status}".center(self.w)[: self.w]
|
||||
|
||||
def _msg_attributes(self, is_selected: bool) -> Tuple[int, ...]:
|
||||
attrs = (
|
||||
|
|
Loading…
Add table
Reference in a new issue