Reorganize structure of the project

Release package on PyPi
This commit is contained in:
Paul Nameless 2019-02-20 19:15:43 +01:00
parent ac72c5bcb0
commit cf3f047fe8
16 changed files with 211 additions and 467 deletions

0
.gitignore vendored Normal file
View file

3
README.md Normal file
View file

@ -0,0 +1,3 @@
# Tg
TUI client for telegram

0
UNLICENSE Normal file
View file

16
pyproject.toml Normal file
View file

@ -0,0 +1,16 @@
[tool.flit.metadata]
module = "tg"
dist-name = "tg"
description-file = "README.md"
author = "Paul Nameless"
author-email = "paul.nameless@icloud.com"
home-page = 'https://github.com/paul-nameless/tg'
classifiers = [
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.7",
]
requires-python = ">=3.7"
requires = ['python-telegram']
[tool.flit.scripts]
j = "j.main:main"

415
tg.py
View file

@ -1,415 +0,0 @@
import curses
import logging
import logging.handlers
import math
import os
import re
import threading
from collections import defaultdict
from curses import wrapper
from datetime import datetime
from telegram.client import Telegram
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s %(message)s',
handlers=[
logging.handlers.RotatingFileHandler(
'./debug.log',
backupCount=1,
maxBytes=1024*256
),
]
)
logger = logging.getLogger(__name__)
API_ID = os.getenv('API_ID')
API_HASH = os.getenv('API_HASH')
PHONE = os.getenv('PHONE')
if PHONE is None:
raise Exception('Environment variables did not provided')
def get_last_msg(chat):
content = chat['last_message']['content']
_type = content['@type']
if _type == 'messageText':
return content['text']['text']
elif _type == 'messageVoiceNote':
return '[voice msg]'
else:
logger.error(chat)
return f'[unknown type {_type}]'
def get_date(chat):
# return str(datetime.fromtimestamp(chat['last_message']['date']))
dt = datetime.fromtimestamp(chat['last_message']['date'])
if datetime.today().date() == dt.date():
return dt.strftime("%H:%M")
return dt.strftime("%d/%b/%y")
def parse_content(content):
_type = content['@type']
if _type == 'messageText':
return content['text']['text']
elif _type == 'messageVoiceNote':
return '[voice msg]'
else:
logger.debug('Unknown content: %s', content)
return f'[unknown type {_type}]'
class ChatModel:
def __init__(self, tg):
self.tg = tg
self.chats = [] # Dict[int, list]
self.chat_ids = []
def get_chats(self, offset=0, limit=10):
if offset + limit < len(self.chats):
# return data from cache
return self.chats[offset:limit]
self.get_chat_ids(
offset=len(self.chats),
limit=len(self.chats) + limit
)
for i in range(3):
for chat_id in self.chat_ids:
chat = self.get_chat(chat_id)
self.chats.append(chat)
logger.debug(
'#### %s: %s, %s', chat_id, chat, i)
if len(self.chats) >= offset + limit:
break
return self.chats[offset:limit]
def get_chat_ids(self, offset=0, limit=10):
for i in range(3):
if len(self.chats):
result = self.tg.get_chats(
offset_chat_id=self.chats[-1]['id'],
limit=limit
)
else:
result = self.tg.get_chats(
offset_order=2 ** 63 - 1,
offset_chat_id=offset,
limit=limit
)
result.wait()
if result.error:
logger.error(f'get chat ids error: {result.error_info}')
return {}
for chat_id in result.update['chat_ids']:
self.chat_ids.append(chat_id)
if len(self.chat_ids) >= offset + limit:
break
return self.chat_ids[offset:limit]
def get_chat(self, chat_id):
result = self.tg.get_chat(chat_id)
result.wait()
if result.error:
logger.error(f'get chat error: {result.error_info}')
return {}
return result.update
class MsgModel:
def __init__(self, tg):
self.tg = tg
self.msgs = defaultdict(list) # Dict[int, list]
def get_msgs(self, chat_id, offset=0, limit=10):
if offset + limit < len(self.msgs[chat_id]):
return sorted(self.msgs[chat_id], key=lambda d: d['id'])[::-1][offset:limit][::-1]
for i in range(3):
if len(self.msgs[chat_id]):
result = self.tg.get_chat_history(
chat_id,
from_message_id=self.msgs[chat_id][-1]['id'],
limit=len(self.msgs[chat_id]) + limit
)
else:
result = self.tg.get_chat_history(
chat_id,
offset=len(self.msgs[chat_id]),
limit=len(self.msgs[chat_id]) + limit
)
result.wait()
for msg in result.update['messages']:
self.msgs[chat_id].append(msg)
if len(self.msgs[chat_id]) >= offset + limit:
break
return sorted(self.msgs[chat_id], key=lambda d: d['id'])[::-1][offset:limit][::-1]
class UserModel:
def __init__(self, tg):
self.tg = tg
class Model:
def __init__(self, tg):
self.chats = ChatModel(tg)
self.msgs = MsgModel(tg)
self.users = UserModel(tg)
self.current_chat = 0
def get_current_chat_id(self):
return self.chats.chat_ids[self.current_chat]
def next_chat(self):
if self.current_chat < len(self.chats.chats):
self.current_chat += 1
return True
return False
def prev_chat(self):
if self.current_chat > 0:
self.current_chat -= 1
return True
return False
def get_chats(self, offset=0, limit=10):
return self.chats.get_chats(offset=offset, limit=limit)
def get_current_msgs(self, offset=0, limit=10):
chat_id = self.chats.chat_ids[self.current_chat]
return self.msgs.get_msgs(
chat_id, offset=offset, limit=limit
)
def send_msg(self, chat_id, msg):
result = self.users.tg.send_message(
chat_id=chat_id,
text=msg,
)
result.wait()
if result.error:
logger.info(f'send message error: {result.error_info}')
else:
logger.info(f'message has been sent: {result.update}')
class View:
def __init__(self, stdscr):
curses.start_color()
curses.echo()
self.stdscr = stdscr
self.chats = ChatView(stdscr)
self.msgs = MsgView(stdscr)
self.max_read = 2048
def draw_chats(self, current, chats):
self.chats.draw(current, chats)
def draw_msgs(self, msgs):
self.msgs.draw(msgs)
def get_key(self):
# return self.stdscr.getkey()
_input = self.stdscr.getstr(
self.msgs.h, self.chats.w, self.max_read).decode()
# self.stdscr.addstr(self.msgs.h, self.chats.w, ' ' * self.msgs.w-10)
# self.chats.win.addstr(self.msgs.h, self.chats.w +
# 5, ' ' * self.msgs.w-10)
return _input
emoji_pattern = re.compile(
"["
"\U0001F600-\U0001F64F" # emoticons
"\U0001F300-\U0001F5FF" # symbols & pictographs
"\U0001F680-\U0001F6FF" # transport & map symbols
"\U0001F1E0-\U0001F1FF" # flags (iOS)
"]+",
flags=re.UNICODE
)
class ChatView:
def __init__(self, stdscr):
self.h = curses.LINES - 1
self.w = int((curses.COLS - 1) * 0.25)
self.win = stdscr.subwin(self.h, self.w, 0, 0)
def draw(self, current, chats):
self.win.clear()
self.win.vline(0, self.w-1, curses.ACS_VLINE, self.h)
for i, chat in enumerate(chats):
msg = f'{i:>2} {get_date(chat)} {chat["title"]} {chat["unread_count"]}: {get_last_msg(chat)}'
msg = emoji_pattern.sub(r'', msg)[:self.w-1]
# msg = msg.encode('utf-8').decode('ascii', 'ignore')[:self.w-1]
if i == current:
self.win.addstr(i, 0, msg, curses.color_pair(1))
continue
self.win.addstr(i, 0, msg)
self.win.refresh()
class MsgView:
def __init__(self, stdscr):
self.h = curses.LINES - 1
self.w = curses.COLS - int((curses.COLS - 1) * 0.25)
self.s = curses.COLS - self.w
self.win = stdscr.subwin(self.h, self.w, 0, self.s)
self.lines = 0
def draw(self, msgs):
self.win.clear()
count = 0
for msg in msgs:
logger.debug('##########: %s', msg)
s = self._parse_msg(msg)
s = s.replace('\n', ' ')
offset = math.ceil(len(s) / self.w)
if count + offset > self.h-1:
logger.warning('Reched end of lines')
break
self.win.addstr(count, 0, s)
count += offset
self.lines = count
self.win.refresh()
def _parse_msg(self, msg):
dt = datetime.fromtimestamp(
msg['date']).strftime("%H:%M:%S")
_type = msg['@type']
if _type == 'message':
return "{} {}: {}".format(
dt,
msg['sender_user_id'],
parse_content(msg['content'])
)
logger.debug('Unknown message type: %s', msg)
return 'unknown msg type: ' + str(msg['content'])
class Controller:
"""
# MVC
# Model is data from telegram
# Controller handles keyboad events
# View is terminal vindow
"""
def __init__(self, model, view):
self.model = model
self.view = view
self.lock = threading.Lock()
def init(self):
self.view.draw_chats(
self.model.current_chat,
self.model.get_chats()
)
msgs = self.model.get_current_msgs()
self.view.draw_msgs(msgs)
def run(self):
while True:
key = self.view.get_key()
logger.info('Pressed key: %s', key)
if key == '/q':
return
elif key == '/j':
is_changed = self.model.next_chat()
logger.info('Is changed: %s', is_changed)
if is_changed:
self.view.draw_chats(
self.model.current_chat,
self.model.get_chats()
)
msgs = self.model.get_current_msgs()
self.view.draw_msgs(msgs)
elif key == '/k':
is_changed = self.model.prev_chat()
if is_changed:
self.view.draw_chats(
self.model.current_chat,
self.model.get_chats()
)
msgs = self.model.get_current_msgs()
self.view.draw_msgs(msgs)
elif not key.startswith('/'):
chat_id = self.model.get_current_chat_id()
self.model.send_msg(chat_id, key)
self.view.draw_chats(
self.model.current_chat,
self.model.get_chats()
)
msgs = self.model.get_current_msgs()
self.view.draw_msgs(msgs)
def update_handler(self, update):
logger.debug('===============Received: %s', update)
_type = update['@type']
if _type == 'updateNewMessage':
logger.debug('Updating... new message')
# with self.lock:
chat_id = update['message']['chat_id']
self.model.msgs.msgs[chat_id].append(update['message'])
msgs = self.model.get_current_msgs()
self.view.draw_msgs(msgs)
# message_content = update['message']['content'].get('text', {})
# we need this because of different message types: photos, files, etc.
# message_text = message_content.get('text', '').lower()
# if message_text == 'ping':
# chat_id = update['message']['chat_id']
# # print(f'Ping has been received from {chat_id}')
# self.tg.send_message(
# chat_id=chat_id,
# text='pong',
# )
def main(stdscr):
logger.debug('#' * 64)
tg = Telegram(
api_id=API_ID,
api_hash=API_HASH,
phone=PHONE,
database_encryption_key='changeme1234',
)
tg.login()
view = View(stdscr)
model = Model(tg)
controller = Controller(model, view)
controller.tg = tg
controller.init()
tg.add_message_handler(controller.update_handler)
t = threading.Thread(
target=controller.run,
)
t.start()
t.join()
if __name__ == '__main__':
wrapper(main)

1
tg/.#main.py Symbolic link
View file

@ -0,0 +1 @@
paul@arya.local.36550

Binary file not shown.

162
tg/controllers/__init__.py Normal file
View file

@ -0,0 +1,162 @@
import logging
import os
import threading
from utils import notify
logger = logging.getLogger(__name__)
class Controller:
"""
# MVC
# Model is data from telegram
# Controller handles keyboad events
# View is terminal vindow
"""
def __init__(self, model, view):
self.model = model
self.view = view
self.lock = threading.Lock()
def run(self):
try:
self.handle_chats()
except Exception as e:
logger.exception('Error happened in main loop')
def handle_msgs(self):
# set width to 0.25, move window to left
# refresh everything
self.view.chats.resize(0.2)
self.view.msgs.resize(0.2)
self.refresh_chats()
while True:
key = self.view.get_key(self.view.chats.h, self.view.chats.w)
logger.info('Pressed key: %s', key)
if key == 'q':
return 'QUIT'
elif key == ']':
if self.model.next_chat():
self.refresh_chats()
elif key == '[':
if self.model.prev_chat():
self.refresh_chats()
elif key == 'J':
if self.model.jump_next_msg():
self.refresh_msgs()
elif key == 'K':
if self.model.jump_prev_msg():
self.refresh_msgs()
elif key in ('j', '^B'):
if self.model.next_msg():
self.refresh_msgs()
elif key in ('k', '^C'):
if self.model.prev_msg():
self.refresh_msgs()
elif key == 'G':
if self.model.jump_bottom():
self.refresh_msgs()
elif key == '/':
# search
pass
elif key == 'gg':
# move to the top
pass
elif key == 'e':
# edit msg
pass
elif key == 'r':
# reply to this msg
# print to status line
pass
elif key == 'I':
# open vim or emacs to write long messages
pass
elif key == 'i':
# write new message
msg = self.view.get_input()
if msg:
chat_id = self.model.get_current_chat_id()
self.model.msgs.tg.send_message(
chat_id=chat_id,
text=msg,
)
self.view.draw_status(f'Sent: {msg}')
elif key in ('h', '^D'):
return 'BACK'
def handle_chats(self):
# set width to 0.5, move window to center?
# refresh everything
self.view.chats.resize(0.5)
self.view.msgs.resize(0.5)
self.refresh_chats()
while True:
key = self.view.get_key(self.view.chats.h, self.view.chats.w)
logger.info('Pressed key: %s', key)
if key == 'q':
return
elif key in ('l', '^E'):
rc = self.handle_msgs()
if rc == 'QUIT':
return
self.view.chats.resize(0.5)
self.view.msgs.resize(0.5)
self.refresh_chats()
elif key in ('j', '^B'):
is_changed = self.model.next_chat()
if is_changed:
self.refresh_chats()
elif key in ('k', '^C'):
is_changed = self.model.prev_chat()
if is_changed:
self.refresh_chats()
def refresh_chats(self):
self.view.draw_chats(
self.model.current_chat,
self.model.get_chats(limit=self.view.chats.h)
)
self.refresh_msgs()
self.view.draw_status()
def refresh_msgs(self):
self.view.msgs.users = self.model.users
msgs = self.model.get_current_msgs(limit=self.view.msgs.h)
self.view.draw_msgs(self.model.get_current_msg(), msgs)
def update_handler(self, update):
logger.debug('===============Received: %s', update)
_type = update['@type']
if _type == 'updateNewMessage':
logger.debug('Updating... new message')
# with self.lock:
chat_id = update['message']['chat_id']
self.model.msgs.msgs[chat_id].append(update['message'])
# msgs = self.model.get_current_msgs()
self.refresh_msgs()
if not update['disable_notification']:
try:
notify(update['message']['content']['text']['text'])
except Exception:
logger.exception('Error happened on notify: %s', update)
# message_content = update['message']['content'].get('text', {})
# we need this because of different message types: photos, files, etc.
# message_text = message_content.get('text', '').lower()
# if message_text == 'ping':
# chat_id = update['message']['chat_id']
# # print(f'Ping has been received from {chat_id}')
# self.tg.send_message(
# chat_id=chat_id,
# text='pong',
# )

Binary file not shown.

View file

@ -7,9 +7,9 @@ from functools import partial
from telegram.client import Telegram
from controller import Controller
from model import Model
from view import View
from tg.controllers import Controller
from tg.models import Model
from tg.views import View
logging.basicConfig(
level=logging.DEBUG,

Binary file not shown.

BIN
tg/tg.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 42 KiB

View file

@ -158,14 +158,14 @@ class ChatView:
# msg += ' ' * (self.w - len(msg) - 1)
if i == current:
colors = [7, 8, 9, 10]
colors = [8, 9, 7, 10]
else:
colors = [1, 2, 3, 4]
colors = [2, 3, 1, 4]
offset = 0
j = 0
# for color, e in zip(colors, msg.split(' ', maxsplit=3)):
for color, e in zip(colors, ['', date, title]):
for color, e in zip(colors, [' ' + date, title]):
attr = curses.color_pair(color)
if offset > self.w:
break
@ -178,7 +178,7 @@ class ChatView:
if offset >= self.w:
continue
attr = curses.color_pair(colors[0])
attr = curses.color_pair(colors[-2])
msg = last_msg[:self.w-offset-1]
# msg = msg[:self.w-1]
@ -230,19 +230,23 @@ class MsgView:
# self.win.mvwin(0, self.x)
def draw(self, current, msgs):
logger.info('Dwaring msgs')
# logger.info('Dwaring msgs')
self.win.clear()
count = self.h
for i, msg in enumerate(msgs):
s = self._parse_msg(msg)
s = s.replace('\n', ' ')
if len(s) < self.w:
s += ' ' * (self.w - len(s) - 1)
offset = math.ceil(len(s) / self.w)
# s = self._parse_msg(msg)
dt, user_id, msg = self._parse_msg(msg)
user_id = self._get_user_by_id(user_id)
msg = msg.replace('\n', ' ')
s = ' '.join([' ' + dt, user_id, msg])
# s = s.replace('\n', ' ')
# if len(s) < self.w:
# s += ' ' * (self.w - len(s) - 1)
offset = math.ceil((len(s) - 1) / self.w)
count -= offset
if count <= 0:
logger.warning('Reched end of lines')
# logger.warning('Reched end of lines')
break
if i == current:
@ -257,61 +261,34 @@ class MsgView:
j += 1
if j < 4:
e = e + ' '
# logger.info('####: %s', (e, offset, count))
self.win.addstr(count, offset, e, attr)
offset += len(e)
# if i == current:
# offset = 0
# j = 0
# for i, e in zip([7, 8, 9, 7], s.split(' ', maxsplit=3)):
# logger.debug('####: %s | %s', i, e)
# attr = curses.color_pair(i)
# j += 1
# if j < 4:
# e = e + ' '
# self.win.addstr(count, offset, e, attr)
# offset += len(e)
# # attr = curses.A_REVERSE | curses.color_pair(6)
# # self.win.addstr(count, 0, s, attr)
# else:
# offset = 0
# j = 0
# for i, e in zip([1, 2, 4, 1], s.split(' ', maxsplit=3)):
# logger.debug('####: %s | %s', i, e)
# attr = curses.color_pair(i)
# j += 1
# if j < 4:
# e = e + ' '
# self.win.addstr(count, offset, e, attr)
# offset += len(e)
# # attr = curses.color_pair(0)
# # self.win.addstr(count, 0, s, attr)
self.win.refresh()
def _get_user_by_id(self, user_id):
if user_id == 0:
return ''
user = self.users.get_user(user_id)
if user["first_name"] and user["last_name"]:
return f'{user["first_name"]} {user["last_name"]}'[:20]
if user["first_name"]:
return f'{user["first_name"]}'[:20]
if user.get('username'):
return '@' + user['username']
if user["first_name"] and user["last_name"]:
return f'{user["first_name"]} {user["last_name"]}'
return f'{user["first_name"]}'
return 'Unknown?'
def _parse_msg(self, msg):
dt = datetime.fromtimestamp(
msg['date']).strftime("%H:%M:%S")
_type = msg['@type']
if _type == 'message':
user = self._get_user_by_id(msg['sender_user_id'])
return " {} {}: {}".format(
dt,
user,
parse_content(msg['content'])
)
return dt, msg['sender_user_id'], parse_content(msg['content'])
logger.debug('Unknown message type: %s', msg)
return ' unknown msg type: ' + str(msg['content'])
return dt, msg['sender_user_id'], 'unknown msg type: ' + str(msg['content'])
def get_last_msg(chat):
@ -341,7 +318,7 @@ def parse_content(content):
return '[voice msg]'
else:
logger.debug('Unknown content: %s', content)
return f'[unknown type {_type}]'
return f'[unknown content type {_type}]'
emoji_pattern = re.compile(

Binary file not shown.