mirror of
https://github.com/fotoente/MIsskey-ebooks-bot
synced 2024-11-13 23:27:10 +00:00
68 lines
2.4 KiB
Python
Executable file
68 lines
2.4 KiB
Python
Executable file
import asyncio
|
|
import threading
|
|
|
|
from mipa.ext import commands, tasks
|
|
from mipa.router import Router
|
|
from mipac.models import Note
|
|
from mipac.util import check_multi_arg
|
|
|
|
import roboduck
|
|
|
|
# Load Misskey configuration
|
|
config = roboduck.configparser.ConfigParser()
|
|
config.read(roboduck.Path(__file__).parent.joinpath('bot.cfg'))
|
|
url = "https://" + config.get("misskey", "instance_write")
|
|
token = config.get("misskey", "token")
|
|
|
|
try:
|
|
contentwarning = config.get("misskey", "cw")
|
|
if contentwarning.lower() == "none":
|
|
contentwarning = None
|
|
except (TypeError, ValueError, roboduck.configparser.NoOptionError):
|
|
contentwarning = None
|
|
|
|
if not check_multi_arg(url, token):
|
|
raise Exception("Misskey instance and token are required.")
|
|
|
|
class MyBot(commands.Bot):
|
|
|
|
def __init__(self):
|
|
super().__init__()
|
|
|
|
@tasks.loop(3600)
|
|
async def loop_1h(self):
|
|
text = roboduck.create_sentence()
|
|
await bot.client.note.action.send(content=text, visibility="public", cw=contentwarning)
|
|
|
|
@tasks.loop(43200)
|
|
async def loop_12h(self):
|
|
thread_update = threading.Thread(target=roboduck.update())
|
|
thread_update.daemon = True
|
|
thread_update.start()
|
|
|
|
async def on_ready(self, ws):
|
|
await Router(ws).connect_channel(["global", "main"]) # Connect to global and main channels
|
|
await self.client.note.action.send(content=roboduck.datetime.now().strftime('%Y-%m-%d %H:%M:%S') + " Roboduck Bot started!",
|
|
visibility="specified")
|
|
self.loop_12h.start() # Launching renew posts every 12 hours
|
|
self.loop_1h.start() #
|
|
print(roboduck.datetime.now().strftime('%Y-%m-%d %H:%M:%S') + " Roboduck Bot started!")
|
|
|
|
async def on_mention(self, note: Note):
|
|
if not note.author.is_bot:
|
|
text = note.author.action.get_mention() + " "
|
|
text += roboduck.create_sentence()
|
|
|
|
await note.reply(content=text, cw=contentwarning) # Reply to a note
|
|
|
|
async def on_reconnect(self, ws):
|
|
await Router(ws).connect_channel(["global", "main"]) # Connect to global and main channels
|
|
|
|
if __name__ == "__main__":
|
|
databasepath = roboduck.Path(__file__).parent.joinpath('roboduck.db')
|
|
|
|
if not (roboduck.os.path.exists(databasepath) and roboduck.os.stat(databasepath).st_size != 0):
|
|
init_bot()
|
|
|
|
bot = MyBot()
|
|
asyncio.run(bot.start(url, token))
|