diff --git a/jarvis/cogs/admin.py b/jarvis/cogs/admin.py deleted file mode 100644 index 559f04f..0000000 --- a/jarvis/cogs/admin.py +++ /dev/null @@ -1,1316 +0,0 @@ -import re -from datetime import datetime, timedelta -from typing import Union - -import pymongo -from ButtonPaginator import Paginator -from discord import Member, Role, TextChannel, User, VoiceChannel -from discord.ext import commands -from discord.ext.tasks import loop -from discord.utils import find, get -from discord_slash import SlashContext, cog_ext -from discord_slash.model import ButtonStyle -from discord_slash.utils.manage_commands import create_choice, create_option - -import jarvis -from jarvis.db import DBManager -from jarvis.db.types import ( - Autopurge, - Ban, - Kick, - Lock, - MongoSort, - Mute, - Purge, - Setting, - Unban, - Warning, -) -from jarvis.utils import build_embed -from jarvis.utils.field import Field -from jarvis.utils.permissions import admin_or_permissions - - -class AdminCog(commands.Cog): - """ - Guild admin functions - - Used to manage guilds - """ - - def __init__(self, bot: commands.Bot): - self.bot = bot - config = jarvis.config.get_config() - self.db = DBManager(config.mongo).mongo.jarvis - self.cache = {} - self._expire_interaction.start() - - def check_cache(self, ctx: SlashContext, **kwargs): - if not kwargs: - kwargs = {} - return find( - lambda x: x["command"] == ctx.subcommand_name - and x["user"] == ctx.author.id - and x["guild"] == ctx.guild.id - and all(x[k] == v for k, v in kwargs.items()), - self.cache.values(), - ) - - @cog_ext.cog_slash( - name="ban", - description="Ban a user", - options=[ - create_option( - name="user", - description="User to ban", - option_type=6, - required=True, - ), - create_option( - name="reason", - description="Ban reason", - required=True, - option_type=3, - ), - create_option( - name="type", - description="Ban type", - option_type=3, - required=False, - choices=[ - create_choice(value="perm", name="Permanent"), - create_choice(value="temp", name="Temporary"), - create_choice(value="soft", name="Soft"), - ], - ), - create_option( - name="duration", - description="Ban duration in hours if temporary", - required=False, - option_type=4, - ), - ], - ) - @admin_or_permissions(ban_members=True) - async def _ban( - self, - ctx: SlashContext, - user: User = None, - reason: str = None, - type: str = "perm", - duration: int = 4, - ): - if not user or user == ctx.author: - await ctx.send("You cannot ban yourself.", hidden=True) - return - if user == self.bot.user: - await ctx.send("I'm afraid I can't let you do that", hidden=True) - return - if type == "temp" and duration < 0: - await ctx.send( - "You cannot set a temp ban to < 0 hours.", hidden=True - ) - return - elif type == "temp" and duration > 744: - await ctx.send( - "You cannot set a temp ban to > 1 month", hidden=True - ) - return - if len(reason) > 100: - await ctx.send("Reason must be < 100 characters", hidden=True) - return - if not reason: - reason = ( - "Mr. Stark is displeased with your presence. Please leave." - ) - - mtype = type - if mtype == "perm": - mtype = "perma" - - guild_name = ctx.guild.name - user_message = ( - f"You have been {mtype}banned from {guild_name}." - + f" Reason:\n{reason}" - ) - if mtype == "temp": - user_message += f"\nDuration: {duration} hours" - - fields = [Field(name="Type", value=mtype)] - - if mtype == "temp": - fields.append(Field(name="Duration", value=f"{duration} hour(s)")) - - user_embed = build_embed( - title="You have been banned", - description=f"Reason: {reason}", - fields=fields, - ) - - user_embed.set_author( - name=ctx.author.name + "#" + ctx.author.discriminator, - icon_url=ctx.author.avatar_url, - ) - user_embed.set_thumbnail(url=ctx.guild.icon_url) - - try: - await user.send(embed=user_embed) - except Exception: - send_failed = True - try: - await ctx.guild.ban(user, reason=reason) - except Exception as e: - await ctx.send(f"Failed to ban user:\n```\n{e}\n```", hidden=True) - return - send_failed = False - if mtype == "soft": - await ctx.guild.unban(user, reason="Ban was softban") - - fields.append(Field(name="DM Sent?", value=str(not send_failed))) - - admin_embed = build_embed( - title="User Banned", - description=f"Reason: {reason}", - fields=fields, - ) - - admin_embed.set_author( - name=user.nick if user.nick else user.name, - icon_url=user.avatar_url, - ) - admin_embed.set_thumbnail(url=user.avatar_url) - admin_embed.set_footer( - text=f"{user.name}#{user.discriminator} | {user.id}" - ) - - await ctx.send(embed=admin_embed) - if type != "temp": - duration = None - active = True - if type == "soft": - active = False - - _ = Ban( - user=user.id, - username=user.name, - discrim=user.discriminator, - reason=reason, - admin=ctx.author.id, - guild=ctx.guild.id, - type=type, - duration=duration, - active=active, - ).insert() - - async def discord_apply_unban( - self, ctx: SlashContext, user: User, reason: str - ): - await ctx.guild.unban(user, reason=reason) - _ = Unban( - user=user.id, - username=user.name, - discrim=user.discriminator, - guild=ctx.guild.id, - admin=ctx.author.id, - reason=reason, - ).insert() - - embed = build_embed( - title="User Unbanned", - description=f"<@{user.id}> was unbanned", - fields=[Field(name="Reason", value=reason)], - ) - embed.set_author( - name=user.name, - icon_url=user.avatar_url, - ) - embed.set_thumbnail(url=user.avatar_url) - embed.set_footer(text=f"{user.name}#{user.discriminator} | {user.id}") - await ctx.send(embed=embed) - - @cog_ext.cog_slash( - name="unban", - description="Unban a user", - options=[ - create_option( - name="user", - description="User to unban", - option_type=3, - required=True, - ), - create_option( - name="reason", - description="Unban reason", - required=True, - option_type=3, - ), - ], - ) - @admin_or_permissions(ban_members=True) - async def _unban( - self, - ctx: SlashContext, - user: str, - reason: str, - ): - if len(reason) > 100: - await ctx.send("Reason must be < 100 characters", hidden=True) - return - - orig_user = user - discrim = None - discord_ban_info = None - database_ban_info = None - - bans = await ctx.guild.bans() - - # Try to get ban information out of Discord - if re.match("^[0-9]{1,}$", user): # User ID - user = int(user) - discord_ban_info = find(lambda x: x.user.id == user, bans) - else: # User name - if re.match("#[0-9]{4}$", user): # User name has discrim - user, discrim = user.split("#") - if discrim: - discord_ban_info = find( - lambda x: x.user.name == user - and x.user.discriminator == discrim, - bans, - ) - else: - results = [ - x for x in filter(lambda x: x.user.name == user, bans) - ] - if results: - if len(results) > 1: - active_bans = [] - for ban in bans: - active_bans.append( - "{0} ({1}): {2}".format( - ban.user.name, ban.user.id, ban.reason - ) - ) - message = ( - "More than one result. " - + "Please use one of the following IDs:\n```" - + "\n".join(active_bans) - + "\n```" - ) - await ctx.send(message) - return - else: - discord_ban_info = results[0] - - # If we don't have the ban information in Discord, - # try to find the relevant information in the database. - # We take advantage of the previous checks to save CPU cycles - if not discord_ban_info: - if isinstance(user, int): - database_ban_info = Ban.get( - guild=ctx.guild.id, user=user, active=True - ) - else: - search = { - "guild": ctx.guild.id, - "username": user, - "active": True, - } - if discrim: - search["discrim"] = discrim - database_ban_info = Ban.get(**search) - - if not discord_ban_info and not database_ban_info: - await ctx.send(f"Unable to find user {orig_user}", hidden=True) - - elif discord_ban_info: - await self.discord_apply_unban(ctx, discord_ban_info.user, reason) - else: - discord_ban_info = find( - lambda x: x.user.id == database_ban_info["id"], bans - ) - if discord_ban_info: - await self.discord_apply_unban( - ctx, discord_ban_info.user, reason - ) - else: - database_ban_info.active = False - database_ban_info.update() - _ = Unban( - user=database_ban_info.user, - username=database_ban_info.username, - discrim=database_ban_info.discrim, - guild=ctx.guild.id, - admin=ctx.author.id, - reason=reason, - ).insert() - await ctx.send( - "Unable to find user in Discord, " - + "but removed entry from database." - ) - - @cog_ext.cog_subcommand( - base="bans", - name="list", - description="List bans", - options=[ - create_option( - name="type", - description="Ban type", - option_type=4, - required=False, - choices=[ - create_choice(value=0, name="All"), - create_choice(value=1, name="Permanent"), - create_choice(value=2, name="Temporary"), - create_choice(value=3, name="Soft"), - ], - ), - create_option( - name="active", - description="Active bans", - option_type=4, - required=False, - choices=[ - create_choice(value=1, name="Yes"), - create_choice(value=0, name="No"), - ], - ), - ], - ) - @admin_or_permissions(ban_members=True) - async def _bans_list( - self, ctx: SlashContext, type: int = 0, active: int = 1 - ): - active = bool(active) - exists = self.check_cache(ctx, type=type, active=active) - if exists: - await ctx.defer(hidden=True) - await ctx.send( - "Please use existing interaction: " - + f"{exists['paginator']._message.jump_url}", - hidden=True, - ) - return - types = [0, "perm", "temp", "soft"] - search = {"guild": ctx.guild.id} - if active: - search["active"] = True - if type > 0: - search["type"] = types[type] - bans = Ban.get_many(**search) - bans.sort(key=lambda x: x.created_at, reverse=True) - db_bans = [] - fields = [] - for ban in bans: - if not ban.username: - user = await self.bot.fetch_user(ban.user) - ban.username = user.name if user else "[deleted user]" - fields.append( - Field( - name=f"Username: {ban.username}#{ban.discrim}", - value=f"Date: {ban.created_at.strftime('%d-%m-%Y')}\n" - + f"User ID: {ban.user}\n" - + f"Reason: {ban.reason}\n" - + f"Type: {ban.type}\n\u200b", - inline=False, - ) - ) - db_bans.append(ban.user) - if type == 0: - bans = await ctx.guild.bans() - for ban in bans: - if ban.user.id not in db_bans: - fields.append( - Field( - name=f"Username: {ban.user.name}#" - + f"{ban.user.discriminator}", - value="Date: [unknown]\n" - + f"User ID: {ban.user.id}\n" - + f"Reason: {ban.reason}\n" - + "Type: manual\n\u200b", - inline=False, - ) - ) - - pages = [] - title = "Active " if active else "Inactive " - if type > 0: - title += types[type] - if type == 1: - title += "a" - title += "bans" - if len(fields) == 0: - embed = build_embed( - title=title, - description=f"No {'in' if not active else ''}active bans", - fields=[], - ) - embed.set_thumbnail(url=ctx.guild.icon_url) - pages.append(embed) - else: - for i in range(0, len(bans), 5): - embed = build_embed( - title=title, description="", fields=fields[i : i + 5] - ) - embed.set_thumbnail(url=ctx.guild.icon_url) - pages.append(embed) - - paginator = Paginator( - bot=self.bot, - ctx=ctx, - embeds=pages, - only=ctx.author, - timeout=60 * 5, # 5 minute timeout - disable_after_timeout=True, - use_extend=len(pages) > 2, - left_button_style=ButtonStyle.grey, - right_button_style=ButtonStyle.grey, - basic_buttons=["◀", "▶"], - ) - - self.cache[hash(paginator)] = { - "guild": ctx.guild.id, - "user": ctx.author.id, - "timeout": datetime.utcnow() + timedelta(minutes=5), - "command": ctx.subcommand_name, - "type": type, - "active": active, - "paginator": paginator, - } - - await paginator.start() - - @cog_ext.cog_slash( - name="kick", - description="Kick a user", - options=[ - create_option( - name="user", - description="User to kick", - option_type=6, - required=True, - ), - create_option( - name="reason", - description="Kick reason", - required=False, - option_type=3, - ), - ], - ) - @admin_or_permissions(kick_members=True) - async def _kick(self, ctx: SlashContext, user: User, reason=None): - if not user or user == ctx.author: - await ctx.send("You cannot kick yourself.", hidden=True) - return - if user == self.bot.user: - await ctx.send("I'm afraid I can't let you do that", hidden=True) - return - if len(reason) > 100: - await ctx.send("Reason must be < 100 characters", hidden=True) - return - if not reason: - reason = ( - "Mr. Stark is displeased with your presence. Please leave." - ) - guild_name = ctx.guild.name - embed = build_embed( - title=f"You have been kicked from {guild_name}", - description=f"Reason: {reason}", - fields=[], - ) - - embed.set_author( - name=ctx.author.name + "#" + ctx.author.discriminator, - icon_url=ctx.author.avatar_url, - ) - embed.set_thumbnail(ctx.guild.icon_url) - - send_failed = False - try: - await user.send(embed=embed) - except Exception: - send_failed = True - await ctx.guild.kick(user, reason=reason) - - fields = [Field(name="DM Sent?", value=str(not send_failed))] - embed = build_embed( - title="User Kicked", - description=f"Reason: {reason}", - fields=fields, - ) - - embed.set_author( - name=user.nick if user.nick else user.name, - icon_url=user.avatar_url, - ) - embed.set_thumbnail(url=user.avatar_url) - embed.set_footer(text=f"{user.name}#{user.discriminator} | {user.id}") - - await ctx.send(embed=embed) - _ = Kick( - user=user.id, - reason=reason, - admin=ctx.author.id, - guild=ctx.guild.id, - ).insert() - - @cog_ext.cog_slash( - name="purge", - description="Purge messages from channel", - options=[ - create_option( - name="amount", - description="Amount of messages to purge", - required=False, - option_type=4, - ) - ], - ) - @admin_or_permissions(manage_messages=True) - async def _purge(self, ctx: SlashContext, amount: int = 10): - if amount < 1: - await ctx.send("Amount must be >= 1", hidden=True) - return - await ctx.defer() - channel = ctx.channel - messages = [] - async for message in channel.history(limit=amount + 1): - messages.append(message) - await channel.delete_messages(messages) - _ = Purge( - channel=ctx.channel.id, - guild=ctx.guild.id, - admin=ctx.author.id, - count=amount, - ).insert() - - @cog_ext.cog_slash( - name="mute", - description="Mute a user", - options=[ - create_option( - name="user", - description="User to mute", - option_type=6, - required=True, - ), - create_option( - name="reason", - description="Reason for mute", - option_type=3, - required=True, - ), - create_option( - name="duration", - description="Mute duration", - option_type=4, - required=False, - ), - ], - ) - @admin_or_permissions(mute_members=True) - async def _mute( - self, ctx: SlashContext, user: Member, reason: str, duration: int = 30 - ): - if user == ctx.author: - await ctx.send("You cannot mute yourself.", hidden=True) - return - if user == self.bot.user: - await ctx.send("I'm afraid I can't let you do that", hidden=True) - return - if len(reason) > 100: - await ctx.send("Reason must be < 100 characters", hidden=True) - return - mute_setting = Setting.get(guild=ctx.guild.id, setting="mute") - if not mute_setting: - await ctx.send( - "Please configure a mute role " - + "with /settings mute first", - hidden=True, - ) - return - role = get(ctx.guild.roles, id=mute_setting.value) - if role in user.roles: - await ctx.send("User already muted", hidden=True) - return - await user.add_roles(role, reason=reason) - if duration < 0 or duration > 300: - duration = -1 - _ = Mute( - user=user.id, - reason=reason, - admin=ctx.author.id, - guild=ctx.guild.id, - duration=duration, - active=True if duration >= 0 else False, - ).insert() - - embed = build_embed( - title="User Muted", - description=f"{user.mention} has been muted", - fields=[Field(name="Reason", value=reason)], - ) - embed.set_author( - name=user.nick if user.nick else user.name, - icon_url=user.avatar_url, - ) - embed.set_thumbnail(url=user.avatar_url) - embed.set_footer(text=f"{user.name}#{user.discriminator} | {user.id}") - await ctx.send(embed=embed) - - @cog_ext.cog_slash( - name="unmute", - description="Unmute a user", - options=[ - create_option( - name="user", - description="User to unmute", - option_type=6, - required=True, - ) - ], - ) - @admin_or_permissions(mute_members=True) - async def _unmute(self, ctx: SlashContext, user: Member): - mute_setting = Setting.get(guild=ctx.guild.id, setting="mute") - if not mute_setting: - await ctx.send( - "Please configure a mute role with " - + "/settings mute first.", - hidden=True, - ) - return - - role = get(ctx.guild.roles, id=mute_setting.value) - if role in user.roles: - await user.remove_roles(role, reason="Unmute") - else: - await ctx.send("User is not muted.", hidden=True) - return - - mutes = Mute.get_many(guild=ctx.guild.id, user=user.id) - for mute in mutes: - mute.active = False - mute.update() - embed = build_embed( - title="User Unmwaruted", - description=f"{user.mention} has been unmuted", - fields=[], - ) - embed.set_author( - name=user.nick if user.nick else user.name, - icon_url=user.avatar_url, - ) - embed.set_thumbnail(url=user.avatar_url) - embed.set_footer(text=f"{user.name}#{user.discriminator} | {user.id}") - await ctx.send(embed=embed) - - async def _lock_channel( - self, - channel: Union[TextChannel, VoiceChannel], - role: Role, - admin: User, - reason: str, - allow_send=False, - ): - overrides = channel.overwrites_for(role) - if isinstance(channel, TextChannel): - overrides.send_messages = allow_send - elif isinstance(channel, VoiceChannel): - overrides.speak = allow_send - await channel.set_permissions(role, overwrite=overrides, reason=reason) - - async def _unlock_channel( - self, - channel: Union[TextChannel, VoiceChannel], - role: Role, - admin: User, - ): - overrides = channel.overwrites_for(role) - if isinstance(channel, TextChannel): - overrides.send_messages = None - elif isinstance(channel, VoiceChannel): - overrides.speak = None - await channel.set_permissions(role, overwrite=overrides) - - @cog_ext.cog_slash( - name="lock", - description="Locks a channel", - options=[ - create_option( - name="reason", - description="Lock Reason", - option_type=3, - required=True, - ), - create_option( - name="duration", - description="Lock duration in minutes (default 10)", - option_type=4, - required=False, - ), - create_option( - name="channel", - description="Channel to lock", - option_type=7, - required=False, - ), - ], - ) - @commands.has_permissions(administrator=True) - async def _lock( - self, - ctx: SlashContext, - reason: str, - duration: int = 10, - channel: Union[TextChannel, VoiceChannel] = None, - ): - await ctx.defer(hidden=True) - if duration <= 0: - await ctx.send("Duration must be > 0", hidden=True) - return - elif duration >= 300: - await ctx.send("Duration must be < 5 hours", hidden=True) - return - if len(reason) > 100: - await ctx.send("Reason must be < 100 characters", hidden=True) - return - if not channel: - channel = ctx.channel - for role in ctx.guild.roles: - try: - await self._lock_channel(channel, role, ctx.author, reason) - except Exception: - continue # Just continue on error - _ = Lock( - channel=channel.id, - guild=ctx.guild.id, - admin=ctx.author.id, - reason=reason, - duration=duration, - ).insert() - await ctx.send(f"{channel.mention} locked for {duration} minute(s)") - - @cog_ext.cog_slash( - name="unlock", - description="Unlocks a channel", - options=[ - create_option( - name="channel", - description="Channel to lock", - option_type=7, - required=False, - ), - ], - ) - @commands.has_permissions(administrator=True) - async def _unlock( - self, - ctx: SlashContext, - channel: Union[TextChannel, VoiceChannel] = None, - ): - if not channel: - channel = ctx.channel - lock = Lock.get(guild=ctx.guild.id, channel=channel.id, active=True) - if not lock: - await ctx.send(f"{channel.mention} not locked.", hidden=True) - return - for role in ctx.guild.roles: - try: - await self._unlock_channel(channel, role, ctx.author) - except Exception: - continue # Just continue on error - lock.active = False - lock.update() - await ctx.send(f"{channel.mention} unlocked") - - @cog_ext.cog_subcommand( - base="lockdown", - name="start", - description="Locks a server", - options=[ - create_option( - name="reason", - description="Lockdown Reason", - option_type=3, - required=True, - ), - create_option( - name="duration", - description="Lockdown duration in minutes (default 10)", - option_type=4, - required=False, - ), - ], - ) - @commands.has_permissions(administrator=True) - async def _lockdown_start( - self, - ctx: SlashContext, - reason: str, - duration: int = 10, - ): - await ctx.defer(hidden=True) - if duration <= 0: - await ctx.send("Duration must be > 0", hidden=True) - return - elif duration >= 300: - await ctx.send("Duration must be < 5 hours", hidden=True) - return - channels = ctx.guild.channels - roles = ctx.guild.roles - updates = [] - for channel in channels: - for role in roles: - try: - await self._lock_channel(channel, role, ctx.author, reason) - except Exception: - continue # Just continue on error - updates.append( - pymongo.InsertOne( - { - "channel": channel.id, - "guild": ctx.guild.id, - "admin": ctx.author.id, - "reason": reason, - "duration": duration, - "active": True, - "created_at": datetime.utcnow(), - } - ) - ) - if updates: - self.db.locks.bulk_write(updates) - await ctx.send(f"Server locked for {duration} minute(s)") - - @cog_ext.cog_subcommand( - base="lockdown", - name="end", - description="Unlocks a server", - ) - @commands.has_permissions(administrator=True) - async def _lockdown_end( - self, - ctx: SlashContext, - ): - channels = ctx.guild.channels - roles = ctx.guild.roles - updates = [] - locks = Lock.get_many(guild=ctx.guild.id, active=True) - if not locks: - await ctx.send("No lockdown detected.", hidden=True) - return - await ctx.defer() - for channel in channels: - for role in roles: - try: - await self._unlock_channel(channel, role, ctx.author) - except Exception: - continue # Just continue on error - updates.append( - pymongo.UpdateOne( - { - "channel": channel.id, - "guild": ctx.guild.id, - "admin": ctx.author.id, - }, - {"$set": {"active": False}}, - ) - ) - if updates: - self.db.locks.bulk_write(updates) - await ctx.send("Server unlocked") - - @cog_ext.cog_slash( - name="warn", - description="Warn a user", - options=[ - create_option( - name="user", - description="User to warn", - option_type=6, - required=True, - ), - create_option( - name="reason", - description="Reason for warning", - option_type=3, - required=True, - ), - create_option( - name="duration", - description="Duration of warning in hours, default 24", - option_type=4, - required=False, - ), - ], - ) - @commands.has_permissions(administrator=True) - async def _warn( - self, ctx: SlashContext, user: User, reason: str, duration: int = 24 - ): - if len(reason) > 100: - await ctx.send("Reason must be < 100 characters", hidden=True) - return - if duration <= 0: - await ctx.send("Duration must be > 0", hidden=True) - return - elif duration >= 120: - await ctx.send("Duration must be < 5 days", hidden=True) - return - await ctx.defer() - _ = Warning( - user=user.id, - reason=reason, - admin=ctx.author.id, - guild=ctx.guild.id, - duration=duration, - active=True, - ).insert() - fields = [Field("Reason", reason, False)] - embed = build_embed( - title="Warning", - description=f"{user.mention} has been warned", - fields=fields, - ) - embed.set_author( - name=user.nick if user.nick else user.name, - icon_url=user.avatar_url, - ) - embed.set_footer(text=f"{user.name}#{user.discriminator} | {user.id}") - - await ctx.send(embed=embed) - - @cog_ext.cog_slash( - name="warnings", - description="Get count of user warnings", - options=[ - create_option( - name="user", - description="User to view", - option_type=6, - required=True, - ), - create_option( - name="active", - description="View only active", - option_type=4, - required=False, - choices=[ - create_choice(name="Yes", value=1), - create_choice(name="No", value=0), - ], - ), - ], - ) - @commands.has_permissions(administrator=True) - async def _warnings(self, ctx: SlashContext, user: User, active: bool = 1): - active = bool(active) - exists = self.check_cache(ctx, user_id=user.id, active=active) - if exists: - await ctx.defer(hidden=True) - await ctx.send( - "Please use existing interaction: " - + f"{exists['paginator']._message.jump_url}", - hidden=True, - ) - return - warnings = Warning.get_many( - user=user.id, - guild=ctx.guild.id, - sort=MongoSort(direction="desc", key="created_at"), - ) - active_warns = list(filter(lambda x: x.active, warnings)) - - pages = [] - if active: - if len(active_warns) == 0: - embed = build_embed( - title="Warnings", - description=f"{len(warnings)} total | 0 currently active", - fields=[], - ) - embed.set_author(name=user.name, icon_url=user.avatar_url) - embed.set_thumbnail(url=ctx.guild.icon_url) - pages.append(embed) - else: - fields = [] - for warn in active_warns: - admin = ctx.guild.get(warn.admin) - admin_name = "||`[redacted]`||" - if admin: - admin_name = f"{admin.name}#{admin.discriminator}" - fields.append( - Field( - name=warn.created_at.strftime( - "%Y-%m-%d %H:%M:%S UTC" - ), - value=f"{warn.reason}\n" - + f"Admin: {admin_name}\n" - + "\u200b", - inline=False, - ) - ) - for i in range(0, len(fields), 5): - embed = build_embed( - title="Warnings", - description=f"{len(warnings)} total | " - + f"{len(active_warns)} currently active", - fields=fields[i : i + 5], - ) - embed.set_author( - name=user.name + "#" + user.discriminator, - icon_url=user.avatar_url, - ) - embed.set_thumbnail(url=ctx.guild.icon_url) - embed.set_footer( - text=f"{user.name}#{user.discriminator} | {user.id}" - ) - pages.append(embed) - else: - fields = [] - for warn in warnings: - title = "[A] " if warn.active else "[I] " - title += warn.created_at.strftime("%Y-%m-%d %H:%M:%S UTC") - fields.append( - Field( - name=title, - value=warn.reason + "\n\u200b", - inline=False, - ) - ) - for i in range(0, len(fields), 5): - embed = build_embed( - title="Warnings", - description=f"{len(warnings)} total | " - + f"{len(active_warns)} currently active", - fields=fields[i : i + 5], - ) - embed.set_author( - name=user.name + "#" + user.discriminator, - icon_url=user.avatar_url, - ) - embed.set_thumbnail(url=ctx.guild.icon_url) - pages.append(embed) - - paginator = Paginator( - bot=self.bot, - ctx=ctx, - embeds=pages, - only=ctx.author, - timeout=60 * 5, # 5 minute timeout - disable_after_timeout=True, - use_extend=len(pages) > 2, - left_button_style=ButtonStyle.grey, - right_button_style=ButtonStyle.grey, - basic_buttons=["◀", "▶"], - ) - - self.cache[hash(paginator)] = { - "guild": ctx.guild.id, - "user": ctx.author.id, - "timeout": datetime.utcnow() + timedelta(minutes=5), - "command": ctx.subcommand_name, - "user_id": user.id, - "active": active, - "paginator": paginator, - } - - await paginator.start() - - @cog_ext.cog_subcommand( - base="roleping", - name="block", - description="Add a role to the roleping blocklist", - options=[ - create_option( - name="role", - description="Role to add to blocklist", - option_type=8, - required=True, - ) - ], - ) - @commands.has_permissions(administrator=True) - async def _roleping_block(self, ctx: SlashContext, role: Role): - roles = Setting.get(guild=ctx.guild.id, setting="roleping") - if not roles: - roles = Setting(guild=ctx.guild.id, setting="roleping", value=[]) - - if role.id in roles.value: - await ctx.send( - f"Role `{role.name}` already in blocklist.", hidden=True - ) - return - roles.value.append(role.id) - roles.update() - await ctx.send(f"Role `{role.name}` added to blocklist.") - - @cog_ext.cog_subcommand( - base="roleping", - name="allow", - description="Remove a role from the roleping blocklist", - options=[ - create_option( - name="role", - description="Role to remove from blocklist", - option_type=8, - required=True, - ) - ], - ) - @commands.has_permissions(administrator=True) - async def _roleping_allow(self, ctx: SlashContext, role: Role): - roles = Setting.get(guild=ctx.guild.id, setting="roleping") - if not roles: - await ctx.send("No blocklist configured.", hidden=True) - return - - if role.id not in roles.value: - await ctx.send( - f"Role `{role.name}` not in blocklist.", hidden=True - ) - return - roles.value.remove(role.id) - roles.update() - await ctx.send(f"Role `{role.name}` removed blocklist.") - - @cog_ext.cog_subcommand( - base="roleping", - name="list", - description="List all blocklisted roles", - ) - async def _roleping_list(self, ctx: SlashContext): - roles = Setting.get(guild=ctx.guild.id, setting="roleping") - if not roles: - await ctx.send("No blocklist configured.", hidden=True) - return - - message = "Blocklisted Roles:\n```\n" - if not roles.value: - await ctx.send("No roles blocklisted.", hidden=True) - return - for role in roles.value: - role = ctx.guild.get_role(role) - if not role: - continue - message += role.name + "\n" - message += "```" - await ctx.send(message) - - @cog_ext.cog_subcommand( - base="autopurge", - name="add", - description="Automatically purge messages after x seconds", - options=[ - create_option( - name="channel", - description="Channel to autopurge", - option_type=7, - required=True, - ), - create_option( - name="delay", - description="Seconds to keep message before purge, default 30", - option_type=4, - required=False, - ), - ], - ) - @admin_or_permissions(manage_messages=True) - async def _autopurge_add( - self, ctx: SlashContext, channel: TextChannel, delay: int = 30 - ): - if not isinstance(channel, TextChannel): - await ctx.send("Channel must be a TextChannel", hidden=True) - return - if delay <= 0: - await ctx.send("Delay must be > 0", hidden=True) - return - elif delay > 300: - await ctx.send("Delay must be < 5 minutes", hidden=True) - return - autopurge = Autopurge.get(guild=ctx.guild.id, channel=channel.id) - if autopurge: - await ctx.send("Autopurge already exists.", hidden=True) - return - autopurge = Autopurge( - guild=ctx.guild.id, - channel=channel.id, - admin=ctx.author.id, - delay=delay, - ) - autopurge.insert() - await ctx.send( - f"Autopurge set up on {channel.mention}, " - + f"delay is {delay} seconds" - ) - - @cog_ext.cog_subcommand( - base="autopurge", - name="remove", - description="Remove an autopurge", - options=[ - create_option( - name="channel", - description="Channel to remove from autopurge", - option_type=7, - required=True, - ), - ], - ) - @admin_or_permissions(manage_messages=True) - async def _autopurge_remove(self, ctx: SlashContext, channel: TextChannel): - autopurge = Autopurge.get(guild=ctx.guild.id, channel=channel.id) - if not autopurge: - await ctx.send("Autopurge does not exist.", hidden=True) - return - autopurge.delete() - await ctx.send(f"Autopurge removed from {channel.mention}.") - - @cog_ext.cog_subcommand( - base="autopurge", - name="update", - description="Update autopurge on a channel", - options=[ - create_option( - name="channel", - description="Channel to update", - option_type=7, - required=True, - ), - create_option( - name="delay", - description="New time to save", - option_type=4, - required=True, - ), - ], - ) - @admin_or_permissions(manage_messages=True) - async def _autopurge_update( - self, ctx: SlashContext, channel: TextChannel, delay: int - ): - autopurge = Autopurge.get(guild=ctx.guild.id, channel=channel.id) - if not autopurge: - await ctx.send("Autopurge does not exist.", hidden=True) - return - autopurge.delay = delay - autopurge.update() - await ctx.send( - f"Autopurge delay updated to {delay} seconds on {channel.mention}." - ) - - @loop(minutes=1) - async def _expire_interaction(self): - keys = list(self.cache.keys()) - for key in keys: - if self.cache[key]["timeout"] <= datetime.utcnow() + timedelta( - minutes=1 - ): - del self.cache[key] - - -def setup(bot): - bot.add_cog(AdminCog(bot)) diff --git a/jarvis/cogs/admin/__init__.py b/jarvis/cogs/admin/__init__.py new file mode 100644 index 0000000..4ab637e --- /dev/null +++ b/jarvis/cogs/admin/__init__.py @@ -0,0 +1,21 @@ +from jarvis.cogs.admin import ( + ban, + kick, + lock, + lockdown, + mute, + purge, + roleping, + warning, +) + + +def setup(bot): + bot.add_cog(ban.BanCog(bot)) + bot.add_cog(kick.KickCog(bot)) + bot.add_cog(lock.LockCog(bot)) + bot.add_cog(lockdown.LockdownCog(bot)) + bot.add_cog(mute.MuteCog(bot)) + bot.add_cog(purge.PurgeCog(bot)) + bot.add_cog(roleping.RolepingCog(bot)) + bot.add_cog(warning.WarningCog(bot)) diff --git a/jarvis/cogs/admin/ban.py b/jarvis/cogs/admin/ban.py new file mode 100644 index 0000000..bf6b7d6 --- /dev/null +++ b/jarvis/cogs/admin/ban.py @@ -0,0 +1,455 @@ +import re +from datetime import datetime, timedelta + +from ButtonPaginator import Paginator +from discord import User +from discord.ext import commands +from discord.utils import find +from discord_slash import SlashContext, cog_ext +from discord_slash.model import ButtonStyle +from discord_slash.utils.manage_commands import create_choice, create_option + +from jarvis.db.types import Ban, Unban +from jarvis.utils import build_embed +from jarvis.utils.cachecog import CacheCog +from jarvis.utils.field import Field +from jarvis.utils.permissions import admin_or_permissions + + +class BanCog(CacheCog): + def __init__(self, bot: commands.Bot): + super().__init__(bot) + + async def discord_apply_ban( + self, + ctx: SlashContext, + reason: str, + user: User, + duration: int, + active: bool, + fields: list, + ): + await ctx.guild.ban(user, reason=reason) + _ = Ban( + user=user.id, + username=user.name, + discrim=user.discriminator, + reason=reason, + admin=ctx.author.id, + guild=ctx.guild.id, + type=type, + duration=duration, + active=active, + ).insert() + + embed = build_embed( + title="User Banned", + description=f"Reason: {reason}", + fields=fields, + ) + + embed.set_author( + name=user.nick if user.nick else user.name, + icon_url=user.avatar_url, + ) + embed.set_thumbnail(url=user.avatar_url) + embed.set_footer(text=f"{user.name}#{user.discriminator} | {user.id}") + + await ctx.send(embed=embed) + + async def discord_apply_unban( + self, ctx: SlashContext, user: User, reason: str + ): + await ctx.guild.unban(user, reason=reason) + _ = Unban( + user=user.id, + username=user.name, + discrim=user.discriminator, + guild=ctx.guild.id, + admin=ctx.author.id, + reason=reason, + ).insert() + + embed = build_embed( + title="User Unbanned", + description=f"<@{user.id}> was unbanned", + fields=[Field(name="Reason", value=reason)], + ) + embed.set_author( + name=user.name, + icon_url=user.avatar_url, + ) + embed.set_thumbnail(url=user.avatar_url) + embed.set_footer(text=f"{user.name}#{user.discriminator} | {user.id}") + await ctx.send(embed=embed) + + @cog_ext.cog_slash( + name="ban", + description="Ban a user", + options=[ + create_option( + name="user", + description="User to ban", + option_type=6, + required=True, + ), + create_option( + name="reason", + description="Ban reason", + required=True, + option_type=3, + ), + create_option( + name="type", + description="Ban type", + option_type=3, + required=False, + choices=[ + create_choice(value="perm", name="Permanent"), + create_choice(value="temp", name="Temporary"), + create_choice(value="soft", name="Soft"), + ], + ), + create_option( + name="duration", + description="Ban duration in hours if temporary", + required=False, + option_type=4, + ), + ], + ) + @admin_or_permissions(ban_members=True) + async def _ban( + self, + ctx: SlashContext, + user: User = None, + reason: str = None, + type: str = "perm", + duration: int = 4, + ): + if not user or user == ctx.author: + await ctx.send("You cannot ban yourself.", hidden=True) + return + if user == self.bot.user: + await ctx.send("I'm afraid I can't let you do that", hidden=True) + return + if type == "temp" and duration < 0: + await ctx.send( + "You cannot set a temp ban to < 0 hours.", hidden=True + ) + return + elif type == "temp" and duration > 744: + await ctx.send( + "You cannot set a temp ban to > 1 month", hidden=True + ) + return + if len(reason) > 100: + await ctx.send("Reason must be < 100 characters", hidden=True) + return + if not reason: + reason = ( + "Mr. Stark is displeased with your presence. Please leave." + ) + + mtype = type + if mtype == "perm": + mtype = "perma" + + guild_name = ctx.guild.name + user_message = ( + f"You have been {mtype}banned from {guild_name}." + + f" Reason:\n{reason}" + ) + if mtype == "temp": + user_message += f"\nDuration: {duration} hours" + + fields = [Field(name="Type", value=mtype)] + + if mtype == "temp": + fields.append(Field(name="Duration", value=f"{duration} hour(s)")) + + user_embed = build_embed( + title="You have been banned", + description=f"Reason: {reason}", + fields=fields, + ) + + user_embed.set_author( + name=ctx.author.name + "#" + ctx.author.discriminator, + icon_url=ctx.author.avatar_url, + ) + user_embed.set_thumbnail(url=ctx.guild.icon_url) + + try: + await user.send(embed=user_embed) + except Exception: + send_failed = True + try: + await ctx.guild.ban(user, reason=reason) + except Exception as e: + await ctx.send(f"Failed to ban user:\n```\n{e}\n```", hidden=True) + return + send_failed = False + if mtype == "soft": + await ctx.guild.unban(user, reason="Ban was softban") + + fields.append(Field(name="DM Sent?", value=str(not send_failed))) + if type != "temp": + duration = None + active = True + if type == "soft": + active = False + + self.discord_apply_ban(ctx, reason, user, duration, active, fields) + + @cog_ext.cog_slash( + name="unban", + description="Unban a user", + options=[ + create_option( + name="user", + description="User to unban", + option_type=3, + required=True, + ), + create_option( + name="reason", + description="Unban reason", + required=True, + option_type=3, + ), + ], + ) + @admin_or_permissions(ban_members=True) + async def _unban( + self, + ctx: SlashContext, + user: str, + reason: str, + ): + if len(reason) > 100: + await ctx.send("Reason must be < 100 characters", hidden=True) + return + + orig_user = user + discrim = None + discord_ban_info = None + database_ban_info = None + + bans = await ctx.guild.bans() + + # Try to get ban information out of Discord + if re.match("^[0-9]{1,}$", user): # User ID + user = int(user) + discord_ban_info = find(lambda x: x.user.id == user, bans) + else: # User name + if re.match("#[0-9]{4}$", user): # User name has discrim + user, discrim = user.split("#") + if discrim: + discord_ban_info = find( + lambda x: x.user.name == user + and x.user.discriminator == discrim, + bans, + ) + else: + results = [ + x for x in filter(lambda x: x.user.name == user, bans) + ] + if results: + if len(results) > 1: + active_bans = [] + for ban in bans: + active_bans.append( + "{0} ({1}): {2}".format( + ban.user.name, ban.user.id, ban.reason + ) + ) + message = ( + "More than one result. " + + "Please use one of the following IDs:\n```" + + "\n".join(active_bans) + + "\n```" + ) + await ctx.send(message) + return + else: + discord_ban_info = results[0] + + # If we don't have the ban information in Discord, + # try to find the relevant information in the database. + # We take advantage of the previous checks to save CPU cycles + if not discord_ban_info: + if isinstance(user, int): + database_ban_info = Ban.get( + guild=ctx.guild.id, user=user, active=True + ) + else: + search = { + "guild": ctx.guild.id, + "username": user, + "active": True, + } + if discrim: + search["discrim"] = discrim + database_ban_info = Ban.get(**search) + + if not discord_ban_info and not database_ban_info: + await ctx.send(f"Unable to find user {orig_user}", hidden=True) + + elif discord_ban_info: + await self.discord_apply_unban(ctx, discord_ban_info.user, reason) + else: + discord_ban_info = find( + lambda x: x.user.id == database_ban_info["id"], bans + ) + if discord_ban_info: + await self.discord_apply_unban( + ctx, discord_ban_info.user, reason + ) + else: + database_ban_info.active = False + database_ban_info.update() + _ = Unban( + user=database_ban_info.user, + username=database_ban_info.username, + discrim=database_ban_info.discrim, + guild=ctx.guild.id, + admin=ctx.author.id, + reason=reason, + ).insert() + await ctx.send( + "Unable to find user in Discord, " + + "but removed entry from database." + ) + + @cog_ext.cog_subcommand( + base="bans", + name="list", + description="List bans", + options=[ + create_option( + name="type", + description="Ban type", + option_type=4, + required=False, + choices=[ + create_choice(value=0, name="All"), + create_choice(value=1, name="Permanent"), + create_choice(value=2, name="Temporary"), + create_choice(value=3, name="Soft"), + ], + ), + create_option( + name="active", + description="Active bans", + option_type=4, + required=False, + choices=[ + create_choice(value=1, name="Yes"), + create_choice(value=0, name="No"), + ], + ), + ], + ) + @admin_or_permissions(ban_members=True) + async def _bans_list( + self, ctx: SlashContext, type: int = 0, active: int = 1 + ): + active = bool(active) + exists = self.check_cache(ctx, type=type, active=active) + if exists: + await ctx.defer(hidden=True) + await ctx.send( + "Please use existing interaction: " + + f"{exists['paginator']._message.jump_url}", + hidden=True, + ) + return + types = [0, "perm", "temp", "soft"] + search = {"guild": ctx.guild.id} + if active: + search["active"] = True + if type > 0: + search["type"] = types[type] + bans = Ban.get_many(**search) + bans.sort(key=lambda x: x.created_at, reverse=True) + db_bans = [] + fields = [] + for ban in bans: + if not ban.username: + user = await self.bot.fetch_user(ban.user) + ban.username = user.name if user else "[deleted user]" + fields.append( + Field( + name=f"Username: {ban.username}#{ban.discrim}", + value=f"Date: {ban.created_at.strftime('%d-%m-%Y')}\n" + + f"User ID: {ban.user}\n" + + f"Reason: {ban.reason}\n" + + f"Type: {ban.type}\n\u200b", + inline=False, + ) + ) + db_bans.append(ban.user) + if type == 0 and active: + bans = await ctx.guild.bans() + for ban in bans: + if ban.user.id not in db_bans: + fields.append( + Field( + name=f"Username: {ban.user.name}#" + + f"{ban.user.discriminator}", + value="Date: [unknown]\n" + + f"User ID: {ban.user.id}\n" + + f"Reason: {ban.reason}\n" + + "Type: manual\n\u200b", + inline=False, + ) + ) + + pages = [] + title = "Active " if active else "Inactive " + if type > 0: + title += types[type] + if type == 1: + title += "a" + title += "bans" + if len(fields) == 0: + embed = build_embed( + title=title, + description=f"No {'in' if not active else ''}active bans", + fields=[], + ) + embed.set_thumbnail(url=ctx.guild.icon_url) + pages.append(embed) + else: + for i in range(0, len(bans), 5): + embed = build_embed( + title=title, description="", fields=fields[i : i + 5] + ) + embed.set_thumbnail(url=ctx.guild.icon_url) + pages.append(embed) + + paginator = Paginator( + bot=self.bot, + ctx=ctx, + embeds=pages, + only=ctx.author, + timeout=60 * 5, # 5 minute timeout + disable_after_timeout=True, + use_extend=len(pages) > 2, + left_button_style=ButtonStyle.grey, + right_button_style=ButtonStyle.grey, + basic_buttons=["◀", "▶"], + ) + + self.cache[hash(paginator)] = { + "guild": ctx.guild.id, + "user": ctx.author.id, + "timeout": datetime.utcnow() + timedelta(minutes=5), + "command": ctx.subcommand_name, + "type": type, + "active": active, + "paginator": paginator, + } + + await paginator.start() diff --git a/jarvis/cogs/admin/kick.py b/jarvis/cogs/admin/kick.py new file mode 100644 index 0000000..32184d2 --- /dev/null +++ b/jarvis/cogs/admin/kick.py @@ -0,0 +1,88 @@ +from discord import User +from discord_slash import SlashContext, cog_ext +from discord_slash.utils.manage_commands import create_option + +from jarvis.db.types import Kick +from jarvis.utils import build_embed +from jarvis.utils.cachecog import CacheCog +from jarvis.utils.field import Field +from jarvis.utils.permissions import admin_or_permissions + + +class KickCog(CacheCog): + def __init__(self, bot): + super().__init__(bot) + + +@cog_ext.cog_slash( + name="kick", + description="Kick a user", + options=[ + create_option( + name="user", + description="User to kick", + option_type=6, + required=True, + ), + create_option( + name="reason", + description="Kick reason", + required=False, + option_type=3, + ), + ], +) +@admin_or_permissions(kick_members=True) +async def _kick(self, ctx: SlashContext, user: User, reason=None): + if not user or user == ctx.author: + await ctx.send("You cannot kick yourself.", hidden=True) + return + if user == self.bot.user: + await ctx.send("I'm afraid I can't let you do that", hidden=True) + return + if len(reason) > 100: + await ctx.send("Reason must be < 100 characters", hidden=True) + return + if not reason: + reason = "Mr. Stark is displeased with your presence. Please leave." + guild_name = ctx.guild.name + embed = build_embed( + title=f"You have been kicked from {guild_name}", + description=f"Reason: {reason}", + fields=[], + ) + + embed.set_author( + name=ctx.author.name + "#" + ctx.author.discriminator, + icon_url=ctx.author.avatar_url, + ) + embed.set_thumbnail(ctx.guild.icon_url) + + send_failed = False + try: + await user.send(embed=embed) + except Exception: + send_failed = True + await ctx.guild.kick(user, reason=reason) + + fields = [Field(name="DM Sent?", value=str(not send_failed))] + embed = build_embed( + title="User Kicked", + description=f"Reason: {reason}", + fields=fields, + ) + + embed.set_author( + name=user.nick if user.nick else user.name, + icon_url=user.avatar_url, + ) + embed.set_thumbnail(url=user.avatar_url) + embed.set_footer(text=f"{user.name}#{user.discriminator} | {user.id}") + + await ctx.send(embed=embed) + _ = Kick( + user=user.id, + reason=reason, + admin=ctx.author.id, + guild=ctx.guild.id, + ).insert() diff --git a/jarvis/cogs/admin/lock.py b/jarvis/cogs/admin/lock.py new file mode 100644 index 0000000..1ee896b --- /dev/null +++ b/jarvis/cogs/admin/lock.py @@ -0,0 +1,133 @@ +from typing import Union + +from discord import Role, TextChannel, User, VoiceChannel +from discord.ext import commands +from discord_slash import SlashContext, cog_ext +from discord_slash.utils.manage_commands import create_option + +from jarvis.db.types import Lock +from jarvis.utils.cachecog import CacheCog + + +class LockCog(CacheCog): + def __init__(self, bot: commands.Bot): + super().__init__(bot) + + async def _lock_channel( + self, + channel: Union[TextChannel, VoiceChannel], + role: Role, + admin: User, + reason: str, + allow_send=False, + ): + overrides = channel.overwrites_for(role) + if isinstance(channel, TextChannel): + overrides.send_messages = allow_send + elif isinstance(channel, VoiceChannel): + overrides.speak = allow_send + await channel.set_permissions(role, overwrite=overrides, reason=reason) + + async def _unlock_channel( + self, + channel: Union[TextChannel, VoiceChannel], + role: Role, + admin: User, + ): + overrides = channel.overwrites_for(role) + if isinstance(channel, TextChannel): + overrides.send_messages = None + elif isinstance(channel, VoiceChannel): + overrides.speak = None + await channel.set_permissions(role, overwrite=overrides) + + @cog_ext.cog_slash( + name="lock", + description="Locks a channel", + options=[ + create_option( + name="reason", + description="Lock Reason", + option_type=3, + required=True, + ), + create_option( + name="duration", + description="Lock duration in minutes (default 10)", + option_type=4, + required=False, + ), + create_option( + name="channel", + description="Channel to lock", + option_type=7, + required=False, + ), + ], + ) + @commands.has_permissions(administrator=True) + async def _lock( + self, + ctx: SlashContext, + reason: str, + duration: int = 10, + channel: Union[TextChannel, VoiceChannel] = None, + ): + await ctx.defer(hidden=True) + if duration <= 0: + await ctx.send("Duration must be > 0", hidden=True) + return + elif duration >= 300: + await ctx.send("Duration must be < 5 hours", hidden=True) + return + if len(reason) > 100: + await ctx.send("Reason must be < 100 characters", hidden=True) + return + if not channel: + channel = ctx.channel + for role in ctx.guild.roles: + try: + await self._lock_channel(channel, role, ctx.author, reason) + except Exception: + continue # Just continue on error + _ = Lock( + channel=channel.id, + guild=ctx.guild.id, + admin=ctx.author.id, + reason=reason, + duration=duration, + ).insert() + await ctx.send(f"{channel.mention} locked for {duration} minute(s)") + + @cog_ext.cog_slash( + name="unlock", + description="Unlocks a channel", + options=[ + create_option( + name="channel", + description="Channel to lock", + option_type=7, + required=False, + ), + ], + ) + @commands.has_permissions(administrator=True) + async def _unlock( + self, + ctx: SlashContext, + channel: Union[TextChannel, VoiceChannel] = None, + ): + if not channel: + channel = ctx.channel + lock = Lock.get(guild=ctx.guild.id, channel=channel.id, active=True) + if not lock: + await ctx.send(f"{channel.mention} not locked.", hidden=True) + return + for role in ctx.guild.roles: + try: + await self._unlock_channel(channel, role, ctx.author) + except Exception: + continue # Just continue on error + lock.active = False + lock.update() + await ctx.send(f"{channel.mention} unlocked") diff --git a/jarvis/cogs/admin/lockdown.py b/jarvis/cogs/admin/lockdown.py new file mode 100644 index 0000000..c023039 --- /dev/null +++ b/jarvis/cogs/admin/lockdown.py @@ -0,0 +1,114 @@ +from datetime import datetime + +import pymongo +from discord.ext import commands +from discord_slash import SlashContext, cog_ext +from discord_slash.utils.manage_commands import create_option + +from jarvis.config import get_config +from jarvis.db import DBManager +from jarvis.db.types import Lock +from jarvis.utils.cachecog import CacheCog + + +class LockdownCog(CacheCog): + def __init__(self, bot: commands.Bot): + super().__init__(bot) + self.db = DBManager(get_config().mongo).mongo.jarvis + + @cog_ext.cog_subcommand( + base="lockdown", + name="start", + description="Locks a server", + options=[ + create_option( + name="reason", + description="Lockdown Reason", + option_type=3, + required=True, + ), + create_option( + name="duration", + description="Lockdown duration in minutes (default 10)", + option_type=4, + required=False, + ), + ], + ) + @commands.has_permissions(administrator=True) + async def _lockdown_start( + self, + ctx: SlashContext, + reason: str, + duration: int = 10, + ): + await ctx.defer(hidden=True) + if duration <= 0: + await ctx.send("Duration must be > 0", hidden=True) + return + elif duration >= 300: + await ctx.send("Duration must be < 5 hours", hidden=True) + return + channels = ctx.guild.channels + roles = ctx.guild.roles + updates = [] + for channel in channels: + for role in roles: + try: + await self._lock_channel(channel, role, ctx.author, reason) + except Exception: + continue # Just continue on error + updates.append( + pymongo.InsertOne( + { + "channel": channel.id, + "guild": ctx.guild.id, + "admin": ctx.author.id, + "reason": reason, + "duration": duration, + "active": True, + "created_at": datetime.utcnow(), + } + ) + ) + if updates: + self.db.locks.bulk_write(updates) + await ctx.send(f"Server locked for {duration} minute(s)") + + @cog_ext.cog_subcommand( + base="lockdown", + name="end", + description="Unlocks a server", + ) + @commands.has_permissions(administrator=True) + async def _lockdown_end( + self, + ctx: SlashContext, + ): + channels = ctx.guild.channels + roles = ctx.guild.roles + updates = [] + locks = Lock.get_many(guild=ctx.guild.id, active=True) + if not locks: + await ctx.send("No lockdown detected.", hidden=True) + return + await ctx.defer() + for channel in channels: + for role in roles: + try: + await self._unlock_channel(channel, role, ctx.author) + except Exception: + continue # Just continue on error + updates.append( + pymongo.UpdateOne( + { + "channel": channel.id, + "guild": ctx.guild.id, + "admin": ctx.author.id, + }, + {"$set": {"active": False}}, + ) + ) + if updates: + self.db.locks.bulk_write(updates) + await ctx.send("Server unlocked") diff --git a/jarvis/cogs/admin/mute.py b/jarvis/cogs/admin/mute.py new file mode 100644 index 0000000..9a4361c --- /dev/null +++ b/jarvis/cogs/admin/mute.py @@ -0,0 +1,136 @@ +from discord import Member +from discord.ext import commands +from discord.utils import get +from discord_slash import SlashContext, cog_ext +from discord_slash.utils.manage_commands import create_option + +from jarvis.db.types import Mute, Setting +from jarvis.utils import build_embed +from jarvis.utils.field import Field +from jarvis.utils.permissions import admin_or_permissions + + +class MuteCog(commands.Cog): + def __init__(self, bot): + self.bot = bot + + @cog_ext.cog_slash( + name="mute", + description="Mute a user", + options=[ + create_option( + name="user", + description="User to mute", + option_type=6, + required=True, + ), + create_option( + name="reason", + description="Reason for mute", + option_type=3, + required=True, + ), + create_option( + name="duration", + description="Mute duration", + option_type=4, + required=False, + ), + ], + ) + @admin_or_permissions(mute_members=True) + async def _mute( + self, ctx: SlashContext, user: Member, reason: str, duration: int = 30 + ): + if user == ctx.author: + await ctx.send("You cannot mute yourself.", hidden=True) + return + if user == self.bot.user: + await ctx.send("I'm afraid I can't let you do that", hidden=True) + return + if len(reason) > 100: + await ctx.send("Reason must be < 100 characters", hidden=True) + return + mute_setting = Setting.get(guild=ctx.guild.id, setting="mute") + if not mute_setting: + await ctx.send( + "Please configure a mute role " + + "with /settings mute first", + hidden=True, + ) + return + role = get(ctx.guild.roles, id=mute_setting.value) + if role in user.roles: + await ctx.send("User already muted", hidden=True) + return + await user.add_roles(role, reason=reason) + if duration < 0 or duration > 300: + duration = -1 + _ = Mute( + user=user.id, + reason=reason, + admin=ctx.author.id, + guild=ctx.guild.id, + duration=duration, + active=True if duration >= 0 else False, + ).insert() + + embed = build_embed( + title="User Muted", + description=f"{user.mention} has been muted", + fields=[Field(name="Reason", value=reason)], + ) + embed.set_author( + name=user.nick if user.nick else user.name, + icon_url=user.avatar_url, + ) + embed.set_thumbnail(url=user.avatar_url) + embed.set_footer(text=f"{user.name}#{user.discriminator} | {user.id}") + await ctx.send(embed=embed) + + @cog_ext.cog_slash( + name="unmute", + description="Unmute a user", + options=[ + create_option( + name="user", + description="User to unmute", + option_type=6, + required=True, + ) + ], + ) + @admin_or_permissions(mute_members=True) + async def _unmute(self, ctx: SlashContext, user: Member): + mute_setting = Setting.get(guild=ctx.guild.id, setting="mute") + if not mute_setting: + await ctx.send( + "Please configure a mute role with " + + "/settings mute first.", + hidden=True, + ) + return + + role = get(ctx.guild.roles, id=mute_setting.value) + if role in user.roles: + await user.remove_roles(role, reason="Unmute") + else: + await ctx.send("User is not muted.", hidden=True) + return + + mutes = Mute.get_many(guild=ctx.guild.id, user=user.id) + for mute in mutes: + mute.active = False + mute.update() + embed = build_embed( + title="User Unmwaruted", + description=f"{user.mention} has been unmuted", + fields=[], + ) + embed.set_author( + name=user.nick if user.nick else user.name, + icon_url=user.avatar_url, + ) + embed.set_thumbnail(url=user.avatar_url) + embed.set_footer(text=f"{user.name}#{user.discriminator} | {user.id}") + await ctx.send(embed=embed) diff --git a/jarvis/cogs/admin/purge.py b/jarvis/cogs/admin/purge.py new file mode 100644 index 0000000..3fc6b30 --- /dev/null +++ b/jarvis/cogs/admin/purge.py @@ -0,0 +1,145 @@ +from discord import TextChannel +from discord.ext import commands +from discord_slash import SlashContext, cog_ext +from discord_slash.utils.manage_commands import create_option + +from jarvis.db.types import Autopurge, Purge +from jarvis.utils.permissions import admin_or_permissions + + +class PurgeCog(commands.Cog): + def __init__(self, bot): + self.bot = bot + + @cog_ext.cog_slash( + name="purge", + description="Purge messages from channel", + options=[ + create_option( + name="amount", + description="Amount of messages to purge", + required=False, + option_type=4, + ) + ], + ) + @admin_or_permissions(manage_messages=True) + async def _purge(self, ctx: SlashContext, amount: int = 10): + if amount < 1: + await ctx.send("Amount must be >= 1", hidden=True) + return + await ctx.defer() + channel = ctx.channel + messages = [] + async for message in channel.history(limit=amount + 1): + messages.append(message) + await channel.delete_messages(messages) + _ = Purge( + channel=ctx.channel.id, + guild=ctx.guild.id, + admin=ctx.author.id, + count=amount, + ).insert() + + @cog_ext.cog_subcommand( + base="autopurge", + name="add", + description="Automatically purge messages after x seconds", + options=[ + create_option( + name="channel", + description="Channel to autopurge", + option_type=7, + required=True, + ), + create_option( + name="delay", + description="Seconds to keep message before purge, default 30", + option_type=4, + required=False, + ), + ], + ) + @admin_or_permissions(manage_messages=True) + async def _autopurge_add( + self, ctx: SlashContext, channel: TextChannel, delay: int = 30 + ): + if not isinstance(channel, TextChannel): + await ctx.send("Channel must be a TextChannel", hidden=True) + return + if delay <= 0: + await ctx.send("Delay must be > 0", hidden=True) + return + elif delay > 300: + await ctx.send("Delay must be < 5 minutes", hidden=True) + return + autopurge = Autopurge.get(guild=ctx.guild.id, channel=channel.id) + if autopurge: + await ctx.send("Autopurge already exists.", hidden=True) + return + autopurge = Autopurge( + guild=ctx.guild.id, + channel=channel.id, + admin=ctx.author.id, + delay=delay, + ) + autopurge.insert() + await ctx.send( + f"Autopurge set up on {channel.mention}, " + + f"delay is {delay} seconds" + ) + + @cog_ext.cog_subcommand( + base="autopurge", + name="remove", + description="Remove an autopurge", + options=[ + create_option( + name="channel", + description="Channel to remove from autopurge", + option_type=7, + required=True, + ), + ], + ) + @admin_or_permissions(manage_messages=True) + async def _autopurge_remove(self, ctx: SlashContext, channel: TextChannel): + autopurge = Autopurge.get(guild=ctx.guild.id, channel=channel.id) + if not autopurge: + await ctx.send("Autopurge does not exist.", hidden=True) + return + autopurge.delete() + await ctx.send(f"Autopurge removed from {channel.mention}.") + + @cog_ext.cog_subcommand( + base="autopurge", + name="update", + description="Update autopurge on a channel", + options=[ + create_option( + name="channel", + description="Channel to update", + option_type=7, + required=True, + ), + create_option( + name="delay", + description="New time to save", + option_type=4, + required=True, + ), + ], + ) + @admin_or_permissions(manage_messages=True) + async def _autopurge_update( + self, ctx: SlashContext, channel: TextChannel, delay: int + ): + autopurge = Autopurge.get(guild=ctx.guild.id, channel=channel.id) + if not autopurge: + await ctx.send("Autopurge does not exist.", hidden=True) + return + autopurge.delay = delay + autopurge.update() + await ctx.send( + f"Autopurge delay updated to {delay} seconds on {channel.mention}." + ) diff --git a/jarvis/cogs/admin/roleping.py b/jarvis/cogs/admin/roleping.py new file mode 100644 index 0000000..49786a6 --- /dev/null +++ b/jarvis/cogs/admin/roleping.py @@ -0,0 +1,91 @@ +from discord import Role +from discord.ext import commands +from discord_slash import SlashContext, cog_ext +from discord_slash.utils.manage_commands import create_option + +from jarvis.db.types import Setting + + +class RolepingCog(commands.Cog): + def __init__(self, bot): + self.bot = bot + + @cog_ext.cog_subcommand( + base="roleping", + name="block", + description="Add a role to the roleping blocklist", + options=[ + create_option( + name="role", + description="Role to add to blocklist", + option_type=8, + required=True, + ) + ], + ) + @commands.has_permissions(administrator=True) + async def _roleping_block(self, ctx: SlashContext, role: Role): + roles = Setting.get(guild=ctx.guild.id, setting="roleping") + if not roles: + roles = Setting(guild=ctx.guild.id, setting="roleping", value=[]) + + if role.id in roles.value: + await ctx.send( + f"Role `{role.name}` already in blocklist.", hidden=True + ) + return + roles.value.append(role.id) + roles.update() + await ctx.send(f"Role `{role.name}` added to blocklist.") + + @cog_ext.cog_subcommand( + base="roleping", + name="allow", + description="Remove a role from the roleping blocklist", + options=[ + create_option( + name="role", + description="Role to remove from blocklist", + option_type=8, + required=True, + ) + ], + ) + @commands.has_permissions(administrator=True) + async def _roleping_allow(self, ctx: SlashContext, role: Role): + roles = Setting.get(guild=ctx.guild.id, setting="roleping") + if not roles: + await ctx.send("No blocklist configured.", hidden=True) + return + + if role.id not in roles.value: + await ctx.send( + f"Role `{role.name}` not in blocklist.", hidden=True + ) + return + roles.value.remove(role.id) + roles.update() + await ctx.send(f"Role `{role.name}` removed blocklist.") + + @cog_ext.cog_subcommand( + base="roleping", + name="list", + description="List all blocklisted roles", + ) + async def _roleping_list(self, ctx: SlashContext): + roles = Setting.get(guild=ctx.guild.id, setting="roleping") + if not roles: + await ctx.send("No blocklist configured.", hidden=True) + return + + message = "Blocklisted Roles:\n```\n" + if not roles.value: + await ctx.send("No roles blocklisted.", hidden=True) + return + for role in roles.value: + role = ctx.guild.get_role(role) + if not role: + continue + message += role.name + "\n" + message += "```" + await ctx.send(message) diff --git a/jarvis/cogs/admin/warning.py b/jarvis/cogs/admin/warning.py new file mode 100644 index 0000000..bd7bd12 --- /dev/null +++ b/jarvis/cogs/admin/warning.py @@ -0,0 +1,215 @@ +from datetime import datetime, timedelta + +from ButtonPaginator import Paginator +from discord import User +from discord.ext import commands +from discord_slash import SlashContext, cog_ext +from discord_slash.model import ButtonStyle +from discord_slash.utils.manage_commands import create_choice, create_option + +from jarvis.db.types import MongoSort, Warning +from jarvis.utils import build_embed +from jarvis.utils.cachecog import CacheCog +from jarvis.utils.field import Field + + +class WarningCog(CacheCog): + def __init__(self, bot): + super().__init__(bot) + + @cog_ext.cog_slash( + name="warn", + description="Warn a user", + options=[ + create_option( + name="user", + description="User to warn", + option_type=6, + required=True, + ), + create_option( + name="reason", + description="Reason for warning", + option_type=3, + required=True, + ), + create_option( + name="duration", + description="Duration of warning in hours, default 24", + option_type=4, + required=False, + ), + ], + ) + @commands.has_permissions(administrator=True) + async def _warn( + self, ctx: SlashContext, user: User, reason: str, duration: int = 24 + ): + if len(reason) > 100: + await ctx.send("Reason must be < 100 characters", hidden=True) + return + if duration <= 0: + await ctx.send("Duration must be > 0", hidden=True) + return + elif duration >= 120: + await ctx.send("Duration must be < 5 days", hidden=True) + return + await ctx.defer() + _ = Warning( + user=user.id, + reason=reason, + admin=ctx.author.id, + guild=ctx.guild.id, + duration=duration, + active=True, + ).insert() + fields = [Field("Reason", reason, False)] + embed = build_embed( + title="Warning", + description=f"{user.mention} has been warned", + fields=fields, + ) + embed.set_author( + name=user.nick if user.nick else user.name, + icon_url=user.avatar_url, + ) + embed.set_footer(text=f"{user.name}#{user.discriminator} | {user.id}") + + await ctx.send(embed=embed) + + @cog_ext.cog_slash( + name="warnings", + description="Get count of user warnings", + options=[ + create_option( + name="user", + description="User to view", + option_type=6, + required=True, + ), + create_option( + name="active", + description="View only active", + option_type=4, + required=False, + choices=[ + create_choice(name="Yes", value=1), + create_choice(name="No", value=0), + ], + ), + ], + ) + @commands.has_permissions(administrator=True) + async def _warnings(self, ctx: SlashContext, user: User, active: bool = 1): + active = bool(active) + exists = self.check_cache(ctx, user_id=user.id, active=active) + if exists: + await ctx.defer(hidden=True) + await ctx.send( + "Please use existing interaction: " + + f"{exists['paginator']._message.jump_url}", + hidden=True, + ) + return + warnings = Warning.get_many( + user=user.id, + guild=ctx.guild.id, + sort=MongoSort(direction="desc", key="created_at"), + ) + active_warns = list(filter(lambda x: x.active, warnings)) + + pages = [] + if active: + if len(active_warns) == 0: + embed = build_embed( + title="Warnings", + description=f"{len(warnings)} total | 0 currently active", + fields=[], + ) + embed.set_author(name=user.name, icon_url=user.avatar_url) + embed.set_thumbnail(url=ctx.guild.icon_url) + pages.append(embed) + else: + fields = [] + for warn in active_warns: + admin = ctx.guild.get(warn.admin) + admin_name = "||`[redacted]`||" + if admin: + admin_name = f"{admin.name}#{admin.discriminator}" + fields.append( + Field( + name=warn.created_at.strftime( + "%Y-%m-%d %H:%M:%S UTC" + ), + value=f"{warn.reason}\n" + + f"Admin: {admin_name}\n" + + "\u200b", + inline=False, + ) + ) + for i in range(0, len(fields), 5): + embed = build_embed( + title="Warnings", + description=f"{len(warnings)} total | " + + f"{len(active_warns)} currently active", + fields=fields[i : i + 5], + ) + embed.set_author( + name=user.name + "#" + user.discriminator, + icon_url=user.avatar_url, + ) + embed.set_thumbnail(url=ctx.guild.icon_url) + embed.set_footer( + text=f"{user.name}#{user.discriminator} | {user.id}" + ) + pages.append(embed) + else: + fields = [] + for warn in warnings: + title = "[A] " if warn.active else "[I] " + title += warn.created_at.strftime("%Y-%m-%d %H:%M:%S UTC") + fields.append( + Field( + name=title, + value=warn.reason + "\n\u200b", + inline=False, + ) + ) + for i in range(0, len(fields), 5): + embed = build_embed( + title="Warnings", + description=f"{len(warnings)} total | " + + f"{len(active_warns)} currently active", + fields=fields[i : i + 5], + ) + embed.set_author( + name=user.name + "#" + user.discriminator, + icon_url=user.avatar_url, + ) + embed.set_thumbnail(url=ctx.guild.icon_url) + pages.append(embed) + + paginator = Paginator( + bot=self.bot, + ctx=ctx, + embeds=pages, + only=ctx.author, + timeout=60 * 5, # 5 minute timeout + disable_after_timeout=True, + use_extend=len(pages) > 2, + left_button_style=ButtonStyle.grey, + right_button_style=ButtonStyle.grey, + basic_buttons=["◀", "▶"], + ) + + self.cache[hash(paginator)] = { + "guild": ctx.guild.id, + "user": ctx.author.id, + "timeout": datetime.utcnow() + timedelta(minutes=5), + "command": ctx.subcommand_name, + "user_id": user.id, + "active": active, + "paginator": paginator, + } + + await paginator.start() diff --git a/jarvis/utils/cachecog.py b/jarvis/utils/cachecog.py new file mode 100644 index 0000000..d5355f8 --- /dev/null +++ b/jarvis/utils/cachecog.py @@ -0,0 +1,33 @@ +from datetime import datetime, timedelta + +from discord.ext import commands +from discord.ext.tasks import loop +from discord.utils import find +from discord_slash import SlashContext + + +class CacheCog(commands.Cog): + def __init__(self, bot: commands.Bot): + self.bot = bot + self.cache = {} + self._expire_interaction.start() + + def check_cache(self, ctx: SlashContext, **kwargs): + if not kwargs: + kwargs = {} + return find( + lambda x: x["command"] == ctx.subcommand_name + and x["user"] == ctx.author.id + and x["guild"] == ctx.guild.id + and all(x[k] == v for k, v in kwargs.items()), + self.cache.values(), + ) + + @loop(minutes=1) + async def _expire_interaction(self): + keys = list(self.cache.keys()) + for key in keys: + if self.cache[key]["timeout"] <= datetime.utcnow() + timedelta( + minutes=1 + ): + del self.cache[key]