403 lines
15 KiB
Python
403 lines
15 KiB
Python
"""JARVIS Role Giver Cog."""
|
|
import asyncio
|
|
import logging
|
|
from typing import Dict
|
|
|
|
from interactions import (
|
|
AutocompleteContext,
|
|
Client,
|
|
Extension,
|
|
InteractionContext,
|
|
Permissions,
|
|
listen,
|
|
)
|
|
from interactions.client.utils.misc_utils import get
|
|
from interactions.models.discord.components import (
|
|
ActionRow,
|
|
Button,
|
|
StringSelectMenu,
|
|
StringSelectOption,
|
|
)
|
|
from interactions.models.discord.embed import EmbedField
|
|
from interactions.models.discord.enums import ButtonStyles
|
|
from interactions.models.discord.role import Role
|
|
from interactions.models.internal.application_commands import (
|
|
OptionTypes,
|
|
SlashCommand,
|
|
slash_option,
|
|
)
|
|
from interactions.models.internal.command import check, cooldown
|
|
from interactions.models.internal.cooldowns import Buckets
|
|
from jarvis_core.db import q
|
|
from jarvis_core.db.models import Rolegiver
|
|
from thefuzz import process
|
|
|
|
from jarvis.utils import build_embed
|
|
from jarvis.utils.permissions import admin_or_permissions
|
|
|
|
|
|
class RolegiverCog(Extension):
    """JARVIS Role Giver Cog.

    Admins curate a per-guild list of self-assignable roles (``/rolegiver add``,
    ``remove``, ``list``, ``cleanup``); members pick up or drop those roles
    through select menus (``/rolegiver get`` and ``/rolegiver forfeit``).
    """

    # Discord caps a string select menu at 25 options; larger menus are rejected.
    MAX_SELECT_OPTIONS = 25

    def __init__(self, bot: Client):
        self.bot = bot
        self.logger = logging.getLogger(__name__)
        # guild ID -> {role name -> role ID}; feeds the `remove` autocomplete
        self.cache: Dict[int, Dict[str, int]] = {}

    # ------------------------------------------------------------------ helpers

    @staticmethod
    async def _resolve_roles(guild, role_ids) -> tuple:
        """Fetch every ID in ``role_ids`` from ``guild``.

        Returns:
            A ``(found, missing)`` pair: the role objects that were fetched, and
            the IDs for which ``guild.fetch_role`` returned nothing.
        """
        found, missing = [], []
        for role_id in role_ids or []:
            fetched = await guild.fetch_role(role_id)
            if fetched:
                found.append(fetched)
            else:
                missing.append(role_id)
        return found, missing

    async def _prune_stale(self, guild_id: int, setting: Rolegiver, stale_ids: list) -> None:
        """Drop role IDs that could not be fetched from the document and from the cache."""
        setting.roles = [rid for rid in setting.roles if rid not in stale_ids]
        await setting.commit()

        guild_cache = self.cache.get(guild_id)
        if guild_cache:
            for name in [n for n, rid in guild_cache.items() if rid in stale_ids]:
                del guild_cache[name]

    async def _rebuild_cache(self, guild, setting: Rolegiver) -> None:
        """Load the name -> ID cache for ``guild`` from ``setting.roles``, pruning stale IDs."""
        roles, missing = await self._resolve_roles(guild, setting.roles)
        if missing:
            await self._prune_stale(guild.id, setting, missing)
        if roles:
            self.cache.setdefault(guild.id, {}).update({r.name: r.id for r in roles})

    @staticmethod
    def _mentions(roles: list) -> str:
        """Return one mention per line, or ``"None"`` so an embed field is never empty."""
        return "\n".join(r.mention for r in roles) or "None"

    @staticmethod
    def _brand_embed(embed, ctx: InteractionContext, show_author: bool = False) -> None:
        """Attach the guild thumbnail, optional author block, and the invoker footer."""
        # A guild without a custom icon has no asset to take a URL from.
        if ctx.guild.icon:
            embed.set_thumbnail(url=ctx.guild.icon.url)
        if show_author:
            embed.set_author(name=ctx.author.display_name, icon_url=ctx.author.display_avatar.url)
        embed.set_footer(text=f"{ctx.author.username}#{ctx.author.discriminator} | {ctx.author.id}")

    @staticmethod
    def _disable_all(rows: list) -> None:
        """Mark every component in every action row as disabled."""
        for row in rows:
            for component in row.components:
                component.disabled = True

    # ------------------------------------------------------------------ events

    @listen()
    async def on_ready(self) -> None:
        """Ready hook: populate the role-name cache from the stored rolegiver documents."""
        all_rolegivers = await Rolegiver.find({}).to_list(None)
        for entry in all_rolegivers:
            guild = await self.bot.fetch_guild(entry.guild)
            if not guild:
                await entry.delete()
                continue
            # Documents hold a list of role IDs (`roles`); a missing role only
            # removes that ID instead of discarding the guild's whole document.
            await self._rebuild_cache(guild, entry)

    # ---------------------------------------------------------------- commands

    rolegiver = SlashCommand(name="rolegiver", description="Allow users to choose their own roles")

    @rolegiver.subcommand(
        sub_cmd_name="add",
        sub_cmd_description="Add a role to rolegiver",
    )
    @slash_option(name="role", description="Role to add", opt_type=OptionTypes.ROLE, required=True)
    @check(admin_or_permissions(Permissions.MANAGE_GUILD))
    async def _rolegiver_add(self, ctx: InteractionContext, role: Role) -> None:
        """Add ``role`` to the guild's rolegiver list and report the updated list."""
        if role.id == ctx.guild.id:
            await ctx.send("Cannot add `@everyone` to rolegiver", ephemeral=True)
            return

        if role.bot_managed or not role.is_assignable:
            await ctx.send(
                "Cannot assign this role, try lowering it below my role or using a different role",
                ephemeral=True,
            )
            return

        setting = await Rolegiver.find_one(q(guild=ctx.guild.id))
        if setting and setting.roles and role.id in setting.roles:
            await ctx.send("Role already in rolegiver", ephemeral=True)
            return

        if not setting:
            setting = Rolegiver(guild=ctx.guild.id, roles=[])
        setting.roles = setting.roles or []

        setting.roles.append(role.id)
        await setting.commit()

        # Everything that was already offered, highest role first.
        existing, _ = await self._resolve_roles(ctx.guild, [rid for rid in setting.roles if rid != role.id])
        existing.sort(key=lambda x: -x.position)

        fields = [
            EmbedField(name="New Role", value=f"{role.mention}"),
            EmbedField(name="Existing Role(s)", value=self._mentions(existing)),
        ]
        embed = build_embed(
            title="Rolegiver Updated",
            description="New role added to rolegiver",
            fields=fields,
        )
        self._brand_embed(embed, ctx)

        components = Button(style=ButtonStyles.DANGER, emoji="🗑️", custom_id=f"delete|{ctx.author.id}")
        await ctx.send(embeds=embed, components=components)

        self.cache.setdefault(ctx.guild.id, {})[role.name] = role.id

    @rolegiver.subcommand(sub_cmd_name="remove", sub_cmd_description="Remove a role from rolegiver")
    @slash_option(
        name="role",
        description="Name of role to remove",
        opt_type=OptionTypes.STRING,
        required=True,
        autocomplete=True,
    )
    @check(admin_or_permissions(Permissions.MANAGE_GUILD))
    async def _rolegiver_remove(self, ctx: InteractionContext, role: str) -> None:
        """Remove the role named ``role`` (an autocomplete choice) from rolegiver."""
        setting = await Rolegiver.find_one(q(guild=ctx.guild.id))
        if not setting or not setting.roles:
            await ctx.send("Rolegiver has no roles", ephemeral=True)
            return

        guild_cache = self.cache.get(ctx.guild.id, {})
        role_id = guild_cache.get(role)
        # Unknown name, or a cache entry the stored list no longer backs.
        if role_id is None or role_id not in setting.roles:
            await ctx.send("Something went wrong, please try a different role", ephemeral=True)
            return

        setting.roles.remove(role_id)
        await setting.commit()
        # Drop by the matched cache key; the role may have been renamed or deleted since.
        guild_cache.pop(role, None)

        removed = await ctx.guild.fetch_role(role_id)
        if not removed:
            # The ID is already gone from the document; there is nothing to mention.
            await ctx.send("Role not found in guild", ephemeral=True)
            return

        remaining, stale = await self._resolve_roles(ctx.guild, setting.roles)
        if stale:
            await self._prune_stale(ctx.guild.id, setting, stale)

        fields = [
            EmbedField(name="Removed Role", value=removed.mention),
            EmbedField(name="Remaining Role(s)", value=self._mentions(remaining)),
        ]
        embed = build_embed(title="Rolegiver Updated", description="Role removed from rolegiver", fields=fields)
        self._brand_embed(embed, ctx)

        await ctx.send(embeds=embed)

    @rolegiver.subcommand(sub_cmd_name="list", sub_cmd_description="List rolegiver roles")
    async def _rolegiver_list(self, ctx: InteractionContext) -> None:
        """Show every role currently offered by rolegiver, highest role first."""
        setting = await Rolegiver.find_one(q(guild=ctx.guild.id))
        if not setting or not setting.roles:
            await ctx.send("Rolegiver has no roles", ephemeral=True)
            return

        roles, _ = await self._resolve_roles(ctx.guild, setting.roles)
        roles.sort(key=lambda x: -x.position)

        embed = build_embed(
            title="Rolegiver",
            description=f"Available roles:\n{self._mentions(roles)}",
            fields=[],
        )
        self._brand_embed(embed, ctx)

        components = Button(style=ButtonStyles.DANGER, emoji="🗑️", custom_id=f"delete|{ctx.author.id}")
        await ctx.send(embeds=embed, components=components)

    @rolegiver.subcommand(sub_cmd_name="get", sub_cmd_description="Get a role")
    @cooldown(bucket=Buckets.USER, rate=1, interval=10)
    async def _role_get(self, ctx: InteractionContext) -> None:
        """Offer the rolegiver roles in a select menu and grant the ones the invoker picks."""
        setting = await Rolegiver.find_one(q(guild=ctx.guild.id))
        if not setting or not setting.roles:
            await ctx.send("Rolegiver has no roles", ephemeral=True)
            return

        # Roles deleted from the guild cannot be offered.
        available, _ = await self._resolve_roles(ctx.guild, setting.roles)
        if not available:
            await ctx.send("Rolegiver has no roles", ephemeral=True)
            return

        options = [
            StringSelectOption(label=r.name, value=str(r.id)) for r in available[: self.MAX_SELECT_OPTIONS]
        ]
        select = StringSelectMenu(
            options=options,
            placeholder="Select roles to add",
            min_values=1,
            max_values=len(options),
        )
        components = [ActionRow(select)]

        message = await ctx.send(content="\u200b", components=components)

        try:
            context = await self.bot.wait_for_component(
                check=lambda x: ctx.author.id == x.ctx.author.id,
                messages=message,
                timeout=60 * 5,
            )
        except asyncio.TimeoutError:
            self._disable_all(components)
            await message.edit(components=components)
            return

        # Snapshot before granting anything so "Prior Role(s)" really is prior.
        prior_roles = list(ctx.author.roles)

        added_roles = []
        for value in context.ctx.values:
            picked = await ctx.guild.fetch_role(int(value))
            if not picked:
                continue
            await ctx.author.add_role(picked, reason="Rolegiver")
            added_roles.append(picked)

        if prior_roles:
            prior_roles.sort(key=lambda x: -x.position)
            # Drop the lowest-positioned entry (presumably `@everyone` — confirm).
            prior_roles.pop(-1)

        fields = [
            EmbedField(name="Added Role(s)", value=self._mentions(added_roles)),
            EmbedField(name="Prior Role(s)", value=self._mentions(prior_roles)),
        ]
        embed = build_embed(
            title="User Given Role",
            description=f"{len(added_roles)} role(s) given to {ctx.author.mention}",
            fields=fields,
        )
        self._brand_embed(embed, ctx, show_author=True)

        self._disable_all(components)
        await context.ctx.edit_origin(embeds=embed, content="\u200b", components=components)

    @rolegiver.subcommand(sub_cmd_name="forfeit", sub_cmd_description="Forfeit a role")
    @cooldown(bucket=Buckets.USER, rate=1, interval=10)
    async def _role_remove(self, ctx: InteractionContext) -> None:
        """Let the invoker drop rolegiver roles they currently hold via a select menu."""
        user_roles = list(ctx.author.roles)

        setting = await Rolegiver.find_one(q(guild=ctx.guild.id))
        if not setting or not setting.roles:
            await ctx.send("Rolegiver has no roles", ephemeral=True)
            return

        valid = [r for r in user_roles if r.id in setting.roles]
        if not valid:
            await ctx.send("You have no rolegiver roles", ephemeral=True)
            return

        options = [StringSelectOption(label=r.name, value=str(r.id)) for r in valid[: self.MAX_SELECT_OPTIONS]]
        select = StringSelectMenu(
            options=options,
            custom_id="to_remove",
            placeholder="Select roles to remove",
            min_values=1,
            max_values=len(options),
        )
        components = [ActionRow(select)]

        message = await ctx.send(content="\u200b", components=components)

        try:
            context = await self.bot.wait_for_component(
                check=lambda x: ctx.author.id == x.ctx.author.id,
                messages=message,
                timeout=60 * 5,
            )
        except asyncio.TimeoutError:
            self._disable_all(components)
            await message.edit(components=components)
            return

        removed_roles = []
        for value in context.ctx.values:
            target = get(user_roles, id=int(value))
            if not target:
                continue
            await ctx.author.remove_role(target, reason="Rolegiver")
            user_roles.remove(target)
            removed_roles.append(target)

        if user_roles:
            user_roles.sort(key=lambda x: -x.position)
            # Drop the lowest-positioned entry (presumably `@everyone` — confirm).
            user_roles.pop(-1)

        fields = [
            EmbedField(name="Removed Role(s)", value=self._mentions(removed_roles)),
            EmbedField(name="Remaining Role(s)", value=self._mentions(user_roles)),
        ]
        embed = build_embed(
            title="User Forfeited Role",
            description=f"{len(removed_roles)} role(s) removed from {ctx.author.mention}",
            fields=fields,
        )
        self._brand_embed(embed, ctx, show_author=True)

        self._disable_all(components)
        await context.ctx.edit_origin(embeds=embed, components=components, content="\u200b")

    @rolegiver.subcommand(sub_cmd_name="cleanup", sub_cmd_description="Removed deleted roles from rolegiver")
    @check(admin_or_permissions(Permissions.MANAGE_GUILD))
    async def _rolegiver_cleanup(self, ctx: InteractionContext) -> None:
        """Drop role IDs that no longer exist in the guild from the rolegiver list."""
        setting = await Rolegiver.find_one(q(guild=ctx.guild.id))
        if not setting or not setting.roles:
            await ctx.send("Rolegiver has no roles", ephemeral=True)
            return

        guild_role_ids = {r.id for r in ctx.guild.roles}
        # Build a new list: removing while iterating would skip neighbouring entries.
        setting.roles = [rid for rid in setting.roles if rid in guild_role_ids]
        await setting.commit()

        await ctx.send("Rolegiver cleanup finished")
        # Force the next autocomplete to rebuild this guild's cache from the document.
        self.cache.pop(ctx.guild.id, None)

    @_rolegiver_remove.autocomplete("role")
    async def _autocomplete(self, ctx: AutocompleteContext, role: str) -> None:
        """Suggest up to 25 cached role names that fuzzy-match the typed text."""
        if not self.cache.get(ctx.guild.id):
            rolegivers = await Rolegiver.find(q(guild=ctx.guild.id)).to_list(None)
            for entry in rolegivers:
                await self._rebuild_cache(ctx.guild, entry)

        # `role` stays the typed query; the guild may still have nothing cached.
        names = list(self.cache.get(ctx.guild.id, {}))
        results = process.extract(role, names, limit=25)
        choices = [{"name": r[0], "value": r[0]} for r in results]
        await ctx.send(choices=choices)
|
|
|
|
|
|
def setup(bot: Client) -> None:
    """Register the rolegiver extension with the JARVIS client.

    Instantiating the cog is the whole setup step; the instance is not kept here.
    """
    RolegiverCog(bot)
|