Rolegiver groups
This commit is contained in:
parent
a6f99b2041
commit
b251486dff
2 changed files with 1353 additions and 1164 deletions
|
@ -1,7 +1,6 @@
|
|||
"""JARVIS Role Giver Cog."""
|
||||
import asyncio
|
||||
import logging
|
||||
from typing import Dict
|
||||
|
||||
from interactions import (
|
||||
AutocompleteContext,
|
||||
|
@ -41,26 +40,31 @@ class RolegiverCog(Extension):
|
|||
def __init__(self, bot: Client):
|
||||
self.bot = bot
|
||||
self.logger = logging.getLogger(__name__)
|
||||
self.cache: Dict[int, Dict[str, int]] = {}
|
||||
self._group_cache: dict[int, dict[str, dict[str, int]]] = {}
|
||||
"""`{GUILD_ID: {GROUP_NAME: {ROLE_NAME: ROLE_ID}}}`"""
|
||||
|
||||
@listen()
|
||||
async def on_ready(self) -> None:
|
||||
"""NAFF on_ready hook for loading cache."""
|
||||
async for rolegiver in Rolegiver.find():
|
||||
if not rolegiver.group:
|
||||
rolegiver.group = "Default"
|
||||
guild = await self.bot.fetch_guild(rolegiver.guild)
|
||||
if not guild:
|
||||
await rolegiver.delete()
|
||||
continue
|
||||
if guild.id not in self._group_cache:
|
||||
self._group_cache[guild.id] = {rolegiver.group: {}}
|
||||
if rolegiver.group not in self._group_cache[guild.id]:
|
||||
self._group_cache[guild.id][rolegiver.group] = {}
|
||||
roles = []
|
||||
for role in rolegiver.roles:
|
||||
role = await guild.fetch_role(role)
|
||||
for role_id in rolegiver.roles:
|
||||
role = await guild.fetch_role(role_id)
|
||||
if role:
|
||||
roles.append(role.id)
|
||||
else:
|
||||
continue
|
||||
if guild.id not in self.cache:
|
||||
self.cache[guild.id] = {}
|
||||
self.cache[guild.id][role.name] = role.id
|
||||
self._group_cache[guild.id][rolegiver.group][role.name] = role.id
|
||||
rolegiver.roles = roles
|
||||
await rolegiver.save()
|
||||
|
||||
|
@ -68,6 +72,50 @@ class RolegiverCog(Extension):
|
|||
name="rolegiver", description="Allow users to choose their own roles"
|
||||
)
|
||||
|
||||
rg_group = rolegiver.group(name="group", description="Manage rolegiver groups")
|
||||
|
||||
@rg_group.subcommand(
|
||||
sub_cmd_name="create", sub_cmd_description="Create a rolegiver group"
|
||||
)
|
||||
@slash_option(
|
||||
name="group",
|
||||
description="Group to create",
|
||||
opt_type=OptionType.STRING,
|
||||
required=True,
|
||||
)
|
||||
@check(admin_or_permissions(Permissions.MANAGE_GUILD))
|
||||
async def _group_create(self, ctx: InteractionContext, group: str):
|
||||
if await Rolegiver.find_one(
|
||||
Rolegiver.guild == ctx.guild.id, Rolegiver.group == group
|
||||
):
|
||||
await ctx.send("Group already exists!", ephemeral=True)
|
||||
return
|
||||
|
||||
rolegiver = Rolegiver(guild=ctx.guild.id, group=group, roles=[])
|
||||
await rolegiver.save()
|
||||
|
||||
await ctx.send(f"Rolegiver group {group} created!")
|
||||
|
||||
@rg_group.subcommand(
|
||||
sub_cmd_name="delete", sub_cmd_description="DDelete a rolegiver group"
|
||||
)
|
||||
@slash_option(
|
||||
name="group",
|
||||
description="Group to delete",
|
||||
opt_type=OptionType.STRING,
|
||||
required=True,
|
||||
autocomplete=True,
|
||||
)
|
||||
@check(admin_or_permissions(Permissions.MANAGE_GUILD))
|
||||
async def _group_delete(self, ctx: InteractionContext, group: str):
|
||||
if rolegiver := await Rolegiver.find_one(
|
||||
Rolegiver.guild == ctx.guild.id, Rolegiver.group == group
|
||||
):
|
||||
await rolegiver.delete()
|
||||
await ctx.send(f"Rolegiver group {group} deleted!")
|
||||
else:
|
||||
await ctx.send(f"Rolegiver group {group} does not exist!", ephemeral=True)
|
||||
|
||||
@rolegiver.subcommand(
|
||||
sub_cmd_name="add",
|
||||
sub_cmd_description="Add a role to rolegiver",
|
||||
|
@ -75,8 +123,17 @@ class RolegiverCog(Extension):
|
|||
@slash_option(
|
||||
name="role", description="Role to add", opt_type=OptionType.ROLE, required=True
|
||||
)
|
||||
@slash_option(
|
||||
name="group",
|
||||
description="Group to add to",
|
||||
opt_type=OptionType.STRING,
|
||||
required=False,
|
||||
autocomplete=True,
|
||||
)
|
||||
@check(admin_or_permissions(Permissions.MANAGE_GUILD))
|
||||
async def _rolegiver_add(self, ctx: InteractionContext, role: Role) -> None:
|
||||
async def _rolegiver_add(
|
||||
self, ctx: InteractionContext, role: Role, group: str = "Default"
|
||||
) -> None:
|
||||
if role.id == ctx.guild.id:
|
||||
await ctx.send("Cannot add `@everyone` to rolegiver", ephemeral=True)
|
||||
return
|
||||
|
@ -88,21 +145,30 @@ class RolegiverCog(Extension):
|
|||
)
|
||||
return
|
||||
|
||||
setting = await Rolegiver.find_one(Rolegiver.guild == ctx.guild.id)
|
||||
if setting and setting.roles and role.id in setting.roles:
|
||||
rolegiver = await Rolegiver.find_one(
|
||||
Rolegiver.guild == ctx.guild.id, Rolegiver.group == group
|
||||
)
|
||||
if rolegiver and rolegiver.roles and role.id in rolegiver.roles:
|
||||
await ctx.send("Role already in rolegiver", ephemeral=True)
|
||||
return
|
||||
|
||||
if not setting:
|
||||
setting = Rolegiver(guild=ctx.guild.id, roles=[])
|
||||
elif rolegiver and len(rolegiver.roles) >= 15:
|
||||
await ctx.send(
|
||||
f"Maximum roles in group {group}. Please make a new group",
|
||||
ephemeral=True,
|
||||
)
|
||||
return
|
||||
|
||||
setting.roles = setting.roles or []
|
||||
if not rolegiver:
|
||||
rolegiver = Rolegiver(guild=ctx.guild.id, roles=[], group=group)
|
||||
|
||||
setting.roles.append(role.id)
|
||||
await setting.save()
|
||||
rolegiver.roles = rolegiver.roles or []
|
||||
|
||||
rolegiver.roles.append(role.id)
|
||||
await rolegiver.save()
|
||||
|
||||
roles = []
|
||||
for role_id in setting.roles:
|
||||
for role_id in rolegiver.roles:
|
||||
if role_id == role.id:
|
||||
continue
|
||||
e_role = await ctx.guild.fetch_role(role_id)
|
||||
|
@ -120,7 +186,7 @@ class RolegiverCog(Extension):
|
|||
|
||||
embed = build_embed(
|
||||
title="Rolegiver Updated",
|
||||
description="New role added to rolegiver",
|
||||
description=f"New role added to rolegiver group {group}",
|
||||
fields=fields,
|
||||
)
|
||||
|
||||
|
@ -132,83 +198,116 @@ class RolegiverCog(Extension):
|
|||
)
|
||||
await ctx.send(embeds=embed, components=components)
|
||||
|
||||
if ctx.guild.id not in self.cache:
|
||||
self.cache[ctx.guild.id] = {}
|
||||
self.cache[ctx.guild.id][role.name] = role.id
|
||||
if ctx.guild.id not in self._group_cache:
|
||||
self._group_cache[ctx.guild.id] = {group: {}}
|
||||
self._group_cache[ctx.guild.id][group].update({role.name: role.id})
|
||||
|
||||
@rolegiver.subcommand(
|
||||
sub_cmd_name="remove", sub_cmd_description="Remove a role from rolegiver"
|
||||
)
|
||||
@slash_option(
|
||||
name="role",
|
||||
description="Name of role to add",
|
||||
name="group",
|
||||
description="Name of group to remove from",
|
||||
opt_type=OptionType.STRING,
|
||||
required=True,
|
||||
autocomplete=True,
|
||||
)
|
||||
@check(admin_or_permissions(Permissions.MANAGE_GUILD))
|
||||
async def _rolegiver_remove(self, ctx: InteractionContext, role: str) -> None:
|
||||
setting = await Rolegiver.find_one(Rolegiver.guild == ctx.guild.id)
|
||||
if not setting or (setting and not setting.roles):
|
||||
await ctx.send("Rolegiver has no roles", ephemeral=True)
|
||||
async def _rolegiver_remove(self, ctx: InteractionContext, group: str) -> None:
|
||||
rolegiver = await Rolegiver.find_one(
|
||||
Rolegiver.guild == ctx.guild.id, Rolegiver.group == group
|
||||
)
|
||||
if not rolegiver or (rolegiver and not rolegiver.roles):
|
||||
await ctx.send(f"Rolegiver {group} has no roles", ephemeral=True)
|
||||
return
|
||||
|
||||
cache = self.cache.get(ctx.guild.id)
|
||||
if cache:
|
||||
role_id = cache.get(role)
|
||||
else:
|
||||
await ctx.send(
|
||||
"Something went wrong, please try a different role", ephemeral=True
|
||||
roles: dict[int, Role] = {}
|
||||
|
||||
for role_id in rolegiver.roles:
|
||||
role = await ctx.guild.fetch_role(role_id)
|
||||
if role:
|
||||
roles[int(role.id)] = role
|
||||
|
||||
rolegiver.roles = [r for r in roles.keys()]
|
||||
|
||||
options = [
|
||||
StringSelectOption(label=v.name, value=str(k)) for k, v in roles.items()
|
||||
][:25]
|
||||
select = StringSelectMenu(
|
||||
*options,
|
||||
placeholder="Select roles to remove",
|
||||
min_values=1,
|
||||
max_values=len(options),
|
||||
)
|
||||
components = [ActionRow(select)]
|
||||
|
||||
message = await ctx.send(
|
||||
content=f"Removing roles from {group}", components=components
|
||||
)
|
||||
|
||||
try:
|
||||
resp = await self.bot.wait_for_component(
|
||||
check=lambda x: ctx.author.id == x.ctx.author.id,
|
||||
messages=message,
|
||||
timeout=60 * 5,
|
||||
)
|
||||
return
|
||||
|
||||
setting.value.remove(role_id)
|
||||
await setting.save()
|
||||
role = await ctx.guild.fetch_role(role_id)
|
||||
if not role:
|
||||
await ctx.send("Role not found in guild", ephemeral=True)
|
||||
except asyncio.TimeoutError:
|
||||
for row in components:
|
||||
for component in row.components:
|
||||
component.disabled = True
|
||||
await message.edit(components=components)
|
||||
|
||||
remaining = []
|
||||
to_remove = []
|
||||
for id_ in setting.value:
|
||||
if role := await ctx.guild.fetch_role(id_):
|
||||
remaining.append(role)
|
||||
else:
|
||||
to_remove.append(id_)
|
||||
removed_roles = []
|
||||
for role_id in resp.ctx.values:
|
||||
role = roles.get(int(role_id))
|
||||
removed_roles.append(role)
|
||||
rolegiver.roles.remove(int(role_id))
|
||||
|
||||
setting.value = [x for x in setting.value if x not in to_remove]
|
||||
await setting.save()
|
||||
await rolegiver.save()
|
||||
|
||||
fields = [
|
||||
EmbedField(name="Removed Role", value=role.mention),
|
||||
EmbedField(
|
||||
name="Removed Role(s)",
|
||||
value="\n".join(x.mention for x in removed_roles),
|
||||
),
|
||||
EmbedField(
|
||||
name="Remaining Role(s)",
|
||||
value="\n".join([x.mention for x in remaining]),
|
||||
value="\n".join(
|
||||
x.mention for x in roles.values() if x not in removed_roles
|
||||
),
|
||||
),
|
||||
]
|
||||
|
||||
embed = build_embed(
|
||||
title="Rolegiver Updated",
|
||||
description="Role removed from rolegiver",
|
||||
description="Role(s) removed from rolegiver",
|
||||
fields=fields,
|
||||
)
|
||||
|
||||
embed.set_thumbnail(url=ctx.guild.icon.url)
|
||||
|
||||
embed.set_footer(text=f"{ctx.author.username} | {ctx.author.id}")
|
||||
|
||||
await ctx.send(
|
||||
embeds=embed,
|
||||
)
|
||||
|
||||
if ctx.guild.id in self.cache:
|
||||
self.cache[ctx.guild.id].pop(role.name)
|
||||
self._group_cache[ctx.guild.id].update(
|
||||
{group: {v.name: k for k, v in roles.items() if v not in removed_roles}}
|
||||
)
|
||||
|
||||
@rolegiver.subcommand(
|
||||
sub_cmd_name="list", sub_cmd_description="List rolegiver roles"
|
||||
)
|
||||
async def _rolegiver_list(self, ctx: InteractionContext) -> None:
|
||||
setting = await Rolegiver.find_one(Rolegiver.guild == ctx.guild.id)
|
||||
@slash_option(
|
||||
name="group",
|
||||
description="Name of group to list",
|
||||
opt_type=OptionType.STRING,
|
||||
required=False,
|
||||
autocomplete=True,
|
||||
)
|
||||
async def _rolegiver_list(self, ctx: InteractionContext, group: str = None) -> None:
|
||||
setting = await Rolegiver.find_one(
|
||||
Rolegiver.guild == ctx.guild.id, Rolegiver.group == group
|
||||
)
|
||||
if not setting or (setting and not setting.roles):
|
||||
await ctx.send("Rolegiver has no roles", ephemeral=True)
|
||||
return
|
||||
|
@ -239,43 +338,64 @@ class RolegiverCog(Extension):
|
|||
)
|
||||
await ctx.send(embeds=embed, components=components)
|
||||
|
||||
@rolegiver.subcommand(sub_cmd_name="get", sub_cmd_description="Get a role")
|
||||
role = SlashCommand(name="role", description="Get roles!")
|
||||
|
||||
@role.subcommand(sub_cmd_name="get", sub_cmd_description="Get a role")
|
||||
@slash_option(
|
||||
name="group",
|
||||
description="Name of group to list",
|
||||
opt_type=OptionType.STRING,
|
||||
required=False,
|
||||
autocomplete=True,
|
||||
)
|
||||
@cooldown(bucket=Buckets.USER, rate=1, interval=10)
|
||||
async def _role_get(self, ctx: InteractionContext) -> None:
|
||||
try:
|
||||
setting = await Rolegiver.find_one(Rolegiver.guild == ctx.guild.id)
|
||||
if not setting or (setting and not setting.roles):
|
||||
await ctx.send("Rolegiver has no roles", ephemeral=True)
|
||||
return
|
||||
|
||||
options = []
|
||||
for role in setting.roles:
|
||||
role: Role = await ctx.guild.fetch_role(role)
|
||||
option = StringSelectOption(label=role.name, value=str(role.id))
|
||||
options.append(option)
|
||||
|
||||
select = StringSelectMenu(
|
||||
*options,
|
||||
placeholder="Select roles to add",
|
||||
min_values=1,
|
||||
max_values=len(options),
|
||||
)
|
||||
components = [ActionRow(select)]
|
||||
|
||||
message = await ctx.send(content="\u200b", components=components)
|
||||
except Exception as e:
|
||||
self.logger.error("Encountered error", exc_info=True)
|
||||
async def _role_get(self, ctx: InteractionContext, group: str = "Default") -> None:
|
||||
rolegiver = await Rolegiver.find_one(
|
||||
Rolegiver.guild == ctx.guild.id, Rolegiver.group == group
|
||||
)
|
||||
if not rolegiver or (rolegiver and not rolegiver.roles):
|
||||
await ctx.send(f"Rolegiver group {group} has no roles", ephemeral=True)
|
||||
return
|
||||
|
||||
options = []
|
||||
roles = []
|
||||
for role_id in rolegiver.roles:
|
||||
role: Role = await ctx.guild.fetch_role(role_id)
|
||||
if not role:
|
||||
rolegiver.roles.remove(role_id)
|
||||
continue
|
||||
roles.append(role)
|
||||
if ctx.author.has_role(role):
|
||||
continue
|
||||
option = StringSelectOption(label=role.name, value=str(role.id))
|
||||
options.append(option)
|
||||
|
||||
await rolegiver.save()
|
||||
self._group_cache[ctx.guild.id].update({group: {r.name: r.id for r in roles}})
|
||||
|
||||
if len(options) == 0:
|
||||
await ctx.send("You have every role from this group!", ephemeral=True)
|
||||
return
|
||||
|
||||
select = StringSelectMenu(
|
||||
*options,
|
||||
placeholder="Select roles to add",
|
||||
min_values=1,
|
||||
max_values=len(options),
|
||||
)
|
||||
components = [ActionRow(select)]
|
||||
|
||||
message = await ctx.send(content="\u200b", components=components)
|
||||
|
||||
try:
|
||||
context = await self.bot.wait_for_component(
|
||||
response = await self.bot.wait_for_component(
|
||||
check=lambda x: ctx.author.id == x.ctx.author.id,
|
||||
messages=message,
|
||||
timeout=60 * 5,
|
||||
)
|
||||
|
||||
added_roles = []
|
||||
for role in context.ctx.values:
|
||||
for role in response.ctx.values:
|
||||
role = await ctx.guild.fetch_role(int(role))
|
||||
added_roles.append(role)
|
||||
await ctx.author.add_role(role, reason="Rolegiver")
|
||||
|
@ -305,7 +425,7 @@ class RolegiverCog(Extension):
|
|||
for component in row.components:
|
||||
component.disabled = True
|
||||
|
||||
await context.ctx.edit_origin(
|
||||
await response.ctx.edit_origin(
|
||||
embeds=embed, content="\u200b", components=components
|
||||
)
|
||||
except asyncio.TimeoutError:
|
||||
|
@ -314,20 +434,33 @@ class RolegiverCog(Extension):
|
|||
component.disabled = True
|
||||
await message.edit(components=components)
|
||||
|
||||
@rolegiver.subcommand(sub_cmd_name="forfeit", sub_cmd_description="Forfeit a role")
|
||||
@role.subcommand(sub_cmd_name="forfeit", sub_cmd_description="Forfeit a role")
|
||||
@slash_option(
|
||||
name="group",
|
||||
description="Name of group to give up from",
|
||||
opt_type=OptionType.STRING,
|
||||
required=False,
|
||||
autocomplete=True,
|
||||
)
|
||||
@cooldown(bucket=Buckets.USER, rate=1, interval=10)
|
||||
async def _role_remove(self, ctx: InteractionContext) -> None:
|
||||
async def _role_remove(
|
||||
self, ctx: InteractionContext, group: str = "Default"
|
||||
) -> None:
|
||||
user_roles = ctx.author.roles
|
||||
|
||||
setting = await Rolegiver.find_one(Rolegiver.guild == ctx.guild.id)
|
||||
if not setting or (setting and not setting.roles):
|
||||
rolegiver = await Rolegiver.find_one(
|
||||
Rolegiver.guild == ctx.guild.id, Rolegiver.group == group
|
||||
)
|
||||
if not rolegiver or (rolegiver and not rolegiver.roles):
|
||||
await ctx.send("Rolegiver has no roles", ephemeral=True)
|
||||
return
|
||||
elif not any(x.id in setting.roles for x in user_roles):
|
||||
await ctx.send("You have no rolegiver roles", ephemeral=True)
|
||||
elif not any(x.id in rolegiver.roles for x in user_roles):
|
||||
await ctx.send(
|
||||
f"You have no rolegiver roles from group {group}", ephemeral=True
|
||||
)
|
||||
return
|
||||
|
||||
valid = list(filter(lambda x: x.id in setting.roles, user_roles))
|
||||
valid = list(filter(lambda x: x.id in rolegiver.roles, user_roles))
|
||||
options = []
|
||||
for role in valid:
|
||||
option = StringSelectOption(label=role.name, value=str(role.id))
|
||||
|
@ -396,36 +529,38 @@ class RolegiverCog(Extension):
|
|||
|
||||
@rolegiver.subcommand(
|
||||
sub_cmd_name="cleanup",
|
||||
sub_cmd_description="Removed deleted roles from rolegiver",
|
||||
sub_cmd_description="Removed deleted roles from rolegivers",
|
||||
)
|
||||
@check(admin_or_permissions(Permissions.MANAGE_GUILD))
|
||||
async def _rolegiver_cleanup(self, ctx: InteractionContext) -> None:
|
||||
setting = await Rolegiver.find_one(Rolegiver.guild == ctx.guild.id)
|
||||
if not setting or not setting.roles:
|
||||
await ctx.send("Rolegiver has no roles", ephemeral=True)
|
||||
guild_role_ids = [r.id for r in ctx.guild.roles]
|
||||
for role_id in setting.roles:
|
||||
if role_id not in guild_role_ids:
|
||||
setting.roles.remove(role_id)
|
||||
await setting.save()
|
||||
|
||||
await ctx.defer()
|
||||
async for rolegiver in Rolegiver.find(Rolegiver.guild == ctx.guild.id):
|
||||
if not rolegiver.roles:
|
||||
del self._group_cache[ctx.guild.id][rolegiver.group]
|
||||
await rolegiver.delete()
|
||||
continue
|
||||
guild_role_ids = [r.id for r in ctx.guild.roles]
|
||||
for role_id in rolegiver.roles:
|
||||
if role_id not in guild_role_ids:
|
||||
rolegiver.roles.remove(role_id)
|
||||
await rolegiver.save()
|
||||
await ctx.send("Rolegiver cleanup finished")
|
||||
self.cache.pop(ctx.guild.id, None)
|
||||
|
||||
@_rolegiver_remove.autocomplete("role")
|
||||
async def _autocomplete(self, ctx: AutocompleteContext, role: str) -> None:
|
||||
if not self.cache.get(ctx.guild.id):
|
||||
async for rolegiver in Rolegiver.find(Rolegiver.guild == ctx.guild.id):
|
||||
role = await ctx.guild.fetch_role(rolegiver.role)
|
||||
if not role:
|
||||
await rolegiver.delete()
|
||||
continue
|
||||
if ctx.guild.id not in self.cache:
|
||||
self.cache[ctx.guild.id] = {}
|
||||
self.cache[ctx.guild.id][role.name] = role.id
|
||||
results = process.extract(role, self.cache.get(ctx.guild.id).keys(), limit=25)
|
||||
@_group_delete.autocomplete("group")
|
||||
@_role_get.autocomplete("group")
|
||||
@_role_remove.autocomplete("group")
|
||||
@_rolegiver_add.autocomplete("group")
|
||||
@_rolegiver_remove.autocomplete("group")
|
||||
@_rolegiver_list.autocomplete("group")
|
||||
async def _autocomplete_group(self, ctx: AutocompleteContext):
|
||||
groups = list(self._group_cache.get(ctx.guild.id).keys())
|
||||
if not groups:
|
||||
rolegivers = await Rolegiver.find(Rolegiver.guild == ctx.guild.id).to_list()
|
||||
groups = [r.group for r in rolegivers]
|
||||
|
||||
results = process.extract(ctx.input_text, groups, limit=5)
|
||||
choices = [{"name": r[0], "value": r[0]} for r in results]
|
||||
await ctx.send(choices=choices)
|
||||
await ctx.send(choices)
|
||||
|
||||
|
||||
def setup(bot: Client) -> None:
|
||||
|
|
2150
poetry.lock
generated
2150
poetry.lock
generated
File diff suppressed because it is too large
Load diff
Loading…
Add table
Reference in a new issue