113 lines
3.9 KiB
Python
113 lines
3.9 KiB
Python
import html
|
||
import re
|
||
import traceback
|
||
from datetime import datetime
|
||
from random import randint
|
||
|
||
from discord.ext import commands
|
||
from discord_slash import cog_ext
|
||
from discord_slash import SlashContext
|
||
|
||
from jarvis.db.models import Joke
|
||
from jarvis.utils import build_embed
|
||
from jarvis.utils.field import Field
|
||
|
||
|
||
class JokeCog(commands.Cog):
    """
    Joke library for J.A.R.V.I.S.

    May adapt over time to create jokes using machine learning
    """

    # Size limits applied when laying a joke out in an embed / message.
    _FIELD_VALUE_LIMIT = 1024
    _TITLE_LIMIT = 256
    _MESSAGE_LIMIT = 2000

    # Body placeholders that mark a joke whose text is no longer available.
    _REMOVED_BODIES = ["[removed]", "[deleted]"]

    def __init__(self, bot):
        self.bot = bot

    @staticmethod
    def _unescape_hex_entities(text):
        """Return ``text`` with hexadecimal HTML character references decoded.

        Only ``&#x...;`` references are touched; everything else is left as is.
        ``None`` is treated as an empty string.
        """
        if not text:
            return ""
        return re.sub(
            r"(&#x[a-fA-F0-9]*;)",
            lambda found: html.unescape(found.group(0)),
            text,
        )

    @staticmethod
    def _chunk_body(text, limit=1024):
        """Split ``text`` on spaces into pieces of at most ``limit`` characters.

        Words are kept whole where possible; a single token longer than
        ``limit`` is hard-split so no piece ever exceeds the limit. A bare
        ``"\\n"`` token is dropped when it would start a piece and is appended
        without a separating space otherwise. Empty input yields no pieces.
        """
        pieces = []
        current = ""
        for word in text.split(" "):
            if word == "\n" and not current:
                continue
            addition = word if (word == "\n" or not current) else " " + word
            if current and len(current) + len(addition) > limit:
                pieces.append(current)
                current = ""
                if word == "\n":
                    continue
                addition = word
            # Only reachable with an empty ``current``: the token alone is too big.
            while len(addition) > limit:
                pieces.append(addition[:limit])
                addition = addition[limit:]
            current += addition
        if current:
            pieces.append(current)
        return pieces

    @staticmethod
    def _split_title(title, limit=256):
        """Return ``(title, description)`` with ``title`` at most ``limit`` long.

        A title that already fits is returned with an empty description.
        Otherwise the title is cut at a word boundary and suffixed with
        ``...``; the remaining words are returned as a description prefixed
        with ``...``.
        """
        if len(title) <= limit:
            return title, ""
        budget = limit - 3  # leave room for the trailing "..."
        head = ""
        overflow = []
        truncated = False
        for word in title.split(" "):
            if not truncated and len(head) + len(word) + 1 > budget:
                truncated = True
            if truncated:
                overflow.append(word)
            else:
                head += word + " "
        return head + "...", "..." + " ".join(overflow)

    # TODO: Make this a command group with subcommands
    @cog_ext.cog_slash(
        name="joke",
        description="Hear a joke",
    )
    @commands.cooldown(1, 10, commands.BucketType.channel)
    async def _joke(self, ctx: SlashContext, id: str = None):
        """Get a joke from the database and send it as an embed.

        Args:
            ctx: Context of the slash command invocation.
            id: ``rid`` of a specific joke to fetch. When omitted, a random
                joke with a score above the threshold is sampled. The name
                shadows the builtin but is part of the command's interface.

        Any exception raised while handling the command is reported back to
        the channel as a formatted traceback.
        """
        try:
            # Rare easter egg: ping the invoker instead of telling a joke.
            if randint(1, 100_000) == 5779 and id is None:
                # Use ctx.author: no message exists on the context before the
                # first send, so ctx.message cannot be dereferenced here.
                await ctx.send(f"<@{ctx.author.id}>")
                return

            # TODO: Add this as a parameter that can be passed in
            threshold = 500  # Minimum score

            if id:
                result = Joke.objects(rid=id).first()
            else:
                pipeline = [
                    {
                        "$match": {
                            "score": {"$gt": threshold},
                            # Filter removed jokes in the query rather than
                            # re-sampling in an unbounded loop.
                            "body": {"$nin": self._REMOVED_BODIES},
                        }
                    },
                    {"$sample": {"size": 1}},
                ]
                # Default of None keeps an empty result from raising StopIteration.
                result = next(Joke.objects().aggregate(pipeline), None)

            if result is None:
                await ctx.send("Humor module failed. Please try again later.", hidden=True)
                return

            # Work on local copies so the fetched record is never mutated.
            body = self._unescape_hex_entities(result["body"])
            title, desc = self._split_title(
                self._unescape_hex_entities(result["title"]), self._TITLE_LIMIT
            )

            fields = [
                Field("", chunk, False)
                for chunk in self._chunk_body(body, self._FIELD_VALUE_LIMIT)
            ]
            fields.append(Field("Score", result["score"]))
            fields.append(Field("ID", result["rid"]))

            embed = build_embed(
                title=title,
                description=desc,
                fields=fields,
                url=f"https://reddit.com/r/jokes/comments/{result['rid']}",
                # NOTE(review): fromtimestamp() returns naive local time for a
                # value named created_utc - confirm what build_embed expects.
                timestamp=datetime.fromtimestamp(result["created_utc"]),
            )
            await ctx.send(embed=embed)
        except Exception:
            # NOTE(review): this posts the raw traceback to the channel.
            prefix = "Encountered error:\n```\n"
            suffix = "\n```"
            trace = traceback.format_exc()
            room = self._MESSAGE_LIMIT - len(prefix) - len(suffix)
            if len(trace) > room:
                # Keep the tail: the raising frame and the error message.
                trace = trace[-room:]
            await ctx.send(prefix + trace + suffix)
|
||
|
||
|
||
def setup(bot):
    """Extension entry point: create a JokeCog for ``bot`` and add it."""
    joke_cog = JokeCog(bot)
    bot.add_cog(joke_cog)
|