Merge pull request #33 from fotoente/dev

Dev
This commit is contained in:
fotoente 2022-07-23 11:57:37 +02:00 committed by GitHub
commit ef8e6d6178
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 286 additions and 42 deletions

3
.gitignore vendored
View file

@ -4,4 +4,5 @@ test-rd.py
bot.cfg
__pycache__
mia-markov-chain
.venv
.venv
notes.txt

View file

@ -1,4 +1,4 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.9 (roboduck)" project-jdk-type="Python SDK" />
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.10 (MIsskey-ebooks-bot)" project-jdk-type="Python SDK" />
</project>

View file

@ -6,7 +6,7 @@ Misskey eBooks Bot with Markov Chain
## Introduction
This small python script is a Markov Chain eBooks bot based on the framework of [MiPA](https://github.com/yupix/MiPA.git)
It can only read and write from and to Misskey. Reading from Mastodon or Pleroma is not (yet) implemented.
It can only write to Misskey. It is possible to read from Misskey, Pleroma or Mastodon instances.
It posts every hour on his own and reacts to mention. Every 12 hours the bot reloads the notes and recalculates the Markov Chain.

View file

@ -24,6 +24,7 @@ except (TypeError, ValueError, roboduck.configparser.NoOptionError):
if not check_multi_arg(url, token):
raise Exception("Misskey instance and token are required.")
class MyBot(commands.Bot):
def __init__(self):

View file

@ -20,20 +20,41 @@ def check_str_to_bool(text) -> bool:
return True
def get_user_id(username, instance):
def get_endpoint(instance: str) -> str:
# Try Misskey
url = "https://" + instance + "/api/ping"
req = requests.post(url)
if req.status_code == 200 and ("pong" in req.json()):
return "Misskey"
# Try Mastodon and Pleroma
url = "https://" + instance + "/api/v1/instance" # Pleroma uses the same API as Mastodon
req = requests.get(url)
if req.status_code == 200:
version = req.json()["version"]
if version.find("(compatible; Pleroma") > 0: # String only available in Pleroma instances. Mastodon will
return "Pleroma"
else:
return "Mastodon"
return "unknown"
def misskey_get_user_id(username: str, instance: str) -> str:
url = "https://" + instance + "/api/users/show"
try:
req = requests.post(url, json={"username": username, "host": instance})
req.raise_for_status()
except requests.exceptions.HTTPError as err:
print("Couldn't get Username! " + str(err))
sys.exit(1)
return ""
return req.json()["id"]
def get_notes(**kwargs):
def misskey_get_notes(**kwargs):
note_id = "k"
sinceid = ""
since_id = ""
min_notes = 0
notes_list = []
return_list = []
@ -50,7 +71,7 @@ def get_notes(**kwargs):
elif "lastnote" in kwargs:
# print("Lastnote found!")
init = False
sinceid = kwargs["lastnote"]
since_id = kwargs["lastnote"]
else:
print("Wrong arguments given!")
@ -64,9 +85,9 @@ def get_notes(**kwargs):
# Load configuration
config = configparser.ConfigParser()
config.read(os.path.join(os.path.dirname(__file__), 'bot.cfg'))
# print(os.path.join(os.path.dirname(__file__), 'bot.cfg'))
userid = get_user_id(username, instance)
userid = misskey_get_user_id(username, instance) # Here are only Misskey ID is necessary so no need to check
# endpoint again
# Read & Sanitize Inputs from Config File
try:
@ -97,7 +118,8 @@ def get_notes(**kwargs):
if (init and len(notes_list) >= min_notes) or (oldnote == note_id):
break
if not init: # sinceid should only be used when updating the database so the json object has to be parsed every time
if not init: # sinceid should only be used when updating the database so the json object has to be parsed
# every time
api_json = {
"userId": userid,
"includeReplies": include_replies,
@ -106,7 +128,7 @@ def get_notes(**kwargs):
"withFiles": False,
"excludeNsfw": exclude_nsfw,
"untilId": note_id,
"sinceId": sinceid}
"sinceId": since_id}
else:
api_json = {
"userId": userid,
@ -139,7 +161,7 @@ def get_notes(**kwargs):
for element in notes_list:
last_time = element["createdAt"]
lastTimestamp = int(datetime.timestamp(datetime.strptime(last_time, '%Y-%m-%dT%H:%M:%S.%f%z')) * 1000)
last_timestamp = int(datetime.timestamp(datetime.strptime(last_time, '%Y-%m-%dT%H:%M:%S.%f%z')) * 1000)
content = element["text"]
@ -152,14 +174,207 @@ def get_notes(**kwargs):
content = content.replace("@", "@" + chr(8203))
if exclude_links:
content = regex.sub(r"(http|https):\/\/([\w_-]+(?:(?:\.[\w_-]+)+))([\w.,@?^=%&:\/~+#-]*[\w@?^=%&\/~+#-]))", "", content)
content = regex.sub(r"(http|https):\/\/([\w_-]+(?:(?:\.[\w_-]+)+))([\w.,@?^=%&:\/~+#-]*[\w@?^=%&\/~+#-]))",
"", content)
note_dict = {"id": element["id"], "text": content, "timestamp": lastTimestamp, "user_id": userid}
note_dict = {"id": element["id"], "text": content, "timestamp": last_timestamp, "user_id": userid}
return_list.append(note_dict)
return return_list
def mastodon_get_user_id(username: str, instance: str) -> str:
url = "https://" + instance + "/api/v1/accounts/lookup?acct=" + username
try:
req = requests.get(url)
req.raise_for_status()
except requests.exceptions.HTTPError as err:
print("Couldn't get Username! " + str(err))
return ""
return req.json()["id"]
def mastodon_get_notes(**kwargs):
note_id = "k"
since_id = ""
min_notes = 0
notes_list = []
return_list = []
username = kwargs["username"]
instance = kwargs["instance"]
print("Reading notes for @" + username + "@" + instance + ".")
if kwargs:
if "min_notes" in kwargs:
# print("min_notes found!")
init = True
min_notes = kwargs["min_notes"]
elif "lastnote" in kwargs:
# print("Lastnote found!")
init = False
since_id = kwargs["lastnote"]
else:
print("Wrong arguments given!")
print("Exiting routine!")
return
else:
print("No arguments given!")
print("Exiting routine")
return None
# Load configuration
config = configparser.ConfigParser()
config.read(os.path.join(os.path.dirname(__file__), 'bot.cfg'))
userid = mastodon_get_user_id(username, instance) # Here are only Mastodon ID is necessary so no need to check
# endpoint again
# Read & Sanitize Inputs from Config File
try:
include_replies = check_str_to_bool(config.get("markov", "includeReplies"))
except (TypeError, ValueError, configparser.NoOptionError):
include_replies = True
try:
include_my_renotes = check_str_to_bool(config.get("markov", "includeMyRenotes"))
except (TypeError, ValueError, configparser.NoOptionError):
include_my_renotes = False
try:
exclude_nsfw = check_str_to_bool(config.get("markov", "excludeNsfw"))
except (TypeError, ValueError, configparser.NoOptionError):
exclude_nsfw = True
try:
exclude_links = check_str_to_bool(config.get("markov", "exclude_links"))
except (TypeError, ValueError, configparser.NoOptionError):
exclude_links = False
run = True
oldnote = ""
base_url = "https://" + instance + "/api/v1/accounts/" + userid + "/statuses?limit=20&exclude_replies="\
+ str(not include_replies)
if init:
url = base_url
else:
url = base_url + "&since_id=" + since_id
while run:
if (init and len(notes_list) >= min_notes) or (oldnote == note_id):
break
try:
req = requests.get(url)
req.raise_for_status()
except requests.exceptions.HTTPError as err:
print("Couldn't get Posts! " + str(err))
sys.exit(1)
for jsonObj in req.json():
notes_list.append(jsonObj)
if len(notes_list) == 0:
print("No new notes to load!")
return []
oldnote = note_id
note_id = notes_list[len(notes_list)-1]["id"]
if init:
url = base_url + "&max_id=" + note_id
else:
url = base_url + "&since_id=" + since_id + "&max_id=" + note_id
print(str(len(notes_list)) + " Notes read.")
print("Processing notes...")
for element in notes_list:
last_time = element["created_at"]
last_timestamp = int(datetime.timestamp(datetime.strptime(last_time, '%Y-%m-%dT%H:%M:%S.%f%z')) * 1000)
content = element["content"]
if content == "" and element["reblog"] is None: # Skips empty notes
continue
elif content == "" and element["reblog"] is not None:
if include_my_renotes: # Add Renotes to Database (if wanted)
content = element["reblog"]["content"]
content = content.replace(chr(8203), "")
else:
continue
if element["spoiler_text"] != "" and exclude_nsfw:
continue
else:
content = element["spoiler_text"] + " " + content
content = regex.sub(r"<[^>]+>", '', content) # Remove HTML tags in Note
content = regex.sub(r"([.,!?])", r"\1 ", content) # Add spaces behind punctuation mark
content = regex.sub(r"\s{2,}", " ", content) # Remove double spaces
content = regex.sub(r"(?>@(?>[\w\-])+)(?>@(?>[\w\-\.])+)?", '', content) # Remove instance name with regular
# expression
content = content.replace("::", ": :") # Break long emoji chains
content = content.replace("@", "@" + chr(8203)) # Add no-length-space behind @
if exclude_links:
content = regex.sub(r"(http|https):\/\/([\w_-]+(?:(?:\.[\w_-]+)+))([\w.,@?^=%&:\/~+#-]*[\w@?^=%&\/~+#-]))",
"", content)
note_dict = {"id": element["id"], "text": content, "timestamp": last_timestamp, "user_id": userid}
return_list.append(note_dict)
return return_list
def pleroma_get_user_id(username: str, instance: str) -> str:
# Pleroma uses the Mastodon API so as a shortcut I just reuse the Mastodon function
return mastodon_get_user_id(username, instance)
def pleroma_get_notes(**kwargs):
return_list = []
username = kwargs["username"]
instance = kwargs["instance"]
if kwargs:
if "min_notes" in kwargs:
return_list = mastodon_get_notes(username=username, instance=instance, min_notes=kwargs["min_notes"])
elif "lastnote" in kwargs:
return_list = mastodon_get_notes(username=username, instance=instance, lastnote=kwargs["lastnote"])
else:
print("Wrong arguments given!")
print("Exiting routine!")
return
else:
print("No arguments given!")
print("Exiting routine")
return None
return return_list
def get_user_id(username: str, instance: str) -> str:
# Determine API endpoint
api = get_endpoint(instance)
# Determine how to get User ID on used Software
if api == "Misskey":
return misskey_get_user_id(username, instance)
elif api == "Mastodon":
return mastodon_get_user_id(username, instance)
elif api == "Pleroma":
return pleroma_get_user_id(username, instance)
else:
print("Domain isn't Misskey, Pleroma or Mastodon!\nCheck spelling of the domain!")
sys.exit(1)
def calculate_markov_chain():
text = ""
# Load configuration
@ -167,7 +382,7 @@ def calculate_markov_chain():
config.read(Path(__file__).parent.joinpath('bot.cfg'))
try:
max_notes = config.get("markov", "max_notes")
except (TypeError, ValueError):
except (TypeError, ValueError, configparser.NoOptionError):
max_notes = "10000"
databasepath = Path(__file__).parent.joinpath('roboduck.db')
@ -218,10 +433,13 @@ def clean_database():
for user in config.get("misskey", "users").split(";"):
username = user.split("@")[1]
instance = user.split("@")[2]
userid = get_user_id(username, instance)
data = database.cursor()
data.execute(
"DELETE FROM notes WHERE user_id=:user_id AND id NOT IN (SELECT id FROM notes WHERE user_id=:user_id ORDER BY timestamp DESC LIMIT :max );",
"DELETE FROM notes WHERE user_id=:user_id AND id NOT IN (SELECT id FROM notes WHERE user_id=:user_id "
"ORDER BY timestamp DESC LIMIT :max );",
{"user_id": userid, "max": int(max_notes)})
database.commit()
@ -237,42 +455,42 @@ def create_sentence():
# Reading config file bot.cfg with config parser
config = configparser.ConfigParser()
config.read(Path(__file__).parent.joinpath('bot.cfg'))
# print((Path(__file__).parent).joinpath('bot.cfg'))
# Read & Sanitize Inputs
try:
test_output = check_str_to_bool(config.get("markov", "test_output"))
except (TypeError, ValueError):
except (TypeError, ValueError, configparser.NoOptionError):
# print("test_output: " + str(err))
test_output = True
if test_output:
try:
tries = int(config.get("markov", "tries"))
except (TypeError, ValueError):
except (TypeError, ValueError, configparser.NoOptionError):
# print("tries: " + str(err))
tries = 250
try:
max_overlap_ratio = float(config.get("markov", "max_overlap_ratio"))
except (TypeError, ValueError):
except (TypeError, ValueError, configparser.NoOptionError):
# print("max_overlap_ratio: " + str(err))
max_overlap_ratio = 0.7
try:
max_overlap_total = int(config.get("markov", "max_overlap_total"))
except (TypeError, ValueError):
except (TypeError, ValueError, configparser.NoOptionError):
# print("max_overlap_total: " + str(err))
max_overlap_total = 10
try:
max_words = int(config.get("markov", "max_words"))
except (TypeError, ValueError):
except (TypeError, ValueError, configparser.NoOptionError):
# print("max_words: " + str(err))
max_words = None
try:
min_words = int(config.get("markov", "min_words"))
except (TypeError, ValueError):
except (TypeError, ValueError, configparser.NoOptionError):
# print("min_words: " + str(err))
min_words = None
@ -291,7 +509,7 @@ def create_sentence():
min_words = None
"""
#Debug section to rpint the used values
#Debug section to print the used values
print("These values are used:")
print("test_output: " + str(test_output))
print("tries: " + str(tries))
@ -317,7 +535,7 @@ def create_sentence():
def update():
notesList = []
notes_list = []
databasepath = Path(__file__).parent.joinpath('roboduck.db')
if not (os.path.exists(databasepath) and os.stat(databasepath).st_size != 0):
print("No database found!")
@ -326,7 +544,7 @@ def update():
with open(databasepath, "a", encoding="utf-8"):
database = sqlite3.connect(databasepath)
print("Connected to roboduck.db succesfull...")
print("Connected to roboduck.db successful...")
config = configparser.ConfigParser()
config.read(Path(__file__).parent.joinpath('bot.cfg'))
@ -336,20 +554,30 @@ def update():
userid = get_user_id(username, instance)
data = database.cursor()
data.execute(
"SELECT id FROM notes WHERE timestamp = (SELECT MAX(timestamp) FROM notes WHERE user_id=:user_id) AND user_id=:user_id;",
"SELECT id FROM notes WHERE timestamp = (SELECT MAX(timestamp) FROM notes WHERE user_id=:user_id) AND "
"user_id=:user_id;",
{"user_id": userid})
sinceNote = data.fetchone()[0]
since_note = data.fetchone()[0]
notesList.extend(get_notes(lastnote=sinceNote, username=username, instance=instance))
api = get_endpoint(instance)
if notesList == 0:
if api == "Misskey":
notes_list.extend(misskey_get_notes(lastnote=since_note, username=username, instance=instance))
elif api == "Mastodon":
notes_list.extend(mastodon_get_notes(lastnote=since_note, username=username, instance=instance))
elif api == "Pleroma":
notes_list.extend(pleroma_get_notes(lastnote=since_note, username=username, instance=instance))
else:
print("BIG ERROR!")
if notes_list == 0:
database.close()
return
print("Insert new notes to database...")
database.executemany("INSERT OR IGNORE INTO notes (id, text, timestamp, user_id) VALUES(?, ?, ?, ?)",
[(note["id"], note["text"], note["timestamp"], note["user_id"]) for note in notesList])
[(note["id"], note["text"], note["timestamp"], note["user_id"]) for note in notes_list])
database.commit()
print("Notes updated!")
@ -359,7 +587,7 @@ def update():
clean_database()
print("Database cleaned!")
print("Short sleep to prevent file collison...")
print("Short sleep to prevent file collision...")
sleep(10)
print("Calculating new Markov Chain...")
@ -380,10 +608,10 @@ def init_bot():
with open(databasepath, "w+", encoding="utf-8"):
database = sqlite3.connect(databasepath)
print("Connected to roboduck.db succesfull...")
print("Connected to roboduck.db successful...")
print("Creating Table...")
database.execute("CREATE TABLE notes (id CHAR(10) PRIMARY KEY, text CHAR(5000), timestamp INT, user_id CHAR(10));")
database.execute("CREATE TABLE notes (id CHAR(20) PRIMARY KEY, text TEXT, timestamp INT, user_id CHAR(20));")
print("Table NOTES created...")
@ -391,20 +619,34 @@ def init_bot():
config = configparser.ConfigParser()
config.read(Path(__file__).parent.joinpath('bot.cfg'))
try:
initnotes = int(config.get("markov", "min_notes"))
init_notes = int(config.get("markov", "min_notes"))
except (TypeError, ValueError):
# print(err)
initnotes = 1000
init_notes = 1000
for user in config.get("misskey", "users").split(";"):
print("Try reading first " + str(initnotes) + " notes for " + user + ".")
print("Try reading first " + str(init_notes) + " notes for " + user + ".")
notesList = get_notes(min_notes=initnotes, username=user.split("@")[1], instance=user.split("@")[2])
username = user.split("@")[1]
instance = user.split("@")[2]
api = get_endpoint(instance)
print(instance + " is a " + api + " instance.")
if api == "Misskey":
notes_list = misskey_get_notes(min_notes=init_notes, username=username, instance=instance)
elif api == "Mastodon":
notes_list = mastodon_get_notes(min_notes=init_notes, username=username, instance=instance)
elif api == "Pleroma":
notes_list = pleroma_get_notes(min_notes=init_notes, username=username, instance=instance)
else:
print("BIG ERROR!")
print("Writing notes into database...")
database.executemany("INSERT INTO notes (id, text, timestamp, user_id) VALUES(?, ?, ?, ?)",
[(note["id"], note["text"], note["timestamp"], note["user_id"]) for note in notesList])
[(note["id"], note["text"], note["timestamp"], note["user_id"]) for note in notes_list])
database.commit()
database.close()

View file

@ -1,3 +1,3 @@
from roboduck import *
import roboduck
update()
roboduck.update()