mirror of
https://github.com/paul-nameless/tg
synced 2024-11-26 05:40:17 +00:00
Merge pull request #30 from paul-nameless/load-older-msg-in-the-chat
Add msg scrolling
This commit is contained in:
commit
f7b25d2a67
3 changed files with 113 additions and 44 deletions
|
@ -23,6 +23,13 @@ from tg.views import View
|
|||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# start scrolling to next page when number of the msgs left is less than value.
|
||||
# note, that setting high values could lead to situations when long msgs will
|
||||
# be removed from the display in order to achive scroll threshold. this could
|
||||
# cause blan areas on the msg display screen
|
||||
MSGS_LEFT_SCROLL_THRESHOLD = 2
|
||||
|
||||
|
||||
class Controller:
|
||||
"""
|
||||
# MVC
|
||||
|
@ -76,7 +83,7 @@ class Controller:
|
|||
if file_id:
|
||||
self.download(file_id, msg["chat_id"], msg["id"])
|
||||
|
||||
def download(self, file_id: int, chat_id, msg_id):
|
||||
def download(self, file_id: int, chat_id: int, msg_id: int):
|
||||
log.info("Downloading file: file_id=%s", file_id)
|
||||
self.model.downloads[file_id] = (chat_id, msg_id)
|
||||
self.tg.download_file(file_id=file_id)
|
||||
|
@ -246,8 +253,15 @@ class Controller:
|
|||
|
||||
def refresh_msgs(self) -> None:
|
||||
self.view.msgs.users = self.model.users
|
||||
msgs = self.model.fetch_msgs(limit=self.view.msgs.h)
|
||||
self.view.msgs.draw(self.model.get_current_chat_msg(), msgs)
|
||||
current_msg_idx = self.model.get_current_chat_msg_idx()
|
||||
if current_msg_idx is None:
|
||||
return
|
||||
msgs = self.model.fetch_msgs(
|
||||
current_position=current_msg_idx,
|
||||
page_size=self.view.msgs.h,
|
||||
msgs_left_scroll_threshold=MSGS_LEFT_SCROLL_THRESHOLD,
|
||||
)
|
||||
self.view.msgs.draw(current_msg_idx, msgs, MSGS_LEFT_SCROLL_THRESHOLD)
|
||||
|
||||
@handle_exception
|
||||
def update_new_msg(self, update):
|
||||
|
@ -265,7 +279,6 @@ class Controller:
|
|||
chat = None
|
||||
for chat in self.model.chats.chats:
|
||||
if chat_id == chat["id"]:
|
||||
chat = chat
|
||||
break
|
||||
|
||||
if (
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
import logging
|
||||
from collections import defaultdict
|
||||
from typing import Any, Dict, List, Optional, Set, Union
|
||||
from typing import Any, Dict, List, Optional, Set, Union, Tuple
|
||||
|
||||
from telegram.client import Telegram
|
||||
|
||||
|
@ -15,7 +15,7 @@ class Model:
|
|||
self.msgs = MsgModel(tg)
|
||||
self.users = UserModel(tg)
|
||||
self.current_chat = 0
|
||||
self.downloads = {}
|
||||
self.downloads: Dict[int, Tuple[int, int]] = {}
|
||||
|
||||
def get_me(self):
|
||||
return self.users.get_me()
|
||||
|
@ -23,16 +23,25 @@ class Model:
|
|||
def get_user(self, user_id):
|
||||
return self.users.get_user(user_id)
|
||||
|
||||
def get_current_chat_msg(self) -> Optional[int]:
|
||||
def get_current_chat_msg_idx(self) -> Optional[int]:
|
||||
chat_id = self.chats.id_by_index(self.current_chat)
|
||||
if chat_id is None:
|
||||
return None
|
||||
return self.msgs.current_msgs[chat_id]
|
||||
|
||||
def fetch_msgs(self, offset: int = 0, limit: int = 10) -> Any:
|
||||
def fetch_msgs(
|
||||
self,
|
||||
current_position: int = 0,
|
||||
page_size: int = 10,
|
||||
msgs_left_scroll_threshold: int = 10,
|
||||
) -> List[Tuple[int, Dict[str, Any]]]:
|
||||
chat_id = self.chats.id_by_index(self.current_chat)
|
||||
if chat_id is None:
|
||||
return []
|
||||
msgs_left = page_size - 1 - current_position
|
||||
offset = max(msgs_left_scroll_threshold - msgs_left, 0)
|
||||
|
||||
limit = offset + page_size
|
||||
return self.msgs.fetch_msgs(chat_id, offset=offset, limit=limit)
|
||||
|
||||
def current_msg(self):
|
||||
|
@ -205,6 +214,7 @@ class MsgModel:
|
|||
if new_idx < len(self.msgs[chat_id]):
|
||||
self.current_msgs[chat_id] = new_idx
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def remove_message(self, chat_id, msg_id):
|
||||
|
@ -277,12 +287,18 @@ class MsgModel:
|
|||
|
||||
def fetch_msgs(
|
||||
self, chat_id: int, offset: int = 0, limit: int = 10
|
||||
) -> Any:
|
||||
) -> List[Tuple[int, Dict[str, Any]]]:
|
||||
if offset + limit > len(self.msgs[chat_id]):
|
||||
messages = self._fetch_msgs_until_limit(chat_id, offset, limit)
|
||||
messages = self._fetch_msgs_until_limit(
|
||||
chat_id, offset, offset + limit
|
||||
)
|
||||
self.add_messages(chat_id, messages)
|
||||
|
||||
return self.msgs[chat_id][offset:limit]
|
||||
return [
|
||||
(i, self.msgs[chat_id][i])
|
||||
for i in range(offset, offset + limit)
|
||||
if i < len(self.msgs[chat_id])
|
||||
]
|
||||
|
||||
def send_message(self, chat_id: int, text: str) -> None:
|
||||
log.info("Sending msg")
|
||||
|
|
|
@ -3,7 +3,7 @@ import logging
|
|||
import re
|
||||
from _curses import window
|
||||
from datetime import datetime
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
from typing import Any, Dict, List, Optional, Tuple, Iterator
|
||||
|
||||
from tg.colors import blue, cyan, get_color, magenta, reverse, white
|
||||
from tg.msg import MsgProxy
|
||||
|
@ -12,14 +12,7 @@ from tg.utils import num
|
|||
log = logging.getLogger(__name__)
|
||||
|
||||
MAX_KEYBINDING_LENGTH = 5
|
||||
MULTICHAR_KEYBINDINGS = (
|
||||
"gg",
|
||||
"dd",
|
||||
"sd",
|
||||
"sp",
|
||||
"sa",
|
||||
"sv",
|
||||
)
|
||||
MULTICHAR_KEYBINDINGS = ("gg", "dd", "sd", "sp", "sa", "sv")
|
||||
|
||||
|
||||
class View:
|
||||
|
@ -195,34 +188,72 @@ class MsgView:
|
|||
self.win.resize(self.h, self.w)
|
||||
self.win.mvwin(0, self.x)
|
||||
|
||||
def draw(self, current: int, msgs: Any) -> None:
|
||||
self.win.erase()
|
||||
line_num = self.h
|
||||
|
||||
for i, msg in enumerate(msgs):
|
||||
dt, user_id, msg = self._parse_msg(msg)
|
||||
user_id = self._get_user_by_id(user_id)
|
||||
msg = msg.replace("\n", " ")
|
||||
# count wide character utf-8 symbols that take > 1 bytes to print
|
||||
# it causes invalid offset
|
||||
wide_char_len = sum(map(len, emoji_pattern.findall(msg)))
|
||||
elements = (" {} ".format(dt), user_id, " " + msg)
|
||||
total_len = sum(len(e) for e in elements) + wide_char_len
|
||||
needed_lines = (total_len // self.w) + 1
|
||||
line_num -= needed_lines
|
||||
if line_num <= 0:
|
||||
def _collect_msgs_to_draw(
|
||||
self,
|
||||
current_msg_idx: int,
|
||||
msgs: List[Tuple[int, Dict[str, Any]]],
|
||||
min_msg_padding: int,
|
||||
) -> List[Tuple[Tuple[str, ...], bool, int]]:
|
||||
"""
|
||||
Tries to collect list of messages that will satisfy `min_msg_padding`
|
||||
theshold. Long messages could prevent other messages from displaying on
|
||||
the screen. In order to prevent scenario when *selected* message moved
|
||||
out from the visible area of the screen by some long messages, this
|
||||
function will remove message one by one from the start until selected
|
||||
message could be visible on the screen.
|
||||
"""
|
||||
selected_item_idx = None
|
||||
collected_items: List[Tuple[Tuple[str, ...], bool, int]] = []
|
||||
for ignore_before in range(len(msgs)):
|
||||
if selected_item_idx is not None:
|
||||
break
|
||||
collected_items = []
|
||||
line_num = self.h
|
||||
for msg_idx, msg_item in msgs[ignore_before:]:
|
||||
is_selected_msg = current_msg_idx == msg_idx
|
||||
dt, user_id_item, msg = self._parse_msg(msg_item)
|
||||
user_id = self._get_user_by_id(user_id_item)
|
||||
msg = msg.replace("\n", " ")
|
||||
# count wide character utf-8 symbols that take > 1 bytes to
|
||||
# print it causes invalid offset
|
||||
wide_char_len = sum(map(len, emoji_pattern.findall(msg)))
|
||||
elements = tuple(map(lambda x: f" {x}", (dt, user_id, msg)))
|
||||
total_len = sum(len(e) for e in elements) + wide_char_len
|
||||
|
||||
attrs = [
|
||||
get_color(cyan, -1),
|
||||
get_color(blue, -1),
|
||||
get_color(white, -1),
|
||||
]
|
||||
if i == current:
|
||||
attrs = [attr | reverse for attr in attrs]
|
||||
needed_lines = (total_len // self.w) + 1
|
||||
line_num -= needed_lines
|
||||
if line_num <= 0:
|
||||
break
|
||||
collected_items.append((elements, is_selected_msg, line_num))
|
||||
if is_selected_msg:
|
||||
selected_item_idx = len(collected_items) - 1
|
||||
if (
|
||||
# ignore first and last msg
|
||||
selected_item_idx not in (0, len(msgs) - 1, None)
|
||||
and len(collected_items) - 1 - selected_item_idx
|
||||
< min_msg_padding
|
||||
):
|
||||
selected_item_idx = None
|
||||
|
||||
return collected_items
|
||||
|
||||
def draw(
|
||||
self,
|
||||
current_msg_idx: int,
|
||||
msgs: List[Tuple[int, Dict[str, Any]]],
|
||||
min_msg_padding: int,
|
||||
) -> None:
|
||||
self.win.erase()
|
||||
msgs_to_draw = self._collect_msgs_to_draw(
|
||||
current_msg_idx, msgs, min_msg_padding
|
||||
)
|
||||
|
||||
if not msgs_to_draw:
|
||||
log.error("Can't collect message for drawing!")
|
||||
|
||||
for elements, selected, line_num in msgs_to_draw:
|
||||
column = 0
|
||||
for attr, elem in zip(attrs, elements):
|
||||
for attr, elem in zip(self._msg_attribures(selected), elements):
|
||||
if not elem:
|
||||
continue
|
||||
self.win.addstr(line_num, column, elem, attr)
|
||||
|
@ -230,6 +261,15 @@ class MsgView:
|
|||
|
||||
self._refresh()
|
||||
|
||||
@staticmethod
|
||||
def _msg_attribures(is_selected: bool) -> Iterator[int]:
|
||||
attrs = [
|
||||
get_color(cyan, -1),
|
||||
get_color(blue, -1),
|
||||
get_color(white, -1),
|
||||
]
|
||||
return map(lambda x: x | reverse if is_selected else 0, attrs)
|
||||
|
||||
def _get_user_by_id(self, user_id: int) -> str:
|
||||
if user_id == 0:
|
||||
return ""
|
||||
|
|
Loading…
Reference in a new issue