mirror of
https://github.com/fotoente/MIsskey-ebooks-bot
synced 2024-11-22 03:13:06 +00:00
Implemented reading from Mastodon & Pleroma
This commit is contained in:
parent
424ebe43f6
commit
b9868b7fba
2 changed files with 189 additions and 14 deletions
1
.gitignore
vendored
1
.gitignore
vendored
|
@ -5,3 +5,4 @@ bot.cfg
|
|||
__pycache__
|
||||
mia-markov-chain
|
||||
.venv
|
||||
notes.txt
|
198
roboduck.py
198
roboduck.py
|
@ -24,7 +24,7 @@ def get_endpoint(instance: str) -> str:
|
|||
# Try Misskey
|
||||
url = "https://" + instance + "/api/ping"
|
||||
req = requests.post(url)
|
||||
if req.status_code == 200:
|
||||
if req.status_code == 200 and ("pong" in req.json()):
|
||||
return "Misskey"
|
||||
|
||||
# Try Mastodon and Pleroma
|
||||
|
@ -195,8 +195,142 @@ def mastodon_get_user_id(username: str, instance: str) -> str:
|
|||
return req.json()["id"]
|
||||
|
||||
|
||||
def mastodon_get_notes():
|
||||
print("MASTODON'T NOTES!") # TODO Write routine to get Mastodon notes (check for limiting commands!)
|
||||
def _mastodon_config_flag(config, option, default):
    """Read a boolean option from the [markov] section, falling back to *default*."""
    try:
        return check_str_to_bool(config.get("markov", option))
    except (TypeError, ValueError, configparser.NoOptionError, configparser.NoSectionError):
        return default


def _mastodon_clean_content(content, exclude_links):
    """Turn the HTML body of a status into plain text for the Markov chain."""
    content = regex.sub(r"<[^>]+>", '', content)  # Remove HTML tags in Note

    if exclude_links:
        # Links must be stripped BEFORE spaces are inserted behind punctuation:
        # otherwise "example.com" has already become "example. com" and the
        # pattern can no longer match the URL. (The pattern previously also
        # carried an unbalanced trailing ")" which made it fail to compile.)
        content = regex.sub(r"(http|https):\/\/([\w_-]+(?:(?:\.[\w_-]+)+))([\w.,@?^=%&:\/~+#-]*[\w@?^=%&\/~+#-])",
                            "", content)

    content = regex.sub(r"([.,!?])", r"\1 ", content)  # Add spaces behind punctuation mark
    content = regex.sub(r"\s{2,}", " ", content)  # Remove double spaces
    # Remove mentions (@user and @user@instance)
    content = regex.sub(r"(?>@(?>[\w\-])+)(?>@(?>[\w\-\.])+)?", '', content)
    content = content.replace("::", ": :")  # Break long emoji chains
    content = content.replace("@", "@" + chr(8203))  # Add no-length-space behind @
    return content


def mastodon_get_notes(**kwargs):
    """Read statuses of one account through the Mastodon REST API.

    Keyword arguments:
        username: account name without the instance part (required).
        instance: host name of the instance (required).
        min_notes: initial mode - keep paging backwards until at least this
            many statuses were read or the account has no more statuses.
        lastnote: update mode - only read statuses newer than this status id.
            Ignored when ``min_notes`` is given as well.

    Returns:
        A list of dicts with the keys ``id``, ``text``, ``timestamp``
        (milliseconds since the epoch) and ``user_id``; an empty list when no
        status could be loaded; ``None`` when the arguments are unusable.

    The process is terminated with exit code 1 when the API answers with an
    HTTP error status.
    """
    if not kwargs:
        print("No arguments given!")
        print("Exiting routine")
        return None

    # Validate before indexing so a missing key is reported instead of raising KeyError.
    username = kwargs.get("username")
    instance = kwargs.get("instance")
    if username is None or instance is None:
        print("Wrong arguments given!")
        print("Exiting routine!")
        return None

    print("Reading notes for @" + username + "@" + instance + ".")

    if "min_notes" in kwargs:
        init = True
        min_notes = kwargs["min_notes"]
        since_id = ""
    elif "lastnote" in kwargs:
        init = False
        min_notes = 0
        since_id = str(kwargs["lastnote"])
    else:
        print("Wrong arguments given!")
        print("Exiting routine!")
        return None

    # Load configuration
    config = configparser.ConfigParser()
    config.read(os.path.join(os.path.dirname(__file__), 'bot.cfg'))

    # Only the Mastodon account id is needed here, so the endpoint type is not checked again.
    userid = mastodon_get_user_id(username, instance)

    # Read & sanitize inputs from config file
    include_replies = _mastodon_config_flag(config, "includeReplies", True)
    include_my_renotes = _mastodon_config_flag(config, "includeMyRenotes", False)
    exclude_nsfw = _mastodon_config_flag(config, "excludeNsfw", True)
    exclude_links = _mastodon_config_flag(config, "exclude_links", False)

    # Lower-case "true"/"false" is the conventional spelling of booleans in query strings.
    base_url = "https://" + instance + "/api/v1/accounts/" + userid + "/statuses?limit=20&exclude_replies=" \
               + str(not include_replies).lower()
    if not init:
        base_url += "&since_id=" + since_id

    url = base_url
    notes_list = []

    # Page backwards via max_id until enough statuses were collected (initial
    # mode) or the server returns an empty page (nothing older is left).
    while not (init and len(notes_list) >= min_notes):
        try:
            req = requests.get(url)
            req.raise_for_status()
        except requests.exceptions.HTTPError as err:
            print("Couldn't get Posts! " + str(err))
            sys.exit(1)

        page = req.json()
        if not page:
            break

        notes_list.extend(page)
        url = base_url + "&max_id=" + notes_list[-1]["id"]

    if not notes_list:
        print("No new notes to load!")
        return []

    print(str(len(notes_list)) + " Notes read.")
    print("Processing notes...")

    return_list = []

    for element in notes_list:
        content = element["content"]

        if content == "":
            # Empty body: either an empty status (skipped) or a reblog, which
            # is only stored when the configuration asks for it.
            if element["reblog"] is None or not include_my_renotes:
                continue
            content = element["reblog"]["content"].replace(chr(8203), "")

        if element["spoiler_text"] != "":
            if exclude_nsfw:
                continue
            # Keep the content warning as part of the text.
            content = element["spoiler_text"] + " " + content

        created = datetime.strptime(element["created_at"], '%Y-%m-%dT%H:%M:%S.%f%z')
        last_timestamp = int(created.timestamp() * 1000)

        return_list.append({
            "id": element["id"],
            "text": _mastodon_clean_content(content, exclude_links),
            "timestamp": last_timestamp,
            "user_id": userid,
        })

    return return_list
|
||||
|
||||
|
||||
def pleroma_get_user_id(username: str, instance: str) -> str:
|
||||
|
@ -204,9 +338,26 @@ def pleroma_get_user_id(username: str, instance: str) -> str:
|
|||
return mastodon_get_user_id(username, instance)
|
||||
|
||||
|
||||
def pleroma_get_notes():
|
||||
print("Pleroma notes!") # TODO Write routine to get Pleroma notes (check for limiting commands)
|
||||
def pleroma_get_notes(**kwargs):
    """Read statuses of one account from a Pleroma instance.

    The work is delegated to ``mastodon_get_notes``; this wrapper only
    validates and forwards the arguments.

    Keyword arguments:
        username: account name without the instance part (required).
        instance: host name of the instance (required).
        min_notes: forwarded for an initial read; takes precedence over
            ``lastnote`` when both are given.
        lastnote: forwarded for an update read.

    Returns:
        Whatever ``mastodon_get_notes`` returns, or ``None`` when the
        arguments are unusable.
    """
    if not kwargs:
        print("No arguments given!")
        print("Exiting routine")
        return None

    # Validate before indexing so a missing key is reported instead of raising KeyError.
    username = kwargs.get("username")
    instance = kwargs.get("instance")
    has_mode = "min_notes" in kwargs or "lastnote" in kwargs
    if username is None or instance is None or not has_mode:
        print("Wrong arguments given!")
        print("Exiting routine!")
        return None

    if "min_notes" in kwargs:
        return mastodon_get_notes(username=username, instance=instance, min_notes=kwargs["min_notes"])

    return mastodon_get_notes(username=username, instance=instance, lastnote=kwargs["lastnote"])
|
||||
|
||||
def get_user_id(username: str, instance: str) -> str:
|
||||
# Determine API endpoint
|
||||
|
@ -393,7 +544,7 @@ def update():
|
|||
|
||||
with open(databasepath, "a", encoding="utf-8"):
|
||||
database = sqlite3.connect(databasepath)
|
||||
print("Connected to roboduck.db succesfull...")
|
||||
print("Connected to roboduck.db successful...")
|
||||
|
||||
config = configparser.ConfigParser()
|
||||
config.read(Path(__file__).parent.joinpath('bot.cfg'))
|
||||
|
@ -409,7 +560,16 @@ def update():
|
|||
|
||||
since_note = data.fetchone()[0]
|
||||
|
||||
api = get_endpoint(instance)
|
||||
|
||||
if api == "Misskey":
|
||||
notes_list.extend(misskey_get_notes(lastnote=since_note, username=username, instance=instance))
|
||||
elif api == "Mastodon":
|
||||
notes_list.extend(mastodon_get_notes(lastnote=since_note, username=username, instance=instance))
|
||||
elif api == "Pleroma":
|
||||
notes_list.extend(pleroma_get_notes(lastnote=since_note, username=username, instance=instance))
|
||||
else:
|
||||
print("BIG ERROR!")
|
||||
|
||||
if notes_list == 0:
|
||||
database.close()
|
||||
|
@ -427,7 +587,7 @@ def update():
|
|||
clean_database()
|
||||
print("Database cleaned!")
|
||||
|
||||
print("Short sleep to prevent file collison...")
|
||||
print("Short sleep to prevent file collision...")
|
||||
sleep(10)
|
||||
|
||||
print("Calculating new Markov Chain...")
|
||||
|
@ -451,7 +611,7 @@ def init_bot():
|
|||
print("Connected to roboduck.db successful...")
|
||||
|
||||
print("Creating Table...")
|
||||
database.execute("CREATE TABLE notes (id CHAR(20) PRIMARY KEY, text CHAR(5000), timestamp INT, user_id CHAR(10));")
|
||||
database.execute("CREATE TABLE notes (id CHAR(20) PRIMARY KEY, text TEXT, timestamp INT, user_id CHAR(20));")
|
||||
|
||||
print("Table NOTES created...")
|
||||
|
||||
|
@ -459,15 +619,29 @@ def init_bot():
|
|||
config = configparser.ConfigParser()
|
||||
config.read(Path(__file__).parent.joinpath('bot.cfg'))
|
||||
try:
|
||||
initnotes = int(config.get("markov", "min_notes"))
|
||||
init_notes = int(config.get("markov", "min_notes"))
|
||||
except (TypeError, ValueError):
|
||||
# print(err)
|
||||
initnotes = 1000
|
||||
init_notes = 1000
|
||||
|
||||
for user in config.get("misskey", "users").split(";"):
|
||||
print("Try reading first " + str(initnotes) + " notes for " + user + ".")
|
||||
print("Try reading first " + str(init_notes) + " notes for " + user + ".")
|
||||
|
||||
notes_list = misskey_get_notes(min_notes=initnotes, username=user.split("@")[1], instance=user.split("@")[2])
|
||||
username = user.split("@")[1]
|
||||
instance = user.split("@")[2]
|
||||
|
||||
api = get_endpoint(instance)
|
||||
|
||||
print(instance + " is a " + api + " instance.")
|
||||
|
||||
if api == "Misskey":
|
||||
notes_list = misskey_get_notes(min_notes=init_notes, username=username, instance=instance)
|
||||
elif api == "Mastodon":
|
||||
notes_list = mastodon_get_notes(min_notes=init_notes, username=username, instance=instance)
|
||||
elif api == "Pleroma":
|
||||
notes_list = pleroma_get_notes(min_notes=init_notes, username=username, instance=instance)
|
||||
else:
|
||||
print("BIG ERROR!")
|
||||
|
||||
print("Writing notes into database...")
|
||||
|
||||
|
|
Loading…
Reference in a new issue