Show list of contacts (#119)

* Get contacts

* Count min user name column length, sort by user name

* Fix black formatting

* Fix black formatting

* Order users in contacts by last seen status
This commit is contained in:
Nameless 2020-07-08 18:40:44 +08:00 committed by GitHub
parent f168fcf01d
commit 532fa0154a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 82 additions and 15 deletions

View file

@ -33,7 +33,7 @@ MAX_DOWNLOAD_SIZE = "10MB"
# TODO: check platform
NOTIFY_CMD = "/usr/local/bin/terminal-notifier -title {title} -subtitle {subtitle} -message {msg} -appIcon {icon_path}"
HELP_CMD = "less"
VIEW_TEXT_CMD = "less"
if _os_name == _linux:
# for more info see https://trac.ffmpeg.org/wiki/Capture/ALSA

View file

@ -12,7 +12,7 @@ from telegram.utils import AsyncResult
from tg import config
from tg.models import Model
from tg.msg import MsgProxy
from tg.tdlib import ChatAction, Tdlib
from tg.tdlib import ChatAction, Tdlib, UserStatus
from tg.utils import (
get_duration,
get_video_resolution,
@ -21,7 +21,7 @@ from tg.utils import (
notify,
suspend,
)
from tg.views import View
from tg.views import View, get_user_label
log = logging.getLogger(__name__)
@ -109,13 +109,13 @@ class Controller:
def show_chat_help(self) -> None:
_help = self.format_help(chat_handler)
with suspend(self.view) as s:
s.run_with_input(config.HELP_CMD, _help)
s.run_with_input(config.VIEW_TEXT_CMD, _help)
@bind(msg_handler, ["?"])
def show_msg_help(self) -> None:
_help = self.format_help(msg_handler)
with suspend(self.view) as s:
s.run_with_input(config.HELP_CMD, _help)
s.run_with_input(config.VIEW_TEXT_CMD, _help)
@bind(chat_handler, ["bp"])
@bind(msg_handler, ["bp"])
@ -453,6 +453,33 @@ class Controller:
self.model.edit_message(text=text)
self.present_info("Message edited")
@bind(chat_handler, ["c"])
def view_contacts(self) -> None:
contacts = self.model.users.get_contacts()
if contacts is None:
return self.present_error("Can't get contacts")
total = contacts["total_count"]
users = []
for user_id in contacts["user_ids"]:
user_name = get_user_label(self.model.users, user_id)
status = self.model.users.get_status(user_id)
order = self.model.users.get_user_status_order(user_id)
users.append((user_name, status, order))
_, cols = self.view.stdscr.getmaxyx()
limit = min(
int(cols / 2), max(len(user_name) for user_name, *_ in users)
)
users_out = "\n".join(
f"{user_name:<{limit}} | {status}"
for user_name, status, _ in sorted(users, key=lambda it: it[2])
)
with suspend(self.view) as s:
s.run_with_input(
config.VIEW_TEXT_CMD, f"{total} users:\n" + users_out
)
@bind(chat_handler, ["l", "^J", "^E"])
def handle_msgs(self) -> Optional[str]:
rc = self.handle(msg_handler, 0.2)

View file

@ -1,4 +1,5 @@
import logging
import sys
import time
from collections import defaultdict
from typing import Any, Dict, List, Optional, Set, Tuple
@ -27,9 +28,6 @@ class Model:
def is_me(self, user_id: int) -> bool:
return self.get_me()["id"] == user_id
def get_user(self, user_id: int) -> Dict:
return self.users.get_user(user_id)
@property
def current_chat_id(self) -> Optional[int]:
return self.chats.id_by_index(self.current_chat)
@ -464,6 +462,7 @@ class UserModel:
self.supergroups: Dict[int, Dict] = {}
self.actions: Dict[int, Dict] = {}
self.not_found: Set[int] = set()
self.contacts: Dict[str, Any] = {}
def get_me(self) -> Dict[str, Any]:
if self.me:
@ -519,6 +518,28 @@ class UserModel:
return f"last seen {ago}"
return f"last seen {status.value}"
def get_user_status_order(self, user_id: int) -> int:
if user_id not in self.users:
return sys.maxsize
user_status = self.users[user_id]["status"]
try:
status = UserStatus[user_status["@type"]]
except KeyError:
log.error(f"UserStatus type {user_status} not implemented")
return sys.maxsize
if status == UserStatus.userStatusOnline:
return 0
elif status == UserStatus.userStatusOffline:
was_online = user_status["was_online"]
return time.time() - was_online
order = {
UserStatus.userStatusRecently: 1,
UserStatus.userStatusLastWeek: 2,
UserStatus.userStatusLastMonth: 3,
}
return order.get(status, sys.maxsize)
def is_online(self, user_id: int) -> bool:
user = self.get_user(user_id)
if (
@ -557,3 +578,16 @@ class UserModel:
return self.supergroups[supergroup_id]
self.tg.get_supergroup(supergroup_id)
return None
def get_contacts(self) -> Optional[Dict[str, Any]]:
if self.contacts:
return self.contacts
result = self.tg.get_contacts()
result.wait()
if result.error:
log.error("get contacts error: %s", result.error_info)
return None
self.contacts = result.update
return self.contacts

View file

@ -257,6 +257,12 @@ class Tdlib(Telegram):
}
return self._send_data(data)
def get_contacts(self) -> AsyncResult:
data = {
"@type": "getContacts",
}
return self._send_data(data)
def get_chat_type(chat: Dict[str, Any]) -> Optional[ChatType]:
try:

View file

@ -252,7 +252,7 @@ def pretty_ts(ts: int) -> str:
if second_diff < 86400:
return f"{int(second_diff / 3600)} hours ago"
if day_diff == 1:
return "Yesterday"
return "yesterday"
if day_diff < 7:
return f"{day_diff} days ago"
if day_diff < 31:

View file

@ -224,7 +224,7 @@ class ChatView:
) -> Tuple[Optional[str], Optional[str]]:
user, last_msg = get_last_msg(chat)
if user:
last_msg_sender = _get_user_label(self.model.users, user)
last_msg_sender = get_user_label(self.model.users, user)
chat_type = get_chat_type(chat)
if chat_type and chat_type.is_group(chat_type):
return last_msg_sender, last_msg
@ -328,7 +328,7 @@ class MsgView:
return msg
reply_msg = MsgProxy(_msg)
if reply_msg_content := self._parse_msg(reply_msg):
reply_sender = _get_user_label(
reply_sender = get_user_label(
self.model.users, reply_msg.sender_id
)
sender_name = f" {reply_sender}:" if reply_sender else ""
@ -390,7 +390,7 @@ class MsgView:
dt = msg_proxy.date.strftime("%H:%M:%S")
user_id_item = msg_proxy.sender_id
user_id = _get_user_label(self.model.users, user_id_item)
user_id = get_user_label(self.model.users, user_id_item)
flags = self._get_flags(msg_proxy)
if user_id and flags:
# if not channel add space between name and flags
@ -617,7 +617,7 @@ def get_download(
return "no"
def _get_user_label(users: UserModel, user_id: int) -> str:
def get_user_label(users: UserModel, user_id: int) -> str:
if user_id == 0:
return ""
user = users.get_user(user_id)
@ -629,7 +629,7 @@ def _get_user_label(users: UserModel, user_id: int) -> str:
if user.get("username"):
return "@" + user["username"]
return "Unknown?"
return "<Unknown>"
def _get_action_label(users: UserModel, chat: Dict[str, Any]) -> Optional[str]:
@ -638,7 +638,7 @@ def _get_action_label(users: UserModel, chat: Dict[str, Any]) -> Optional[str]:
label = f"{action}..."
chat_type = get_chat_type(chat)
if chat_type and chat_type.is_group(chat_type):
user_label = _get_user_label(users, actioner)
user_label = get_user_label(users, actioner)
label = f"{user_label} {label}"
return label