211 lines
7.3 KiB
Python
211 lines
7.3 KiB
Python
"""JARVIS Autoreact Cog."""
|
|
import logging
|
|
import re
|
|
from typing import Optional, Tuple
|
|
|
|
from jarvis_core.db import q
|
|
from jarvis_core.db.models import Autoreact
|
|
from naff import Client, Extension, InteractionContext, Permissions
|
|
from naff.client.utils.misc_utils import find
|
|
from naff.models.discord.channel import GuildText
|
|
from naff.models.naff.application_commands import (
|
|
OptionTypes,
|
|
SlashCommand,
|
|
slash_option,
|
|
)
|
|
from naff.models.naff.command import check
|
|
|
|
from jarvis.data.unicode import emoji_list
|
|
from jarvis.utils.permissions import admin_or_permissions
|
|
|
|
|
|
class AutoReactCog(Extension):
    """
    JARVIS Autoreact Cog.

    Provides the ``/autoreact`` slash command group (``add``, ``remove``,
    ``list``) used to maintain one ``Autoreact`` document per guild channel.
    Each document stores the list of reaction emotes and a ``thread`` flag.
    """

    def __init__(self, bot: Client):
        self.bot = bot
        self.logger = logging.getLogger(__name__)
        # Matches a static custom emote mention (``<:name:id>``) and captures its ID.
        self.custom_emote = re.compile(r"^<:\w+:(\d+)>$")

    async def _find_autoreact(
        self, ctx: InteractionContext, channel: GuildText
    ) -> Optional[Autoreact]:
        """
        Look up the autoreact document for a channel in the invoking guild.

        Args:
            ctx: Interaction context of command
            channel: Channel to look up

        Returns:
            Result of the ``Autoreact.find_one`` query (falsy when nothing matches)
        """
        return await Autoreact.find_one(q(guild=ctx.guild.id, channel=channel.id))

    async def create_autoreact(
        self, ctx: InteractionContext, channel: GuildText, thread: bool
    ) -> Tuple[bool, Optional[str]]:
        """
        Create an autoreact monitor on a channel.

        Args:
            ctx: Interaction context of command
            channel: Channel to monitor
            thread: Create a thread

        Returns:
            Tuple of success? and error message
        """
        if await self._find_autoreact(ctx, channel):
            return False, f"Autoreact already exists for {channel.mention}."

        # New monitors start with no reactions; emotes are appended by the add command.
        await Autoreact(
            guild=ctx.guild.id,
            channel=channel.id,
            reactions=[],
            thread=thread,
            admin=ctx.author.id,
        ).commit()

        return True, None

    async def delete_autoreact(self, ctx: InteractionContext, channel: GuildText) -> bool:
        """
        Remove an autoreact monitor on a channel.

        Args:
            ctx: Interaction context of command
            channel: Channel to stop monitoring

        Returns:
            Success?
        """
        ar = await self._find_autoreact(ctx, channel)
        if not ar:
            return False
        await ar.delete()
        return True

    autoreact = SlashCommand(name="autoreact", description="Channel message autoreacts")

    @autoreact.subcommand(
        sub_cmd_name="add",
        sub_cmd_description="Add an autoreact emote to a channel",
    )
    @slash_option(
        name="channel",
        description="Autoreact channel to add emote to",
        opt_type=OptionTypes.CHANNEL,
        required=True,
    )
    @slash_option(
        name="thread", description="Create a thread?", opt_type=OptionTypes.BOOLEAN, required=False
    )
    @slash_option(
        name="emote", description="Emote to add", opt_type=OptionTypes.STRING, required=False
    )
    @check(admin_or_permissions(Permissions.MANAGE_GUILD))
    async def _autoreact_add(
        self, ctx: InteractionContext, channel: GuildText, thread: bool = True, emote: str = None
    ) -> None:
        """
        Add an emote to a channel's autoreact and/or set its thread flag.

        The autoreact document is created on first use. The ``thread`` value is
        written on every successful call; because it defaults to ``True``,
        omitting the option stores ``True`` even on an existing document.

        Args:
            ctx: Interaction context of command
            channel: Channel to configure
            thread: Value stored in the document's ``thread`` field
            emote: Unicode emoji or custom emote of this guild to add (optional)
        """
        # NOTE(review): the response is deferred publicly here, so the
        # ``ephemeral=True`` flags on the replies below are presumably ignored
        # by Discord for the first follow-up -- confirm against the API docs.
        await ctx.defer()

        if emote:
            custom_emoji = self.custom_emote.match(emote)
            standard_emoji = emote in emoji_list
            if not (custom_emoji or standard_emoji):
                await ctx.send(
                    "Please use either an emote from this server or a unicode emoji.",
                    ephemeral=True,
                )
                return
            if custom_emoji:
                # A custom emote is only accepted if it belongs to this guild.
                emoji_id = int(custom_emoji.group(1))
                if not find(lambda x: x.id == emoji_id, await ctx.guild.fetch_all_custom_emojis()):
                    await ctx.send("Please use a custom emote from this server.", ephemeral=True)
                    return

        autoreact = await self._find_autoreact(ctx, channel)
        if not autoreact:
            # create_autoreact only reports success/failure, so fetch the new document.
            await self.create_autoreact(ctx, channel, thread)
            autoreact = await self._find_autoreact(ctx, channel)

        if emote:
            if emote in autoreact.reactions:
                await ctx.send(
                    f"Emote already added to {channel.mention} autoreactions.",
                    ephemeral=True,
                )
                return
            # At most 5 reactions are stored per channel.
            if len(autoreact.reactions) >= 5:
                await ctx.send(
                    "Max number of reactions hit. Remove a different one to add this one",
                    ephemeral=True,
                )
                return
            autoreact.reactions.append(emote)

        autoreact.thread = thread
        await autoreact.commit()

        # Join the parts so the confirmation never starts with a stray space.
        parts = []
        if emote:
            parts.append(f"Added {emote} to {channel.mention} autoreact.")
        parts.append(f"Set autoreact thread creation to {thread} in {channel.mention}")
        await ctx.send(" ".join(parts))

    @autoreact.subcommand(
        sub_cmd_name="remove",
        sub_cmd_description="Remove an autoreact emote from a channel",
    )
    @slash_option(
        name="channel",
        description="Autoreact channel to remove emote from",
        opt_type=OptionTypes.CHANNEL,
        required=True,
    )
    @slash_option(
        name="emote",
        description="Emote to remove (use all to delete)",
        opt_type=OptionTypes.STRING,
        required=True,
    )
    @check(admin_or_permissions(Permissions.MANAGE_GUILD))
    async def _autoreact_remove(
        self, ctx: InteractionContext, channel: GuildText, emote: str
    ) -> None:
        """
        Remove one emote from a channel's autoreact, or delete the autoreact.

        Passing ``all`` (case-insensitive) as the emote deletes the whole
        document. After removing a single emote, the document is also deleted
        when no reactions remain and its ``thread`` flag is falsy.

        Args:
            ctx: Interaction context of command
            channel: Channel to modify
            emote: Emote to remove, or ``all``
        """
        autoreact = await self._find_autoreact(ctx, channel)
        if not autoreact:
            await ctx.send(
                f"Please create autoreact first with /autoreact add {channel.mention} {emote}",
                ephemeral=True,
            )
            return

        if emote.lower() == "all":
            await self.delete_autoreact(ctx, channel)
            await ctx.send(f"Autoreact removed from {channel.mention}")
            return

        if emote not in autoreact.reactions:
            await ctx.send(
                f"{emote} not used in {channel.mention} autoreactions.",
                ephemeral=True,
            )
            return

        autoreact.reactions.remove(emote)
        await autoreact.commit()
        # Nothing left to do for this channel: drop the now-empty document.
        if not autoreact.reactions and not autoreact.thread:
            await self.delete_autoreact(ctx, channel)
        await ctx.send(f"Removed {emote} from {channel.mention} autoreact.")

    @autoreact.subcommand(
        sub_cmd_name="list",
        sub_cmd_description="List all autoreacts on a channel",
    )
    @slash_option(
        name="channel",
        description="Autoreact channel to list",
        opt_type=OptionTypes.CHANNEL,
        required=True,
    )
    async def _autoreact_list(self, ctx: InteractionContext, channel: GuildText) -> None:
        """
        Show the reactions configured for a channel.

        Args:
            ctx: Interaction context of command
            channel: Channel whose autoreact is listed
        """
        exists = await self._find_autoreact(ctx, channel)
        if not exists:
            await ctx.send(
                f"Please create autoreact first with /autoreact add {channel.mention} <emote>",
                ephemeral=True,
            )
            return

        if exists.reactions:
            message = f"Current active autoreacts on {channel.mention}:\n" + "\n".join(
                exists.reactions
            )
        else:
            message = f"No reactions set on {channel.mention}"
        await ctx.send(message)
|
|
|
|
|
|
def setup(bot: Client) -> None:
    """
    Add AutoReactCog to JARVIS.

    Args:
        bot: Client the cog is constructed with
    """
    # The instance is intentionally neither stored nor returned; presumably
    # the Extension base class attaches it to the bot on construction -- confirm.
    AutoReactCog(bot)
|