336 lines
12 KiB
Python
336 lines
12 KiB
Python
"""J.A.R.V.I.S. Remind Me Cog."""
|
|
import asyncio
|
|
import logging
|
|
import re
|
|
from datetime import datetime, timezone
|
|
from typing import List
|
|
|
|
from bson import ObjectId
|
|
from dateparser import parse
|
|
from dateparser_data.settings import default_parsers
|
|
from dis_snek import InteractionContext, Scale, Snake
|
|
from dis_snek.client.utils.misc_utils import get
|
|
from dis_snek.models.discord.channel import GuildChannel
|
|
from dis_snek.models.discord.components import ActionRow, Select, SelectOption
|
|
from dis_snek.models.discord.embed import Embed, EmbedField
|
|
from dis_snek.models.discord.modal import InputText, Modal, TextStyles
|
|
from dis_snek.models.snek.application_commands import (
|
|
OptionTypes,
|
|
SlashCommand,
|
|
slash_command,
|
|
slash_option,
|
|
)
|
|
from jarvis_core.db import q
|
|
from jarvis_core.db.models import Reminder
|
|
|
|
from jarvis.utils import build_embed
|
|
|
|
# Characters permitted in a reminder message: word characters, whitespace, a
# fixed set of ASCII punctuation, and code points U+0080 through U+E0FFF.
valid = re.compile(r"[\w\s\-\\/.!@#$%^*()+=<>:,\u0080-\U000E0FFF]*")

# Shorthand durations such as "30m" or "1.5h"; the unit is one of s, m, h, d, w.
# The unit class deliberately contains no "|": inside [...] a pipe is a literal
# character, not alternation, so it would let input like "5|" through.
time_pattern = re.compile(r"(\d+\.?\d?[smhdw])\s?", flags=re.IGNORECASE)

# Discord invite links; the capture group is the invite code. Dots are escaped
# so that only a literal "." separates the host parts.
invites = re.compile(
    r"(?:https?://)?(?:www\.)?(?:discord\.(?:gg|io|me|li)|discord(?:app)?\.com/invite)/([^\s/]+?)(?=\b)",  # noqa: E501
    flags=re.IGNORECASE,
)
|
|
|
|
|
|
class RemindmeCog(Scale):
    """J.A.R.V.I.S. Remind Me Cog.

    Provides the ``/remindme`` command and the ``/reminders`` command group
    (``list``, ``delete`` and ``fetch``), all backed by the ``Reminder`` model.
    """

    def __init__(self, bot: Snake):
        """Store the bot instance and create a module-scoped logger."""
        self.bot = bot
        self.logger = logging.getLogger(__name__)

    @slash_command(name="remindme", description="Set a reminder")
    @slash_option(
        name="private",
        description="Send as DM?",
        opt_type=OptionTypes.BOOLEAN,
        required=False,
    )
    async def _remindme(
        self,
        ctx: InteractionContext,
        private: bool = False,
    ) -> None:
        """Prompt for a reminder via a modal, validate it and store it.

        Args:
            ctx: Interaction that invoked the command.
            private: Stored on the reminder; also makes the confirmation
                message ephemeral.
        """
        reminders = len([x async for x in Reminder.find(q(user=ctx.author.id, active=True))])
        if reminders >= 5:
            await ctx.send(
                "You already have 5 (or more) active reminders. "
                "Please either remove an old one, or wait for one to pass",
                ephemeral=True,
            )
            return

        modal = Modal(
            title="Set your reminder!",
            components=[
                InputText(
                    label="What to remind you?",
                    placeholder="Reminder",
                    style=TextStyles.PARAGRAPH,
                    custom_id="message",
                    max_length=500,
                ),
                InputText(
                    label="When to remind you?",
                    placeholder="1h 30m | in 5 minutes | November 11, 4011",
                    style=TextStyles.SHORT,
                    custom_id="delay",
                ),
            ],
        )

        await ctx.send_modal(modal)
        try:
            response = await self.bot.wait_for_modal(modal, author=ctx.author.id, timeout=60 * 5)
            message = response.responses.get("message")
            delay = response.responses.get("delay")
        except asyncio.TimeoutError:
            # The user never submitted the modal; nothing to clean up.
            return

        if len(message) > 500:
            await response.send("Reminder cannot be > 500 characters.", ephemeral=True)
            return
        elif invites.search(message):
            await response.send(
                "Listen, don't use this to try and bypass the rules",
                ephemeral=True,
            )
            return
        elif not valid.fullmatch(message):
            await response.send("Hey, you should probably make this readable", ephemeral=True)
            return

        base_settings = {
            "PREFER_DATES_FROM": "future",
            "TIMEZONE": "UTC",
            "RETURN_AS_TIMEZONE_AWARE": True,
        }

        # First pass: every default parser except the absolute-time/timestamp
        # ones, so inputs like "in 5 minutes" are tried as relative times.
        rt_settings = base_settings.copy()
        rt_settings["PARSERS"] = [
            x for x in default_parsers if x not in ["absolute-time", "timestamp"]
        ]
        rt_remind_at = parse(delay, settings=rt_settings)

        # Second pass: every default parser except relative-time.
        at_settings = base_settings.copy()
        at_settings["PARSERS"] = [x for x in default_parsers if x != "relative-time"]
        at_remind_at = parse(delay, settings=at_settings)

        # The relative-time result wins when both passes produce a value.
        if rt_remind_at:
            remind_at = rt_remind_at
        elif at_remind_at:
            remind_at = at_remind_at
        else:
            self.logger.debug(f"Failed to parse delay: {delay}")
            await response.send(
                f"`{delay}` is not a parsable date, please try again", ephemeral=True
            )
            return

        if remind_at < datetime.now(tz=timezone.utc):
            await response.send(
                f"`{delay}` is in the past. Past reminders aren't allowed", ephemeral=True
            )
            return

        r = Reminder(
            user=ctx.author.id,
            channel=ctx.channel.id,
            guild=ctx.guild.id,
            message=message,
            remind_at=remind_at,
            private=private,
            active=True,
        )

        await r.commit()

        embed = build_embed(
            title="Reminder Set",
            description=f"{ctx.author.mention} set a reminder",
            fields=[
                EmbedField(name="Message", value=message),
                EmbedField(
                    name="When",
                    value=f"<t:{int(remind_at.timestamp())}:F>",
                    inline=False,
                ),
            ],
        )

        embed.set_author(
            name=ctx.author.username + "#" + ctx.author.discriminator,
            icon_url=ctx.author.display_avatar.url,
        )
        embed.set_thumbnail(url=ctx.author.display_avatar.url)

        await response.send(embed=embed, ephemeral=private)

    async def get_reminders_embed(
        self, ctx: InteractionContext, reminders: List[Reminder]
    ) -> Embed:
        """Build an embed with one field per reminder.

        Private reminders have their content replaced by a placeholder when
        the command was invoked in a guild channel.

        Args:
            ctx: Interaction whose author and channel are used for the embed.
            reminders: Reminders to show.

        Returns:
            The populated embed.
        """
        fields = []
        for reminder in reminders:
            if reminder.private and isinstance(ctx.channel, GuildChannel):
                # Was ``fields.embed(...)``: lists have no ``embed`` method, so
                # this branch raised AttributeError for every private reminder.
                fields.append(
                    EmbedField(
                        name=f"<t:{int(reminder.remind_at.timestamp())}:F>",
                        value="Please DM me this command to view the content of this reminder",
                        inline=False,
                    )
                )
            else:
                fields.append(
                    EmbedField(
                        name=f"<t:{int(reminder.remind_at.timestamp())}:F>",
                        value=f"{reminder.message}\n\u200b",
                        inline=False,
                    )
                )

        embed = build_embed(
            title=f"{len(reminders)} Active Reminder(s)",
            description=f"All active reminders for {ctx.author.mention}",
            fields=fields,
        )

        embed.set_author(
            name=ctx.author.username + "#" + ctx.author.discriminator,
            icon_url=ctx.author.display_avatar.url,
        )
        embed.set_thumbnail(url=ctx.author.display_avatar.url)

        return embed

    reminders = SlashCommand(name="reminders", description="Manage reminders")

    @reminders.subcommand(sub_cmd_name="list", sub_cmd_description="List reminders")
    async def _list(self, ctx: InteractionContext) -> None:
        """Send an embed listing the caller's active reminders."""
        reminders = await Reminder.find(q(user=ctx.author.id, active=True)).to_list(None)
        if not reminders:
            await ctx.send("You have no reminders set.", ephemeral=True)
            return

        embed = await self.get_reminders_embed(ctx, reminders)

        await ctx.send(embed=embed)

    @reminders.subcommand(sub_cmd_name="delete", sub_cmd_description="Delete a reminder")
    async def _delete(self, ctx: InteractionContext) -> None:
        """Let the caller pick active reminders from a select menu and delete them.

        The select menu is disabled once a choice is made or after the
        five-minute wait times out.
        """
        reminders = await Reminder.find(q(user=ctx.author.id, active=True)).to_list(None)
        if not reminders:
            await ctx.send("You have no reminders set", ephemeral=True)
            return

        options = []
        for reminder in reminders:
            option = SelectOption(
                label=f"{reminder.remind_at}",
                value=str(reminder.id),
                emoji="⏰",
            )
            options.append(option)

        select = Select(
            options=options,
            custom_id="to_delete",
            placeholder="Select reminders to delete",
            min_values=1,
            max_values=len(reminders),
        )

        components = [ActionRow(select)]
        embed = await self.get_reminders_embed(ctx, reminders)
        message = await ctx.send(
            content=f"You have {len(reminders)} reminder(s) set:",
            embed=embed,
            components=components,
        )

        try:
            # Only accept a selection made by the user who ran the command.
            context = await self.bot.wait_for_component(
                check=lambda x: ctx.author.id == x.context.author.id,
                messages=message,
                timeout=60 * 5,
            )

            fields = []
            for to_delete in context.context.values:
                reminder = get(reminders, user=ctx.author.id, id=ObjectId(to_delete))
                if reminder.private and isinstance(ctx.channel, GuildChannel):
                    fields.append(
                        EmbedField(
                            name=f"<t:{int(reminder.remind_at.timestamp())}:F>",
                            value="Private reminder",
                            inline=False,
                        )
                    )
                else:
                    fields.append(
                        EmbedField(
                            name=f"<t:{int(reminder.remind_at.timestamp())}:F>",
                            value=reminder.message,
                            inline=False,
                        )
                    )
                try:
                    await reminder.delete()
                except Exception:
                    # Deletion stays best-effort, but the failure is recorded
                    # instead of being dropped silently.
                    self.logger.exception(f"Failed to delete reminder {reminder.id}")

            for row in components:
                for component in row.components:
                    component.disabled = True

            embed = build_embed(
                title="Deleted Reminder(s)",
                description="",
                fields=fields,
            )

            embed.set_author(
                name=ctx.author.display_name + "#" + ctx.author.discriminator,
                icon_url=ctx.author.display_avatar.url,
            )
            embed.set_thumbnail(url=ctx.author.display_avatar.url)

            await context.context.edit_origin(
                content=f"Deleted {len(context.context.values)} reminder(s)",
                components=components,
                embed=embed,
            )
        except asyncio.TimeoutError:
            # Nobody made a selection in time: disable the menu in place.
            for row in components:
                for component in row.components:
                    component.disabled = True
            await message.edit(components=components)

    @reminders.subcommand(
        sub_cmd_name="fetch",
        sub_cmd_description="Fetch a reminder that failed to send",
    )
    @slash_option(
        name="id", description="ID of the reminder", opt_type=OptionTypes.STRING, required=True
    )
    async def _fetch(self, ctx: InteractionContext, id: str) -> None:
        """Look up a reminder by ID and send its message as an embed.

        Args:
            ctx: Interaction that invoked the command.
            id: ID of the reminder to look up.
        """
        # NOTE(review): the lookup is by ID only and does not check that the
        # reminder belongs to ctx.author - confirm whether that is intended.
        reminder = await Reminder.find_one(q(id=id))
        if not reminder:
            await ctx.send(f"Reminder {id} does not exist")
            return

        embed = build_embed(title="You have a reminder!", description=reminder.message, fields=[])
        # Pass the avatar's URL, matching the other embed builders in this cog.
        embed.set_author(
            name=ctx.author.display_name + "#" + ctx.author.discriminator,
            icon_url=ctx.author.display_avatar.url,
        )

        embed.set_thumbnail(url=ctx.author.display_avatar.url)
        await ctx.send(embed=embed, ephemeral=reminder.private)
|
|
|
|
|
|
def setup(bot: Snake) -> None:
    """Add RemindmeCog to J.A.R.V.I.S.

    Args:
        bot: Bot instance handed to the cog's constructor.
    """
    # The instance is intentionally not kept; presumably the Scale base class
    # registers itself with the bot on construction - confirm against dis_snek.
    RemindmeCog(bot)
|