Add indexes for msgs (#91)

* Draft: adding lock to message update and using indexes for msgs

* Fix black formatting

* Rename msg_ids->msg_idx to better represent what variable contains

* Fix black formatting

* Check if msg_id in not_found

* Use lock in udpate_handler

* Remove lock
This commit is contained in:
Nameless 2020-07-03 10:05:12 +08:00 committed by GitHub
parent 723bd5cfeb
commit eabe39d3c4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -1,7 +1,7 @@
import logging import logging
import time import time
from collections import defaultdict from collections import defaultdict
from typing import Any, Dict, List, Optional, Set, Tuple, Union from typing import Any, Dict, List, Optional, Set, Tuple
from tg.msg import MsgProxy from tg.msg import MsgProxy
from tg.tdlib import ChatAction, Tdlib, UserStatus from tg.tdlib import ChatAction, Tdlib, UserStatus
@ -290,7 +290,7 @@ class MsgModel:
self.msgs: Dict[int, List[Dict]] = defaultdict(list) self.msgs: Dict[int, List[Dict]] = defaultdict(list)
self.current_msgs: Dict[int, int] = defaultdict(int) self.current_msgs: Dict[int, int] = defaultdict(int)
self.not_found: Set[int] = set() self.not_found: Set[int] = set()
self.msg_ids: Dict[int, Set[int]] = defaultdict(set) self.msg_idx: Dict[int, Dict[int, int]] = defaultdict(dict)
def next_msg(self, chat_id: int, step: int = 1) -> bool: def next_msg(self, chat_id: int, step: int = 1) -> bool:
current_msg = self.current_msgs[chat_id] current_msg = self.current_msgs[chat_id]
@ -310,77 +310,57 @@ class MsgModel:
if new_idx < len(self.msgs[chat_id]): if new_idx < len(self.msgs[chat_id]):
self.current_msgs[chat_id] = new_idx self.current_msgs[chat_id] = new_idx
return True return True
return False return False
def get_message(self, chat_id: int, msg_id: int) -> Optional[Dict]: def get_message(self, chat_id: int, msg_id: int) -> Optional[Dict]:
if msg_id in self.not_found: if msg_id in self.not_found:
return None return None
msg_set = self.msg_ids[chat_id] if index := self.msg_idx[chat_id].get(msg_id):
if msg_id not in msg_set: return self.msgs[chat_id][index]
# we are not storing any out of ordres old msgs # we are not storing any out of ordres old msgs
# just fetching then on demand # just fetching them on demand
result = self.tg.get_message(chat_id, msg_id) result = self.tg.get_message(chat_id, msg_id)
result.wait() result.wait()
if result.error: if result.error:
self.not_found.add(msg_id) self.not_found.add(msg_id)
return None return None
return result.update return result.update
return next(iter(m for m in self.msgs[chat_id] if m["id"] == msg_id))
def remove_messages(self, chat_id: int, msg_ids: List[int]) -> None: def remove_messages(self, chat_id: int, msg_idx: List[int]) -> None:
log.info(f"removing msgs {msg_ids=}") log.info(f"removing msg {msg_idx=}")
# FIXME: potential bottleneck, replace with constan time operation
self.msgs[chat_id] = [ self.msgs[chat_id] = [
m for m in self.msgs[chat_id] if m["id"] not in msg_ids m for m in self.msgs[chat_id] if m["id"] not in msg_idx
] ]
msg_set = self.msg_ids[chat_id] self.msg_idx[chat_id] = {
for msg_id in msg_ids: msg["id"]: i for i, msg in enumerate(self.msgs[chat_id])
msg_set.discard(msg_id) }
def add_message(self, chat_id: int, msg: Dict[str, Any]) -> None:
log.info(f"adding {msg=}")
self.msgs[chat_id] = sorted(
self.msgs[chat_id] + [msg], key=lambda d: d["id"], reverse=True,
)
self.msg_idx[chat_id] = {
msg["id"]: i for i, msg in enumerate(self.msgs[chat_id])
}
def update_msg_content_opened(self, chat_id: int, msg_id: int) -> None: def update_msg_content_opened(self, chat_id: int, msg_id: int) -> None:
for message in self.msgs[chat_id]: index = self.msg_idx[chat_id][msg_id]
if message["id"] != msg_id: msg = MsgProxy(self.msgs[chat_id][index])
continue if msg.content_type == "voice":
msg = MsgProxy(message) msg.is_listened = True
if msg.content_type == "voice": elif msg.content_type == "recording":
msg.is_listened = True msg.is_viewed = True
elif msg.content_type == "recording": # TODO: start the TTL timer for self-destructing messages
msg.is_viewed = True # that is the last case to implement
# TODO: start the TTL timer for self-destructing messages # https://core.telegram.org/tdlib/docs/classtd_1_1td__api_1_1update_message_content_opened.html
# that is the last case to implement
# https://core.telegram.org/tdlib/docs/classtd_1_1td__api_1_1update_message_content_opened.html
return
def update_msg( def update_msg(
self, chat_id: int, msg_id: int, **fields: Dict[str, Any] self, chat_id: int, msg_id: int, **fields: Dict[str, Any]
) -> bool: ) -> None:
msg = None index = self.msg_idx[chat_id][msg_id]
for message in self.msgs[chat_id]: msg = self.msgs[chat_id][index]
if message["id"] == msg_id:
msg = message
break
if not msg:
return False
msg.update(fields) msg.update(fields)
return True
def add_message(self, chat_id: int, message: Dict[str, Any]) -> bool:
msg_id = message["id"]
msg_set = self.msg_ids[chat_id]
if msg_id in msg_set:
log.warning(
f"message {msg_id} was added earlier. probably, inaccurate "
"usage of the tdlib lead to unnecessary requests"
)
return False
log.info(f"adding {msg_id=} {message}")
self.msgs[chat_id].append(message)
msg_set.add(msg_id)
self.msgs[chat_id] = sorted(
self.msgs[chat_id], key=lambda d: d["id"], reverse=True
)
return True
def _fetch_msgs_until_limit( def _fetch_msgs_until_limit(
self, chat_id: int, offset: int = 0, limit: int = 10 self, chat_id: int, offset: int = 0, limit: int = 10
@ -422,10 +402,10 @@ class MsgModel:
self, chat_id: int, offset: int = 0, limit: int = 10 self, chat_id: int, offset: int = 0, limit: int = 10
) -> List[Tuple[int, Dict[str, Any]]]: ) -> List[Tuple[int, Dict[str, Any]]]:
if offset + limit > len(self.msgs[chat_id]): if offset + limit > len(self.msgs[chat_id]):
messages = self._fetch_msgs_until_limit( msgs = self._fetch_msgs_until_limit(
chat_id, offset, offset + limit chat_id, offset, offset + limit
) )
for msg in messages: for msg in msgs:
self.add_message(chat_id, msg) self.add_message(chat_id, msg)
return [ return [