mirror of
https://github.com/paul-nameless/tg
synced 2025-02-16 18:48:24 +00:00
add dynamic scrolling
This commit is contained in:
parent
7eaf10711a
commit
f61500df33
3 changed files with 91 additions and 45 deletions
|
@ -23,8 +23,11 @@ from tg.views import View
|
|||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# start scrolling to next page when number of the msgs left is less than value
|
||||
MSGS_LEFT_SCROLL_THRESHOLD = 10
|
||||
# start scrolling to next page when number of the msgs left is less than value.
|
||||
# note, that setting high values could lead to situations when long msgs will
|
||||
# be removed from the display in order to achive scroll threshold. this could
|
||||
# cause blan areas on the msg display screen
|
||||
MSGS_LEFT_SCROLL_THRESHOLD = 2
|
||||
|
||||
|
||||
class Controller:
|
||||
|
@ -253,14 +256,12 @@ class Controller:
|
|||
current_msg_idx = self.model.get_current_chat_msg_idx()
|
||||
if current_msg_idx is None:
|
||||
return
|
||||
page_size = self.view.msgs.h
|
||||
msgs = self.model.fetch_msgs(
|
||||
current_msg_idx, page_size, MSGS_LEFT_SCROLL_THRESHOLD
|
||||
current_position=current_msg_idx,
|
||||
page_size=self.view.msgs.h,
|
||||
msgs_left_scroll_threshold=MSGS_LEFT_SCROLL_THRESHOLD,
|
||||
)
|
||||
selected_msg = min(
|
||||
current_msg_idx, page_size - MSGS_LEFT_SCROLL_THRESHOLD
|
||||
)
|
||||
self.view.msgs.draw(selected_msg, msgs)
|
||||
self.view.msgs.draw(current_msg_idx, msgs, MSGS_LEFT_SCROLL_THRESHOLD)
|
||||
|
||||
@handle_exception
|
||||
def update_new_msg(self, update):
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
import logging
|
||||
from collections import defaultdict
|
||||
from typing import Any, Dict, List, Optional, Set, Union
|
||||
from typing import Any, Dict, List, Optional, Set, Union, Tuple
|
||||
|
||||
from telegram.client import Telegram
|
||||
|
||||
|
@ -34,12 +34,13 @@ class Model:
|
|||
current_position: int = 0,
|
||||
page_size: int = 10,
|
||||
msgs_left_scroll_threshold: int = 10,
|
||||
) -> Any:
|
||||
) -> List[Tuple[int, Dict[str, Any]]]:
|
||||
chat_id = self.chats.id_by_index(self.current_chat)
|
||||
if chat_id is None:
|
||||
return []
|
||||
msgs_left = page_size - current_position
|
||||
msgs_left = page_size - 1 - current_position
|
||||
offset = max(msgs_left_scroll_threshold - msgs_left, 0)
|
||||
|
||||
limit = offset + page_size
|
||||
return self.msgs.fetch_msgs(chat_id, offset=offset, limit=limit)
|
||||
|
||||
|
@ -213,6 +214,7 @@ class MsgModel:
|
|||
if new_idx < len(self.msgs[chat_id]):
|
||||
self.current_msgs[chat_id] = new_idx
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def remove_message(self, chat_id, msg_id):
|
||||
|
@ -285,14 +287,18 @@ class MsgModel:
|
|||
|
||||
def fetch_msgs(
|
||||
self, chat_id: int, offset: int = 0, limit: int = 10
|
||||
) -> Any:
|
||||
) -> List[Tuple[int, Dict[str, Any]]]:
|
||||
if offset + limit > len(self.msgs[chat_id]):
|
||||
messages = self._fetch_msgs_until_limit(
|
||||
chat_id, offset, offset + limit
|
||||
)
|
||||
self.add_messages(chat_id, messages)
|
||||
|
||||
return self.msgs[chat_id][offset : offset + limit]
|
||||
return [
|
||||
(i, self.msgs[chat_id][i])
|
||||
for i in range(offset, offset + limit)
|
||||
if i < len(self.msgs[chat_id])
|
||||
]
|
||||
|
||||
def send_message(self, chat_id: int, text: str) -> None:
|
||||
log.info("Sending msg")
|
||||
|
|
|
@ -3,7 +3,7 @@ import logging
|
|||
import re
|
||||
from _curses import window
|
||||
from datetime import datetime
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
from typing import Any, Dict, List, Optional, Tuple, Iterator
|
||||
|
||||
from tg.colors import blue, cyan, get_color, magenta, reverse, white
|
||||
from tg.msg import MsgProxy
|
||||
|
@ -188,41 +188,71 @@ class MsgView:
|
|||
self.win.resize(self.h, self.w)
|
||||
self.win.mvwin(0, self.x)
|
||||
|
||||
def _format_msg(self, msg: str, msg_item_line: List[str]) -> str:
|
||||
msg = msg.replace("\n", " ")
|
||||
# it causes invalid offset
|
||||
wide_char_len = sum(map(len, emoji_pattern.findall(msg)))
|
||||
total_len = sum(len(e) for e in msg_item_line) + wide_char_len
|
||||
line_space_left = self.w - total_len
|
||||
return (
|
||||
msg[: line_space_left - 4] + "..."
|
||||
if len(msg) > line_space_left
|
||||
else msg
|
||||
def _collect_msgs_to_draw(
|
||||
self,
|
||||
current_msg_idx: int,
|
||||
msgs: List[Tuple[int, Dict[str, Any]]],
|
||||
min_msg_padding: int,
|
||||
) -> List[Tuple[Tuple[str, ...], bool, int]]:
|
||||
"""
|
||||
Tries to collect list of messages that will satisfy `min_msg_padding`
|
||||
theshold. Long messages could prevent other messages from displaying on
|
||||
the screen. In order to prevent scenario when *selected* message moved
|
||||
out from the visible area of the screen by some long messages, this
|
||||
function will remove message one by one from the start until selected
|
||||
message could be visible on the screen.
|
||||
"""
|
||||
selected_item_idx = None
|
||||
for ignore_before in range(len(msgs)):
|
||||
if selected_item_idx is not None:
|
||||
break
|
||||
collected_items = []
|
||||
line_num = self.h
|
||||
for msg_idx, msg_item in msgs[ignore_before:]:
|
||||
is_selected_msg = current_msg_idx == msg_idx
|
||||
dt, user_id_item, msg = self._parse_msg(msg_item)
|
||||
user_id = self._get_user_by_id(user_id_item)
|
||||
msg = msg.replace("\n", " ")
|
||||
# count wide character utf-8 symbols that take > 1 bytes to
|
||||
# print it causes invalid offset
|
||||
wide_char_len = sum(map(len, emoji_pattern.findall(msg)))
|
||||
elements = tuple(map(lambda x: f" {x}", (dt, user_id, msg)))
|
||||
total_len = sum(len(e) for e in elements) + wide_char_len
|
||||
|
||||
needed_lines = (total_len // self.w) + 1
|
||||
line_num -= needed_lines
|
||||
if line_num <= 0:
|
||||
break
|
||||
collected_items.append((elements, is_selected_msg, line_num))
|
||||
if is_selected_msg:
|
||||
selected_item_idx = len(collected_items) - 1
|
||||
if (
|
||||
# ignore first and last msg
|
||||
selected_item_idx not in (0, len(msgs) - 1, None)
|
||||
and len(collected_items) - 1 - selected_item_idx
|
||||
< min_msg_padding
|
||||
):
|
||||
selected_item_idx = None
|
||||
|
||||
return collected_items
|
||||
|
||||
def draw(
|
||||
self,
|
||||
current_msg_idx: int,
|
||||
msgs: List[Tuple[int, Dict[str, Any]]],
|
||||
min_msg_padding: int,
|
||||
) -> None:
|
||||
self.win.erase()
|
||||
msgs_to_draw = self._collect_msgs_to_draw(
|
||||
current_msg_idx, msgs, min_msg_padding
|
||||
)
|
||||
|
||||
def draw(self, current: int, msgs: Any) -> None:
|
||||
self.win.erase()
|
||||
line_num = self.h
|
||||
|
||||
for i, msg in enumerate(msgs):
|
||||
dt, user_id, msg = self._parse_msg(msg)
|
||||
user = self._get_user_by_id(user_id)
|
||||
msg_item_line = [f" {dt} ", user]
|
||||
msg_item_line.append(self._format_msg(msg, msg_item_line))
|
||||
line_num -= 1
|
||||
if line_num < 0:
|
||||
break
|
||||
|
||||
attrs = [
|
||||
get_color(cyan, -1),
|
||||
get_color(blue, -1),
|
||||
get_color(white, -1),
|
||||
]
|
||||
if i == current:
|
||||
attrs = [attr | reverse for attr in attrs]
|
||||
if not msgs_to_draw:
|
||||
log.error("Can't collect message for drawing!")
|
||||
|
||||
for elements, selected, line_num in msgs_to_draw:
|
||||
column = 0
|
||||
for attr, elem in zip(attrs, msg_item_line):
|
||||
for attr, elem in zip(self._msg_attribures(selected), elements):
|
||||
if not elem:
|
||||
continue
|
||||
self.win.addstr(line_num, column, elem, attr)
|
||||
|
@ -230,6 +260,15 @@ class MsgView:
|
|||
|
||||
self._refresh()
|
||||
|
||||
@staticmethod
|
||||
def _msg_attribures(is_selected: bool) -> Iterator[int]:
|
||||
attrs = [
|
||||
get_color(cyan, -1),
|
||||
get_color(blue, -1),
|
||||
get_color(white, -1),
|
||||
]
|
||||
return map(lambda x: x | reverse if is_selected else 0, attrs)
|
||||
|
||||
def _get_user_by_id(self, user_id: int) -> str:
|
||||
if user_id == 0:
|
||||
return ""
|
||||
|
|
Loading…
Add table
Reference in a new issue