365 lines
11 KiB
Python
365 lines
11 KiB
Python
import asyncio
|
|
import re
|
|
from datetime import datetime, timedelta
|
|
from typing import Optional
|
|
|
|
from bson import ObjectId
|
|
from ButtonPaginator import Paginator
|
|
from discord.ext import commands
|
|
from discord.ext.tasks import loop
|
|
from discord.utils import find
|
|
from discord_slash import SlashContext, cog_ext
|
|
from discord_slash.model import ButtonStyle
|
|
from discord_slash.utils.manage_commands import create_option
|
|
from discord_slash.utils.manage_components import (
|
|
create_actionrow,
|
|
create_select,
|
|
create_select_option,
|
|
wait_for_component,
|
|
)
|
|
|
|
from jarvis.db.types import Reminder
|
|
from jarvis.utils import build_embed
|
|
from jarvis.utils.field import Field
|
|
|
|
valid = re.compile(r"[\w\s\-\\/.!@#$%^*()+=<>,\u0080-\U000E0FFF]*")
|
|
invites = re.compile(
|
|
r"(?:https?://)?(?:www.)?(?:discord.(?:gg|io|me|li)|discord(?:app)?.com/invite)/([^\s/]+?)(?=\b)",
|
|
flags=re.IGNORECASE,
|
|
)
|
|
|
|
|
|
class RemindmeCog(commands.Cog):
|
|
def __init__(self, bot):
|
|
self.bot = bot
|
|
self.cache = {}
|
|
self._remind.start()
|
|
|
|
def check_cache(self, ctx: SlashContext, **kwargs):
|
|
if not kwargs:
|
|
kwargs = {}
|
|
return find(
|
|
lambda x: x["command"] == ctx.subcommand_name
|
|
and x["user"] == ctx.author.id
|
|
and x["guild"] == ctx.guild.id
|
|
and all(x[k] == v for k, v in kwargs.items()),
|
|
self.cache.values(),
|
|
)
|
|
|
|
@cog_ext.cog_slash(
|
|
name="remindme",
|
|
description="Set a reminder",
|
|
options=[
|
|
create_option(
|
|
name="message",
|
|
description="What to remind you of",
|
|
option_type=3,
|
|
required=True,
|
|
),
|
|
create_option(
|
|
name="weeks",
|
|
description="Number of weeks?",
|
|
option_type=4,
|
|
required=False,
|
|
),
|
|
create_option(
|
|
name="days",
|
|
description="Number of days?",
|
|
option_type=4,
|
|
required=False,
|
|
),
|
|
create_option(
|
|
name="hours",
|
|
description="Number of hours?",
|
|
option_type=4,
|
|
required=False,
|
|
),
|
|
create_option(
|
|
name="minutes",
|
|
description="Number of minutes?",
|
|
option_type=4,
|
|
required=False,
|
|
),
|
|
],
|
|
)
|
|
async def _remindme(
|
|
self,
|
|
ctx: SlashContext,
|
|
message: Optional[str] = None,
|
|
weeks: Optional[int] = 0,
|
|
days: Optional[int] = 0,
|
|
hours: Optional[int] = 0,
|
|
minutes: Optional[int] = 0,
|
|
):
|
|
if len(message) > 100:
|
|
await ctx.send("Reminder cannot be > 100 characters.", hidden=True)
|
|
return
|
|
elif invites.search(message):
|
|
await ctx.send(
|
|
"Listen, don't use this to try and bypass the rules",
|
|
hidden=True,
|
|
)
|
|
return
|
|
elif not valid.fullmatch(message):
|
|
await ctx.send(
|
|
"Hey, you should probably make this readable", hidden=True
|
|
)
|
|
return
|
|
|
|
if not any([weeks, days, hours, minutes]):
|
|
await ctx.send("At least one time period is required", hidden=True)
|
|
return
|
|
|
|
weeks = abs(weeks)
|
|
days = abs(days)
|
|
hours = abs(hours)
|
|
minutes = abs(minutes)
|
|
|
|
if weeks and weeks > 4:
|
|
await ctx.send("Cannot be farther than 4 weeks out!", hidden=True)
|
|
return
|
|
|
|
elif days and days > 6:
|
|
await ctx.send(
|
|
"Use weeks instead of 7+ days, please.", hidden=True
|
|
)
|
|
return
|
|
|
|
elif hours and hours > 23:
|
|
await ctx.send(
|
|
"Use days instead of 24+ hours, please.", hidden=True
|
|
)
|
|
return
|
|
|
|
elif minutes and minutes > 59:
|
|
await ctx.send(
|
|
"Use hours instead of 59+ minutes, please.", hidden=True
|
|
)
|
|
return
|
|
|
|
reminders = Reminder.get_active(user=ctx.author.id)
|
|
if len(reminders) >= 5:
|
|
await ctx.send(
|
|
"You already have 5 (or more) active reminders. "
|
|
+ "Please either remove an old one, or wait for one to pass",
|
|
hidden=True,
|
|
)
|
|
return
|
|
|
|
remind_at = datetime.utcnow() + timedelta(
|
|
weeks=weeks,
|
|
days=days,
|
|
hours=hours,
|
|
minutes=minutes,
|
|
)
|
|
|
|
_ = Reminder(
|
|
user=ctx.author_id,
|
|
channel=ctx.channel.id,
|
|
guild=ctx.guild.id,
|
|
message=message,
|
|
remind_at=remind_at,
|
|
active=True,
|
|
).insert()
|
|
|
|
embed = build_embed(
|
|
title="Reminder Set",
|
|
description=f"{ctx.author.mention} set a reminder",
|
|
fields=[
|
|
Field(name="Message", value=message),
|
|
Field(
|
|
name="When",
|
|
value=remind_at.strftime("%Y-%m-%d %H:%M UTC"),
|
|
inline=False,
|
|
),
|
|
],
|
|
)
|
|
|
|
embed.set_author(
|
|
name=ctx.author.name + "#" + ctx.author.discriminator,
|
|
icon_url=ctx.author.avatar_url,
|
|
)
|
|
embed.set_thumbnail(url=ctx.author.avatar_url)
|
|
|
|
await ctx.send(embed=embed)
|
|
|
|
async def get_reminders_embed(self, ctx, reminders):
|
|
fields = []
|
|
for reminder in reminders:
|
|
fields.append(
|
|
Field(
|
|
name=reminder.remind_at.strftime("%Y-%m-%d %H:%M UTC"),
|
|
value=f"{reminder.message}\n\u200b",
|
|
inline=False,
|
|
)
|
|
)
|
|
|
|
embed = build_embed(
|
|
title=f"{len(reminders)} Active Reminder(s)",
|
|
description="All active reminders for " + f"{ctx.author.mention}",
|
|
fields=fields,
|
|
)
|
|
|
|
embed.set_author(
|
|
name=ctx.author.name + "#" + ctx.author.discriminator,
|
|
icon_url=ctx.author.avatar_url,
|
|
)
|
|
embed.set_thumbnail(url=ctx.author.avatar_url)
|
|
|
|
return embed
|
|
|
|
@cog_ext.cog_subcommand(
|
|
base="reminders",
|
|
name="list",
|
|
description="List reminders for a user",
|
|
)
|
|
async def _list(self, ctx: SlashContext):
|
|
exists = self.check_cache(ctx)
|
|
if exists:
|
|
await ctx.defer(hidden=True)
|
|
await ctx.send(
|
|
"Please use existing interaction: "
|
|
+ f"{exists['paginator']._message.jump_url}",
|
|
hidden=True,
|
|
)
|
|
return
|
|
reminders = Reminder.get_active(user=ctx.author.id)
|
|
if not reminders:
|
|
await ctx.send("You have no reminders set.", hidden=True)
|
|
return
|
|
|
|
embed = await self.get_reminders_embed(ctx, reminders)
|
|
|
|
await ctx.send(embed=embed)
|
|
|
|
@cog_ext.cog_subcommand(
|
|
base="reminders",
|
|
name="delete",
|
|
description="Delete a reminder",
|
|
)
|
|
async def _delete(self, ctx: SlashContext):
|
|
reminders = Reminder.get_active(user=ctx.author.id)
|
|
if not reminders:
|
|
await ctx.send("You have no reminders set", hidden=True)
|
|
return
|
|
|
|
options = []
|
|
for reminder in reminders:
|
|
option = create_select_option(
|
|
label=reminder.remind_at.strftime("%Y-%m-%d %H:%M UTC"),
|
|
value=str(reminder._id),
|
|
emoji="⏰",
|
|
)
|
|
options.append(option)
|
|
|
|
select = create_select(
|
|
options=options,
|
|
custom_id="to_delete",
|
|
placeholder="Select reminders to delete",
|
|
min_values=1,
|
|
max_values=len(reminders),
|
|
)
|
|
|
|
components = [create_actionrow(select)]
|
|
embed = await self.get_reminders_embed(ctx, reminders)
|
|
message = await ctx.send(
|
|
content=f"You have {len(reminders)} reminder(s) set:",
|
|
embed=embed,
|
|
components=components,
|
|
)
|
|
|
|
try:
|
|
context = await wait_for_component(
|
|
self.bot,
|
|
check=lambda x: ctx.author.id == x.author_id,
|
|
messages=message,
|
|
timeout=60 * 5,
|
|
)
|
|
for to_delete in context.selected_options:
|
|
_ = Reminder.get(
|
|
user=ctx.author.id, _id=ObjectId(to_delete)
|
|
).delete()
|
|
|
|
for row in components:
|
|
for component in row["components"]:
|
|
component["disabled"] = True
|
|
|
|
fields = []
|
|
for reminder in filter(
|
|
lambda x: str(x._id) in context.selected_options, reminders
|
|
):
|
|
fields.append(
|
|
Field(
|
|
name=reminder.remind_at.strftime("%Y-%m-%d %H:%M UTC"),
|
|
value=reminder.message,
|
|
inline=False,
|
|
)
|
|
)
|
|
embed = build_embed(
|
|
title="Deleted Reminder(s)",
|
|
description="",
|
|
fields=fields,
|
|
)
|
|
|
|
embed.set_author(
|
|
name=ctx.author.name + "#" + ctx.author.discriminator,
|
|
icon_url=ctx.author.avatar_url,
|
|
)
|
|
embed.set_thumbnail(url=ctx.author.avatar_url)
|
|
|
|
await context.edit_origin(
|
|
content=f"Deleted {len(context.selected_options)} reminder(s)",
|
|
components=components,
|
|
embed=embed,
|
|
)
|
|
except asyncio.TimeoutError:
|
|
for row in components:
|
|
for component in row["components"]:
|
|
component["disabled"] = True
|
|
await message.edit(components=components)
|
|
|
|
@loop(minutes=1)
|
|
async def _expire_interaction(self):
|
|
keys = list(self.cache.keys())
|
|
for key in keys:
|
|
if self.cache[key]["timeout"] <= datetime.utcnow() + timedelta(
|
|
minutes=1
|
|
):
|
|
del self.cache[key]
|
|
|
|
@loop(seconds=15)
|
|
async def _remind(self):
|
|
reminders = Reminder.get_active(
|
|
remind_at={"$lt": datetime.utcnow() + timedelta(seconds=30)}
|
|
)
|
|
for reminder in reminders:
|
|
if reminder.remind_at <= datetime.utcnow():
|
|
user = await self.bot.fetch_user(reminder.user)
|
|
if not user:
|
|
reminder.delete()
|
|
continue
|
|
embed = build_embed(
|
|
title="You have a reminder",
|
|
description=reminder.message,
|
|
fields=[],
|
|
)
|
|
embed.set_author(
|
|
name=user.name + "#" + user.discriminator,
|
|
icon_url=user.avatar_url,
|
|
)
|
|
embed.set_thumbnail(url=user.avatar_url)
|
|
try:
|
|
await user.send(embed=embed)
|
|
except:
|
|
guild = self.bot.fetch_guild(reminder.guild)
|
|
channel = (
|
|
guild.get_channel(reminder.channel) if guild else None
|
|
)
|
|
if channel:
|
|
await channel.send(f"{user.mention}", embed=embed)
|
|
finally:
|
|
reminder.delete()
|
|
|
|
|
|
def setup(bot):
|
|
bot.add_cog(RemindmeCog(bot))
|