322 lines
12 KiB
Python
322 lines
12 KiB
Python
"""JARVIS Remind Me Cog."""
|
|
import asyncio
|
|
import logging
|
|
import re
|
|
from datetime import datetime, timezone
|
|
from typing import List
|
|
|
|
from dateparser import parse
|
|
from dateparser_data.settings import default_parsers
|
|
from jarvis_core.db import q
|
|
from jarvis_core.db.models import Reminder
|
|
from naff import AutocompleteContext, Client, Extension, InteractionContext
|
|
from naff.models.discord.channel import GuildChannel
|
|
from naff.models.discord.components import ActionRow, Button
|
|
from naff.models.discord.embed import Embed, EmbedField
|
|
from naff.models.discord.enums import ButtonStyles
|
|
from naff.models.discord.modal import InputText, Modal, TextStyles
|
|
from naff.models.naff.application_commands import (
|
|
OptionTypes,
|
|
SlashCommand,
|
|
slash_option,
|
|
)
|
|
from thefuzz import process
|
|
|
|
from jarvis.utils import build_embed
|
|
|
|
valid = re.compile(r"[\w\s\-\\/.!@#$%^*()+=<>:'\",\u0080-\U000E0FFF]*")
|
|
time_pattern = re.compile(r"(\d+\.?\d?[s|m|h|d|w]{1})\s?", flags=re.IGNORECASE)
|
|
invites = re.compile(
|
|
r"(?:https?://)?(?:www.)?(?:discord.(?:gg|io|me|li)|discord(?:app)?.com/invite)/([^\s/]+?)(?=\b)", # noqa: E501
|
|
flags=re.IGNORECASE,
|
|
)
|
|
|
|
|
|
class RemindmeCog(Extension):
|
|
"""JARVIS Remind Me Cog."""
|
|
|
|
def __init__(self, bot: Client):
|
|
self.bot = bot
|
|
self.logger = logging.getLogger(__name__)
|
|
|
|
reminders = SlashCommand(name="reminders", description="Manage reminders")
|
|
|
|
@reminders.subcommand(sub_cmd_name="set", sub_cmd_description="Set a reminder")
|
|
@slash_option(
|
|
name="private",
|
|
description="Send as DM?",
|
|
opt_type=OptionTypes.BOOLEAN,
|
|
required=False,
|
|
)
|
|
async def _remindme(
|
|
self,
|
|
ctx: InteractionContext,
|
|
private: bool = None,
|
|
) -> None:
|
|
if private is None and ctx.guild:
|
|
private = ctx.guild.member_count >= 5000
|
|
modal = Modal(
|
|
title="Set your reminder!",
|
|
components=[
|
|
InputText(
|
|
label="What to remind you?",
|
|
placeholder="Reminder",
|
|
style=TextStyles.PARAGRAPH,
|
|
custom_id="message",
|
|
max_length=500,
|
|
),
|
|
InputText(
|
|
label="When to remind you?",
|
|
placeholder="1h 30m | in 5 minutes | November 11, 4011",
|
|
style=TextStyles.SHORT,
|
|
custom_id="delay",
|
|
),
|
|
],
|
|
)
|
|
|
|
await ctx.send_modal(modal)
|
|
try:
|
|
response = await self.bot.wait_for_modal(modal, author=ctx.author.id, timeout=60 * 5)
|
|
message = response.responses.get("message").strip()
|
|
delay = response.responses.get("delay").strip()
|
|
except asyncio.TimeoutError:
|
|
return
|
|
if len(message) > 500:
|
|
await response.send("Reminder cannot be > 500 characters.", ephemeral=True)
|
|
return
|
|
elif invites.search(message):
|
|
await response.send(
|
|
"Listen, don't use this to try and bypass the rules",
|
|
ephemeral=True,
|
|
)
|
|
return
|
|
elif not valid.fullmatch(message):
|
|
await response.send("Hey, you should probably make this readable", ephemeral=True)
|
|
return
|
|
elif len(message) == 0:
|
|
await response.send(
|
|
"Hey, you should probably add content to your reminder", ephemeral=True
|
|
)
|
|
return
|
|
|
|
base_settings = {
|
|
"PREFER_DATES_FROM": "future",
|
|
"TIMEZONE": "UTC",
|
|
"RETURN_AS_TIMEZONE_AWARE": True,
|
|
}
|
|
rt_settings = base_settings.copy()
|
|
rt_settings["PARSERS"] = [
|
|
x for x in default_parsers if x not in ["absolute-time", "timestamp"]
|
|
]
|
|
|
|
rt_remind_at = parse(delay, settings=rt_settings)
|
|
|
|
at_settings = base_settings.copy()
|
|
at_settings["PARSERS"] = [x for x in default_parsers if x != "relative-time"]
|
|
at_remind_at = parse(delay, settings=at_settings)
|
|
|
|
if rt_remind_at:
|
|
remind_at = rt_remind_at
|
|
elif at_remind_at:
|
|
remind_at = at_remind_at
|
|
else:
|
|
self.logger.debug(f"Failed to parse delay: {delay}")
|
|
await response.send(
|
|
f"`{delay}` is not a parsable date, please try again", ephemeral=True
|
|
)
|
|
return
|
|
|
|
if remind_at < datetime.now(tz=timezone.utc):
|
|
await response.send(
|
|
f"`{delay}` is in the past. Past reminders aren't allowed", ephemeral=True
|
|
)
|
|
return
|
|
|
|
elif remind_at < datetime.now(tz=timezone.utc):
|
|
pass
|
|
|
|
r = Reminder(
|
|
user=ctx.author.id,
|
|
channel=ctx.channel.id,
|
|
guild=ctx.guild.id if ctx.guild else ctx.author.id,
|
|
message=message,
|
|
remind_at=remind_at,
|
|
private=private,
|
|
active=True,
|
|
)
|
|
|
|
await r.commit()
|
|
|
|
embed = build_embed(
|
|
title="Reminder Set",
|
|
description=f"{ctx.author.mention} set a reminder",
|
|
fields=[
|
|
EmbedField(name="Message", value=message),
|
|
EmbedField(
|
|
name="When",
|
|
value=f"<t:{int(remind_at.timestamp())}:F> (<t:{int(remind_at.timestamp())}:R>)",
|
|
inline=False,
|
|
),
|
|
],
|
|
)
|
|
|
|
embed.set_author(
|
|
name=ctx.author.username + "#" + ctx.author.discriminator,
|
|
icon_url=ctx.author.display_avatar.url,
|
|
)
|
|
embed.set_thumbnail(url=ctx.author.display_avatar.url)
|
|
|
|
delete_button = Button(
|
|
style=ButtonStyles.DANGER, emoji="🗑️", custom_id=f"delete|{ctx.author.id}"
|
|
)
|
|
copy_button = Button(style=ButtonStyles.GREEN, emoji="📋", custom_id=f"copy|rme|{r.id}")
|
|
components = [ActionRow(delete_button, copy_button)]
|
|
private = private if private is not None else False
|
|
await response.send(embeds=embed, components=components, ephemeral=private)
|
|
|
|
async def get_reminders_embed(
|
|
self, ctx: InteractionContext, reminders: List[Reminder]
|
|
) -> Embed:
|
|
"""Build embed for paginator."""
|
|
fields = []
|
|
for reminder in reminders:
|
|
if reminder.private and isinstance(ctx.channel, GuildChannel):
|
|
fields.embed(
|
|
EmbedField(
|
|
name=f"<t:{int(reminder.remind_at.timestamp())}:F> (<t:{int(reminder.remind_at.timestamp())}:R>)",
|
|
value="Please DM me this command to view the content of this reminder",
|
|
inline=False,
|
|
)
|
|
)
|
|
else:
|
|
fields.append(
|
|
EmbedField(
|
|
name=f"<t:{int(reminder.remind_at.timestamp())}:F> (<t:{int(reminder.remind_at.timestamp())}:R>)",
|
|
value=f"{reminder.message}\n\u200b",
|
|
inline=False,
|
|
)
|
|
)
|
|
|
|
embed = build_embed(
|
|
title=f"{len(reminders)} Active Reminder(s)",
|
|
description=f"All active reminders for {ctx.author.mention}",
|
|
fields=fields,
|
|
)
|
|
|
|
embed.set_author(
|
|
name=ctx.author.username + "#" + ctx.author.discriminator,
|
|
icon_url=ctx.author.display_avatar.url,
|
|
)
|
|
embed.set_thumbnail(url=ctx.author.display_avatar.url)
|
|
|
|
return embed
|
|
|
|
@reminders.subcommand(sub_cmd_name="list", sub_cmd_description="List reminders")
|
|
async def _list(self, ctx: InteractionContext) -> None:
|
|
reminders = await Reminder.find(q(user=ctx.author.id, active=True)).to_list(None)
|
|
if not reminders:
|
|
await ctx.send("You have no reminders set.", ephemeral=True)
|
|
return
|
|
|
|
embed = await self.get_reminders_embed(ctx, reminders)
|
|
components = Button(
|
|
style=ButtonStyles.DANGER, emoji="🗑️", custom_id=f"delete|{ctx.author.id}"
|
|
)
|
|
await ctx.send(embeds=embed, components=components)
|
|
|
|
@reminders.subcommand(sub_cmd_name="delete", sub_cmd_description="Delete a reminder")
|
|
@slash_option(
|
|
name="content",
|
|
description="Content of the reminder",
|
|
opt_type=OptionTypes.STRING,
|
|
required=True,
|
|
autocomplete=True,
|
|
)
|
|
async def _delete(self, ctx: InteractionContext, content: str) -> None:
|
|
reminder = await Reminder.find_one(q(_id=content))
|
|
if not reminder:
|
|
await ctx.send(f"Reminder `{content}` does not exist", ephemeral=True)
|
|
return
|
|
|
|
ts = int(reminder.remind_at.timestamp())
|
|
|
|
fields = [EmbedField(name=f"<t:{ts}:F>", value=reminder.message, inline=False)]
|
|
|
|
embed = build_embed(
|
|
title="Deleted Reminder(s)",
|
|
description="",
|
|
fields=fields,
|
|
)
|
|
|
|
embed.set_author(
|
|
name=ctx.author.display_name + "#" + ctx.author.discriminator,
|
|
icon_url=ctx.author.display_avatar.url,
|
|
)
|
|
embed.set_thumbnail(url=ctx.author.display_avatar.url)
|
|
|
|
components = Button(
|
|
style=ButtonStyles.DANGER, emoji="🗑️", custom_id=f"delete|{ctx.author.id}"
|
|
)
|
|
try:
|
|
await reminder.delete()
|
|
except Exception:
|
|
self.logger.debug("Ignoring deletion error")
|
|
await ctx.send(embeds=embed, ephemeral=reminder.private, components=components)
|
|
|
|
@reminders.subcommand(
|
|
sub_cmd_name="fetch",
|
|
sub_cmd_description="Fetch a reminder that failed to send",
|
|
)
|
|
@slash_option(
|
|
name="content",
|
|
description="Content of the reminder",
|
|
opt_type=OptionTypes.STRING,
|
|
required=True,
|
|
autocomplete=True,
|
|
)
|
|
async def _fetch(self, ctx: InteractionContext, content: str) -> None:
|
|
reminder = await Reminder.find_one(q(_id=content))
|
|
if not reminder:
|
|
await ctx.send(f"Reminder `{content}` does not exist", ephemeral=True)
|
|
return
|
|
|
|
ts = int(reminder.remind_at.timestamp())
|
|
cts = int(reminder.created_at.timestamp())
|
|
|
|
fields = [
|
|
EmbedField(name="Remind At", value=f"<t:{ts}:F> (<t:{ts}:R>)"),
|
|
EmbedField(name="Created At", value=f"<t:{cts}:F> (<t:{cts}:R>)"),
|
|
]
|
|
|
|
embed = build_embed(
|
|
title="You have a reminder!", description=reminder.message, fields=fields
|
|
)
|
|
embed.set_author(
|
|
name=ctx.author.display_name + "#" + ctx.author.discriminator,
|
|
icon_url=ctx.author.display_avatar.url,
|
|
)
|
|
|
|
embed.set_thumbnail(url=ctx.author.display_avatar.url)
|
|
components = Button(
|
|
style=ButtonStyles.DANGER, emoji="🗑️", custom_id=f"delete|{ctx.author.id}"
|
|
)
|
|
await ctx.send(embeds=embed, ephemeral=reminder.private, components=components)
|
|
if reminder.remind_at <= datetime.now(tz=timezone.utc) and not reminder.active:
|
|
try:
|
|
await reminder.delete()
|
|
except Exception:
|
|
self.logger.debug("Ignoring deletion error")
|
|
|
|
@_fetch.autocomplete("content")
|
|
@_delete.autocomplete("content")
|
|
async def _search_reminders(self, ctx: AutocompleteContext, content: str) -> None:
|
|
reminders = await Reminder.find(q(user=ctx.author.id)).to_list(None)
|
|
lookup = {r.message: str(r.id) for r in reminders}
|
|
results = process.extract(content, list(lookup.keys()), limit=5)
|
|
choices = [{"name": r[0], "value": lookup[r[0]]} for r in results]
|
|
await ctx.send(choices=choices)
|
|
|
|
|
|
def setup(bot: Client) -> None:
|
|
"""Add RemindmeCog to JARVIS"""
|
|
RemindmeCog(bot)
|