From e48c987bcd7f1ec53b4ad7688ecf7d823e35a03f Mon Sep 17 00:00:00 2001 From: Zevaryx Date: Mon, 26 Jul 2021 19:10:49 -0600 Subject: [PATCH] Add roleping expansion, closes #38 --- jarvis/__init__.py | 9 + jarvis/cogs/admin/roleping.py | 379 +++++++++++++++++++++++++++++----- jarvis/db/types.py | 22 ++ 3 files changed, 364 insertions(+), 46 deletions(-) diff --git a/jarvis/__init__.py b/jarvis/__init__.py index 7feedcc..4c91d02 100644 --- a/jarvis/__init__.py +++ b/jarvis/__init__.py @@ -10,6 +10,7 @@ from psutil import Process from jarvis import logo, tasks, utils from jarvis.config import get_config from jarvis.db import DBManager +from jarvis.events import guild, member, message if asyncio.get_event_loop().is_closed(): asyncio.set_event_loop(asyncio.new_event_loop()) @@ -70,6 +71,14 @@ def run(ctx=None): jarvis.max_messages = config.max_messages tasks.init() + + # Add event listeners + listeners = [ + guild.GuildEventHandler(jarvis), + member.MemberEventHandler(jarvis), + message.MessageEventHandler(jarvis), + ] + jarvis.run(config.token, bot=True, reconnect=True) for cog in jarvis.cogs: session = getattr(cog, "_session", None) diff --git a/jarvis/cogs/admin/roleping.py b/jarvis/cogs/admin/roleping.py index 49786a6..f17b54a 100644 --- a/jarvis/cogs/admin/roleping.py +++ b/jarvis/cogs/admin/roleping.py @@ -1,71 +1,79 @@ -from discord import Role +from datetime import datetime, timedelta + +from ButtonPaginator import Paginator +from discord import Member, Role from discord.ext import commands from discord_slash import SlashContext, cog_ext +from discord_slash.model import ButtonStyle from discord_slash.utils.manage_commands import create_option -from jarvis.db.types import Setting +from jarvis.db.types import Roleping +from jarvis.utils import build_embed +from jarvis.utils.cachecog import CacheCog +from jarvis.utils.field import Field +from jarvis.utils.permissions import admin_or_permissions -class RolepingCog(commands.Cog): +class RolepingCog(CacheCog): def __init__(self, bot): - self.bot = bot + super().__init__(bot) @cog_ext.cog_subcommand( base="roleping", - name="block", - description="Add a role to the roleping blocklist", + name="add", + description="Add a role to roleping", options=[ create_option( name="role", - description="Role to add to blocklist", + description="Role to add to roleping", option_type=8, required=True, ) ], ) - @commands.has_permissions(administrator=True) - async def _roleping_block(self, ctx: SlashContext, role: Role): - roles = Setting.get(guild=ctx.guild.id, setting="roleping") - if not roles: - roles = Setting(guild=ctx.guild.id, setting="roleping", value=[]) + @admin_or_permissions(manage_server=True) + async def _roleping_add(self, ctx: SlashContext, role: Role): + roleping = Roleping.get(guild=ctx.guild.id, role=role.id) + if not roleping: + roleping = Roleping( + role=role.id, + guild=ctx.guild.id, + admin=ctx.author.id, + active=True, + bypass={"roles": [], "users": []}, + ) - if role.id in roles.value: + else: await ctx.send( - f"Role `{role.name}` already in blocklist.", hidden=True + f"Role `{role.name}` already in roleping.", hidden=True ) return - roles.value.append(role.id) - roles.update() - await ctx.send(f"Role `{role.name}` added to blocklist.") + + roleping.insert() + await ctx.send(f"Role `{role.name}` added to roleping.") @cog_ext.cog_subcommand( base="roleping", - name="allow", - description="Remove a role from the roleping blocklist", + name="remove", + description="Remove a role from the roleping", options=[ create_option( name="role", - description="Role to remove from blocklist", + description="Role to remove from roleping", option_type=8, required=True, ) ], ) - @commands.has_permissions(administrator=True) - async def _roleping_allow(self, ctx: SlashContext, role: Role): - roles = Setting.get(guild=ctx.guild.id, setting="roleping") - if not roles: - await ctx.send("No blocklist configured.", hidden=True) + @admin_or_permissions(manage_server=True) + async def _roleping_remove(self, ctx: SlashContext, role: Role): + roleping = Roleping.get(guild=ctx.guild.id, role=role.id) + if not roleping: + await ctx.send("Roleping does not exist", hidden=True) return - if role.id not in roles.value: - await ctx.send( - f"Role `{role.name}` not in blocklist.", hidden=True - ) - return - roles.value.remove(role.id) - roles.update() - await ctx.send(f"Role `{role.name}` removed blocklist.") + roleping.delete() + await ctx.send(f"Role `{role.name}` removed from roleping.") @cog_ext.cog_subcommand( base="roleping", @@ -73,19 +81,298 @@ class RolepingCog(commands.Cog): description="List all blocklisted roles", ) async def _roleping_list(self, ctx: SlashContext): - roles = Setting.get(guild=ctx.guild.id, setting="roleping") - if not roles: - await ctx.send("No blocklist configured.", hidden=True) + exists = self.check_cache(ctx) + if exists: + await ctx.defer(hidden=True) + await ctx.send( + "Please use existing interaction: " + + f"{exists['paginator']._message.jump_url}", + hidden=True, + ) return - message = "Blocklisted Roles:\n```\n" - if not roles.value: - await ctx.send("No roles blocklisted.", hidden=True) + rolepings = Roleping.get_many(guild=ctx.guild.id) + if not rolepings: + await ctx.send("No rolepings configured", hidden=True) return - for role in roles.value: - role = ctx.guild.get_role(role) - if not role: - continue - message += role.name + "\n" - message += "```" - await ctx.send(message) + + embeds = [] + for roleping in rolepings: + role = ctx.guild.get_role(roleping.role) + bypass_roles = list( + filter( + lambda x: x.id in roleping.bypass["roles"], ctx.guild.roles + ) + ) + bypass_roles = [ + r.mention or "||`[redacted]`||" for r in bypass_roles + ] + bypass_users = [ + ctx.guild.get_member(u).mention or "||`[redacted]`||" + for u in roleping.bypass["users"] + ] + bypass_roles = bypass_roles or ["None"] + bypass_users = bypass_users or ["None"] + embed = build_embed( + title="Roleping", + description=role.mention, + color=str(role.color), + fields=[ + Field( + name="Created At", + value=roleping.created_at.strftime( + "%a, %b %d, %Y %I:%M %p" + ), + inline=False, + ), + Field(name="Active", value=str(roleping.active)), + Field( + name="Bypass Users", + value="\n".join(bypass_users), + ), + Field( + name="Bypass Roles", + value="\n".join(bypass_roles), + ), + ], + ) + + admin = ctx.guild.get_member(roleping.admin) + if not admin: + admin = self.bot.user + + embed.set_author( + name=admin.nick or admin.name, icon_url=admin.avatar_url + ) + embed.set_footer( + text=f"{admin.name}#{admin.discriminator} | {admin.id}" + ) + + embeds.append(embed) + + paginator = Paginator( + bot=self.bot, + ctx=ctx, + embeds=embeds, + only=ctx.author, + timeout=60 * 5, # 5 minute timeout + disable_after_timeout=True, + use_extend=len(embeds) > 2, + left_button_style=ButtonStyle.grey, + right_button_style=ButtonStyle.grey, + basic_buttons=["◀", "▶"], + ) + + self.cache[hash(paginator)] = { + "user": ctx.author.id, + "guild": ctx.guild.id, + "timeout": datetime.utcnow() + timedelta(minutes=5), + "command": ctx.subcommand_name, + "paginator": paginator, + } + + await paginator.start() + + @cog_ext.cog_subcommand( + base="roleping", + subcommand_group="bypass", + name="user", + description="Add a user as a bypass to a roleping", + base_desc="Block roles from being pinged", + sub_group_desc="Allow specific users/roles to ping rolepings", + options=[ + create_option( + name="user", + description="User to add", + option_type=6, + required=True, + ), + create_option( + name="rping", + description="Rolepinged role", + option_type=8, + required=True, + ), + ], + ) + @admin_or_permissions(manage_server=True) + async def _roleping_bypass_user( + self, ctx: SlashContext, user: Member, rping: Role + ): + roleping = Roleping.get(guild=ctx.guild.id, role=rping.id) + if not roleping: + await ctx.send( + f"Roleping not configured for {rping.mention}", hidden=True + ) + return + + if user.id in roleping.bypass["users"]: + await ctx.send(f"{user.mention} already in bypass", hidden=True) + return + + if len(roleping.bypass["users"]) == 10: + await ctx.send( + "Already have 10 users in bypass. " + "Please consider using roles for roleping bypass", + hidden=True, + ) + return + + matching_role = list( + filter(lambda x: x.id in roleping.bypass["roles"], user.roles) + ) + + if matching_role: + await ctx.send( + f"{user.mention} already has bypass " + f"via {matching_role[0].mention}", + hidden=True, + ) + return + + roleping.bypass["users"].append(user.id) + roleping.update() + await ctx.send( + f"{user.nick or user.name} user bypass added for `{rping.name}`" + ) + + @cog_ext.cog_subcommand( + base="roleping", + subcommand_group="bypass", + name="role", + description="Add a role as a bypass to a roleping", + base_desc="Block roles from being pinged", + sub_group_desc="Allow specific users/roles to ping rolepings", + options=[ + create_option( + name="role", + description="Role to add", + option_type=8, + required=True, + ), + create_option( + name="rping", + description="Rolepinged role", + option_type=8, + required=True, + ), + ], + ) + @admin_or_permissions(manage_server=True) + async def _roleping_bypass_role( + self, ctx: SlashContext, role: Role, rping: Role + ): + roleping = Roleping.get(guild=ctx.guild.id, role=rping.id) + if not roleping: + await ctx.send( + f"Roleping not configured for {rping.mention}", hidden=True + ) + return + + if role.id in roleping.bypass["roles"]: + await ctx.send(f"{role.mention} already in bypass", hidden=True) + return + + if len(roleping.bypass["roles"]) == 10: + await ctx.send( + "Already have 10 roles in bypass. " + "Please consider consolidating roles for roleping bypass", + hidden=True, + ) + return + + roleping.bypass["roles"].append(role.id) + roleping.update() + await ctx.send(f"{role.name} role bypass added for `{rping.name}`") + + @cog_ext.cog_subcommand( + base="roleping", + subcommand_group="restore", + name="user", + description="Remove a role bypass", + base_desc="Block roles from being pinged", + sub_group_desc="Remove a bypass from a roleping (restoring it)", + options=[ + create_option( + name="user", + description="User to add", + option_type=6, + required=True, + ), + create_option( + name="rping", + description="Rolepinged role", + option_type=8, + required=True, + ), + ], + ) + @admin_or_permissions(manage_server=True) + async def _roleping_restore_user( + self, ctx: SlashContext, user: Member, rping: Role + ): + roleping = Roleping.get(guild=ctx.guild.id, role=rping.id) + if not roleping: + await ctx.send( + f"Roleping not configured for {rping.mention}", hidden=True + ) + return + + if user.id not in roleping.bypass["users"]: + await ctx.send(f"{user.mention} not in bypass", hidden=True) + return + + roleping.bypass["users"].delete(user.id) + roleping.update() + await ctx.send( + f"{user.nick or user.name} user bypass removed for `{rping.name}`" + ) + + @cog_ext.cog_subcommand( + base="roleping", + subcommand_group="restore", + name="role", + description="Remove a role bypass", + base_desc="Block roles from being pinged", + sub_group_desc="Remove a bypass from a roleping (restoring it)", + options=[ + create_option( + name="role", + description="Role to add", + option_type=8, + required=True, + ), + create_option( + name="rping", + description="Rolepinged role", + option_type=8, + required=True, + ), + ], + ) + @admin_or_permissions(manage_server=True) + async def _roleping_restore_role( + self, ctx: SlashContext, role: Role, rping: Role + ): + roleping = Roleping.get(guild=ctx.guild.id, role=rping.id) + if not roleping: + await ctx.send( + f"Roleping not configured for {rping.mention}", hidden=True + ) + return + + if role.id in roleping.bypass["roles"]: + await ctx.send(f"{role.mention} already in bypass", hidden=True) + return + + if len(roleping.bypass["roles"]) == 10: + await ctx.send( + "Already have 10 roles in bypass. " + "Please consider consolidating roles for roleping bypass", + hidden=True, + ) + return + + roleping.bypass["roles"].append(role.id) + roleping.update() + await ctx.send(f"{role.name} role bypass added for `{rping.name}`") diff --git a/jarvis/db/types.py b/jarvis/db/types.py index d70094e..19ba060 100644 --- a/jarvis/db/types.py +++ b/jarvis/db/types.py @@ -28,6 +28,7 @@ coll_lookup = { "Mute": "mutes", "Purge": "purges", "Reminder": "reminders", + "Roleping": "rolepings", "Setting": "settings", "Starboard": "starboard", "Star": "stars", @@ -460,6 +461,27 @@ class Reminder(MongoObject, ActiveObject): created_at: datetime = field(default_factory=datetime.utcnow) +@dataclass +class Roleping(MongoObject, ActiveObject): + """ + Guild Roleping object + + :param _id: MongoDB ID + :param role: Blocked role + :param guild: ID of origin guild + :param admin: Admin who added roleping + :param bypass: Roles/users who may bypass this roleping + :param active: If the roleping is disabled + :param created_at: Time the roleping was created + """ + + role: int + guild: int + admin: int + bypass: dict + created_at: datetime = field(default_factory=datetime.utcnow) + + @dataclass class Setting(MongoObject): """