diff --git a/README.md b/README.md index c281f99..9f01850 100644 --- a/README.md +++ b/README.md @@ -43,6 +43,108 @@ On top of the above requirements, the following pip packages are also required: - `psutil>=5.8, <6` - `GitPython>=3.1, <4` - `PyYaml>=5.4, <6` -- `discord-py-slash-command>=2.3, <3` +- `discord-py-slash-command>=2.3.2, <3` - `pymongo>=3.12.0, <4` - `opencv-python>=4.5, <5` +- `ButtonPaginator>=0.0.3` +- `Pillow>=8.2.0, <9` +- `python-gitlab>=2.9.0, <3` +- `ulid-py>=1.1.0, <2` + + +## J.A.R.V.I.S. Cogs + +Current cogs that are implemented: + +- `AdminCog` + - Handles all admin commands +- `ModlogCog` + - Handles modlog events +- `AutoreactCog` + - Handles autoreaction configuration +- `CTC2Cog` + - dbrand Complete the Code utilities +- `DbrandCog` + - dbrand-specific functions and utilities +- `DevCog` + - Developer utilities, such as hashing, encoding, and UUID generation +- `ErrorCog` + - Handles all bot errors +- `GitlabCog` + - Shows Gitlab information about J.A.R.V.I.S. +- `ImageCog` + - Image-processing cog. Only cog with no slash commands +- `JokesCog` + - Get a joke, have a laugh +- `OwnerCog` + - For the bot owner. Bot management commands +- `RemindmeCog` + - Manage reminders +- `RolegiverCog` + - Configure selectable roles +- `SettingsCog` + - Manage Guild settings +- `StarboardCog` + - Configure and add starboards and stars +- `UtilCog` + - Generic utilities, like userinfo and roleinfo +- `VerifyCog` + - Guild verification + + +## Directories + +### `jarvis` + +The bot itself + +#### `jarvis.cogs` + +All of the cogs listed above are stored in this directory + +##### `jarvis.cogs.admin` + +Contains all AdminCogs, including: +- `BanCog` +- `KickCog` +- `LockCog` +- `LockdownCog` +- `MuteCog` +- `PurgeCog` +- `RolepingCog` +- `WarningCog` + +##### `jarvis.cogs.modlog` + +Contains all ModlogCogs, including: +- `ModlogCommandCog` +- `ModlogMemberCog` +- `ModlogMessageCog` + +`jarvis.cogs.modlog.utils` includes modlog-specific utilities + +#### `jarvis.data` + +Contains data relevant to J.A.R.V.I.S., such as emoji lookups and dbrand data + +##### `jarvis.data.json` + +Any JSON files that are needed are stored here + +#### `jarvis.db` + +All database-related files. + +`jarvis.db.types` handles almost all of the database conections + +#### `jarvis.events` + +Containers for `@on_` d.py events + +#### `jarvis.tasks` + +All background tasks run from this folder + +#### `jarvis.utils` + +Generic utilities diff --git a/jarvis/__init__.py b/jarvis/__init__.py index 31ed6ad..7feedcc 100644 --- a/jarvis/__init__.py +++ b/jarvis/__init__.py @@ -1,30 +1,15 @@ import asyncio -import re -from datetime import datetime, timedelta from pathlib import Path -import pymongo -from discord import DMChannel, Intents, Member, Message +from discord import Intents from discord.ext import commands -from discord.ext.tasks import loop from discord.utils import find from discord_slash import SlashCommand from psutil import Process -from jarvis import logo, utils +from jarvis import logo, tasks, utils from jarvis.config import get_config from jarvis.db import DBManager -from jarvis.db.types import ( - Autopurge, - Autoreact, - Ban, - Lock, - Mute, - Setting, - Warning, -) -from jarvis.utils import build_embed -from jarvis.utils.field import Field if asyncio.get_event_loop().is_closed(): asyncio.set_event_loop(asyncio.new_event_loop()) @@ -33,18 +18,13 @@ intents = Intents.default() intents.members = True restart_ctx = None -invites = re.compile( - r"(?:https?://)?(?:www.)?(?:discord.(?:gg|io|me|li)|discord(?:app)?.com/invite)/([^\s/]+?)(?=\b)", - flags=re.IGNORECASE, -) - jarvis = commands.Bot( command_prefix=utils.get_prefix, intents=intents, help_command=None ) slash = SlashCommand(jarvis, sync_commands=True, sync_on_cog_reload=True) jarvis_self = Process() -__version__ = "1.6.1" +__version__ = "1.7.0" db = DBManager(get_config().mongo).mongo @@ -74,307 +54,6 @@ async def on_ready(): restart_ctx = None -@jarvis.event -async def on_member_join(user: Member): - guild = user.guild - mutes = Mute.get_active(guild=guild.id) - if mutes and len(mutes) >= 1: - mute_role = Setting.get(guild=guild.id, setting="mute") - role = guild.get_role(mute_role.value) - await user.add_roles( - role, reason="User is muted still muted from prior mute" - ) - unverified = Setting.get(guild=guild.id, setting="unverified") - if unverified: - role = guild.get_role(unverified.value) - await user.add_roles(role, reason="User just joined and is unverified") - - -@jarvis.event -async def on_message(message: Message): - channel = find( - lambda x: x.id == 599068193339736096, message.channel_mentions - ) - if channel and message.author.id == 293795462752894976: - await channel.send( - content="https://cdn.discordapp.com/attachments/" - + "664621130044407838/805218508866453554/tech.gif" - ) - if ( - not isinstance(message.channel, DMChannel) - and message.author.id != jarvis.user.id - ): - autoreact = Autoreact.get( - guild=message.guild.id, - channel=message.channel.id, - ) - if autoreact: - for reaction in autoreact.reactions: - await message.add_reaction(reaction) - massmention = Setting.get( - guild=message.guild.id, - setting="massmention", - ) - if ( - massmention.value > 0 - and len(message.mentions) - - (1 if message.author in message.mentions else 0) - > massmention.value - ): - warning = Warning( - active=True, - admin=get_config().client_id, - duration=24, - guild=message.guild.id, - reason="Mass Mention", - user=message.author.id, - ) - warning.insert() - fields = [Field("Reason", "Mass Mention", False)] - embed = build_embed( - title="Warning", - description=f"{message.author.mention} has been warned", - fields=fields, - ) - embed.set_author( - name=message.author.nick - if message.author.nick - else message.author.name, - icon_url=message.author.avatar_url, - ) - embed.set_footer( - text=f"{message.author.name}#{message.author.discriminator} " - + f"| {message.author.id}" - ) - await message.channel.send(embed=embed) - roleping = Setting.get(guild=message.guild.id, setting="roleping") - roles = [] - for mention in message.role_mentions: - roles.append(mention.id) - for mention in message.mentions: - for role in mention.roles: - roles.append(role.id) - if ( - roleping - and any(x in roleping.value for x in roles) - and not any(x.id in roleping.value for x in message.author.roles) - ): - warning = Warning( - active=True, - admin=get_config().client_id, - duration=24, - guild=message.guild.id, - reason="Pinged a blocked role/user with a blocked role", - user=message.author.id, - ) - warning.insert() - fields = [ - Field( - "Reason", - "Pinged a blocked role/user with a blocked role", - False, - ) - ] - embed = build_embed( - title="Warning", - description=f"{message.author.mention} has been warned", - fields=fields, - ) - embed.set_author( - name=message.author.nick - if message.author.nick - else message.author.name, - icon_url=message.author.avatar_url, - ) - embed.set_footer( - text=f"{message.author.name}#{message.author.discriminator} " - + f"| {message.author.id}" - ) - await message.channel.send(embed=embed) - autopurge = Autopurge.get( - guild=message.guild.id, channel=message.channel.id - ) - if autopurge: - await message.delete(delay=autopurge.delay) - content = re.sub(r"\s+", "", message.content) - match = invites.search(content) - if match: - guild_invites = await message.guild.invites() - allowed = [x.code for x in guild_invites] + [ - "dbrand", - "VtgZntXcnZ", - ] - if match.group(1) not in allowed: - await message.delete() - warning = Warning( - active=True, - admin=get_config().client_id, - duration=24, - guild=message.guild.id, - reason="Sent an invite link", - user=message.author.id, - ) - warning.insert() - fields = [ - Field( - "Reason", - "Sent an invite link", - False, - ) - ] - embed = build_embed( - title="Warning", - description=f"{message.author.mention} has been warned", - fields=fields, - ) - embed.set_author( - name=message.author.nick - if message.author.nick - else message.author.name, - icon_url=message.author.avatar_url, - ) - embed.set_footer( - text=f"{message.author.name}#" - + f"{message.author.discriminator} " - + f"| {message.author.id}" - ) - await message.channel.send(embed=embed) - await jarvis.process_commands(message) - - -@jarvis.event -async def on_guild_join(guild): - general = find(lambda x: x.name == "general", guild.channels) - if general and general.permissions_for(guild.me).send_messages: - await general.send( - "Allow me to introduce myself. I am J.A.R.V.I.S., a virtual " - + "artificial intelligence, and I'm here to assist you with a " - + "variety of tasks as best I can, " - + "24 hours a day, seven days a week." - ) - await asyncio.sleep(1) - await general.send("Importing all preferences from home interface...") - - # Set some default settings - setting = Setting(guild=guild.id, setting="massmention", value=5) - setting.insert() - - await general.send("Systems are now fully operational") - - -@loop(minutes=1) -async def unmute(): - mutes = Mute.get_active(duration={"$gt": 0}) - mute_roles = Setting.get_many(setting="mute") - updates = [] - for mute in mutes: - if ( - mute.created_at + timedelta(minutes=mute.duration) - < datetime.utcnow() - ): - mute_role = [x.value for x in mute_roles if x.guild == mute.guild][ - 0 - ] - guild = await jarvis.fetch_guild(mute.guild) - role = guild.get_role(mute_role) - user = await guild.fetch_member(mute.user) - if user: - if role in user.roles: - await user.remove_roles(role, reason="Mute expired") - - # Objects can't handle bulk_write, so handle it via raw methods - updates.append( - pymongo.UpdateOne( - { - "user": user.id, - "guild": guild.id, - "created_at": mute.created_at, - }, - {"$set": {"active": False}}, - ) - ) - if updates: - jarvis_db.mutes.bulk_write(updates) - - -@loop(minutes=10) -async def unban(): - bans = Ban.get_active(type="temp") - updates = [] - for ban in bans: - if ban.created_at + timedelta( - hours=ban.duration - ) < datetime.utcnow() + timedelta(minutes=10): - guild = await jarvis.fetch_guild(ban.guild) - user = await jarvis.fetch_user(ban.user) - if user: - guild.unban(user) - updates.append( - pymongo.UpdateOne( - { - "user": user.id, - "guild": guild.id, - "created_at": ban.created_at, - "type": "temp", - }, - {"$set": {"active": False}}, - ) - ) - if updates: - jarvis_db.bans.bulk_write(updates) - - -@loop(minutes=1) -async def unlock(): - locks = Lock.get_active() - updates = [] - for lock in locks: - if ( - lock.created_at + timedelta(minutes=lock.duration) - < datetime.utcnow() - ): - guild = await jarvis.fetch_guild(lock.guild) - channel = await jarvis.fetch_channel(lock.channel) - if channel: - roles = await guild.fetch_roles() - for role in roles: - overrides = channel.overwrites_for(role) - overrides.send_messages = None - await channel.set_permissions( - role, overwrite=overrides, reason="Lock expired" - ) - updates.append( - pymongo.UpdateOne( - { - "channel": channel.id, - "guild": guild.id, - "created_at": lock.created_at, - }, - {"$set": {"active": False}}, - ) - ) - if updates: - jarvis_db.locks.bulk_write(updates) - - -@loop(hours=1) -async def unwarn(): - warns = Warning.get_active() - updates = [] - for warn in warns: - if ( - warn.created_at + timedelta(hours=warn.duration) - < datetime.utcnow() - ): - updates.append( - pymongo.UpdateOne( - {"_id": warn._id}, {"$set": {"active": False}} - ) - ) - if updates: - jarvis_db.warns.bulk_write(updates) - - def run(ctx=None): global restart_ctx if ctx: @@ -388,11 +67,9 @@ def run(ctx=None): config.client_id ) ) - unmute.start() - unban.start() - unlock.start() - unwarn.start() + jarvis.max_messages = config.max_messages + tasks.init() jarvis.run(config.token, bot=True, reconnect=True) for cog in jarvis.cogs: session = getattr(cog, "_session", None) diff --git a/jarvis/cogs/admin.py b/jarvis/cogs/admin.py deleted file mode 100644 index 559f04f..0000000 --- a/jarvis/cogs/admin.py +++ /dev/null @@ -1,1316 +0,0 @@ -import re -from datetime import datetime, timedelta -from typing import Union - -import pymongo -from ButtonPaginator import Paginator -from discord import Member, Role, TextChannel, User, VoiceChannel -from discord.ext import commands -from discord.ext.tasks import loop -from discord.utils import find, get -from discord_slash import SlashContext, cog_ext -from discord_slash.model import ButtonStyle -from discord_slash.utils.manage_commands import create_choice, create_option - -import jarvis -from jarvis.db import DBManager -from jarvis.db.types import ( - Autopurge, - Ban, - Kick, - Lock, - MongoSort, - Mute, - Purge, - Setting, - Unban, - Warning, -) -from jarvis.utils import build_embed -from jarvis.utils.field import Field -from jarvis.utils.permissions import admin_or_permissions - - -class AdminCog(commands.Cog): - """ - Guild admin functions - - Used to manage guilds - """ - - def __init__(self, bot: commands.Bot): - self.bot = bot - config = jarvis.config.get_config() - self.db = DBManager(config.mongo).mongo.jarvis - self.cache = {} - self._expire_interaction.start() - - def check_cache(self, ctx: SlashContext, **kwargs): - if not kwargs: - kwargs = {} - return find( - lambda x: x["command"] == ctx.subcommand_name - and x["user"] == ctx.author.id - and x["guild"] == ctx.guild.id - and all(x[k] == v for k, v in kwargs.items()), - self.cache.values(), - ) - - @cog_ext.cog_slash( - name="ban", - description="Ban a user", - options=[ - create_option( - name="user", - description="User to ban", - option_type=6, - required=True, - ), - create_option( - name="reason", - description="Ban reason", - required=True, - option_type=3, - ), - create_option( - name="type", - description="Ban type", - option_type=3, - required=False, - choices=[ - create_choice(value="perm", name="Permanent"), - create_choice(value="temp", name="Temporary"), - create_choice(value="soft", name="Soft"), - ], - ), - create_option( - name="duration", - description="Ban duration in hours if temporary", - required=False, - option_type=4, - ), - ], - ) - @admin_or_permissions(ban_members=True) - async def _ban( - self, - ctx: SlashContext, - user: User = None, - reason: str = None, - type: str = "perm", - duration: int = 4, - ): - if not user or user == ctx.author: - await ctx.send("You cannot ban yourself.", hidden=True) - return - if user == self.bot.user: - await ctx.send("I'm afraid I can't let you do that", hidden=True) - return - if type == "temp" and duration < 0: - await ctx.send( - "You cannot set a temp ban to < 0 hours.", hidden=True - ) - return - elif type == "temp" and duration > 744: - await ctx.send( - "You cannot set a temp ban to > 1 month", hidden=True - ) - return - if len(reason) > 100: - await ctx.send("Reason must be < 100 characters", hidden=True) - return - if not reason: - reason = ( - "Mr. Stark is displeased with your presence. Please leave." - ) - - mtype = type - if mtype == "perm": - mtype = "perma" - - guild_name = ctx.guild.name - user_message = ( - f"You have been {mtype}banned from {guild_name}." - + f" Reason:\n{reason}" - ) - if mtype == "temp": - user_message += f"\nDuration: {duration} hours" - - fields = [Field(name="Type", value=mtype)] - - if mtype == "temp": - fields.append(Field(name="Duration", value=f"{duration} hour(s)")) - - user_embed = build_embed( - title="You have been banned", - description=f"Reason: {reason}", - fields=fields, - ) - - user_embed.set_author( - name=ctx.author.name + "#" + ctx.author.discriminator, - icon_url=ctx.author.avatar_url, - ) - user_embed.set_thumbnail(url=ctx.guild.icon_url) - - try: - await user.send(embed=user_embed) - except Exception: - send_failed = True - try: - await ctx.guild.ban(user, reason=reason) - except Exception as e: - await ctx.send(f"Failed to ban user:\n```\n{e}\n```", hidden=True) - return - send_failed = False - if mtype == "soft": - await ctx.guild.unban(user, reason="Ban was softban") - - fields.append(Field(name="DM Sent?", value=str(not send_failed))) - - admin_embed = build_embed( - title="User Banned", - description=f"Reason: {reason}", - fields=fields, - ) - - admin_embed.set_author( - name=user.nick if user.nick else user.name, - icon_url=user.avatar_url, - ) - admin_embed.set_thumbnail(url=user.avatar_url) - admin_embed.set_footer( - text=f"{user.name}#{user.discriminator} | {user.id}" - ) - - await ctx.send(embed=admin_embed) - if type != "temp": - duration = None - active = True - if type == "soft": - active = False - - _ = Ban( - user=user.id, - username=user.name, - discrim=user.discriminator, - reason=reason, - admin=ctx.author.id, - guild=ctx.guild.id, - type=type, - duration=duration, - active=active, - ).insert() - - async def discord_apply_unban( - self, ctx: SlashContext, user: User, reason: str - ): - await ctx.guild.unban(user, reason=reason) - _ = Unban( - user=user.id, - username=user.name, - discrim=user.discriminator, - guild=ctx.guild.id, - admin=ctx.author.id, - reason=reason, - ).insert() - - embed = build_embed( - title="User Unbanned", - description=f"<@{user.id}> was unbanned", - fields=[Field(name="Reason", value=reason)], - ) - embed.set_author( - name=user.name, - icon_url=user.avatar_url, - ) - embed.set_thumbnail(url=user.avatar_url) - embed.set_footer(text=f"{user.name}#{user.discriminator} | {user.id}") - await ctx.send(embed=embed) - - @cog_ext.cog_slash( - name="unban", - description="Unban a user", - options=[ - create_option( - name="user", - description="User to unban", - option_type=3, - required=True, - ), - create_option( - name="reason", - description="Unban reason", - required=True, - option_type=3, - ), - ], - ) - @admin_or_permissions(ban_members=True) - async def _unban( - self, - ctx: SlashContext, - user: str, - reason: str, - ): - if len(reason) > 100: - await ctx.send("Reason must be < 100 characters", hidden=True) - return - - orig_user = user - discrim = None - discord_ban_info = None - database_ban_info = None - - bans = await ctx.guild.bans() - - # Try to get ban information out of Discord - if re.match("^[0-9]{1,}$", user): # User ID - user = int(user) - discord_ban_info = find(lambda x: x.user.id == user, bans) - else: # User name - if re.match("#[0-9]{4}$", user): # User name has discrim - user, discrim = user.split("#") - if discrim: - discord_ban_info = find( - lambda x: x.user.name == user - and x.user.discriminator == discrim, - bans, - ) - else: - results = [ - x for x in filter(lambda x: x.user.name == user, bans) - ] - if results: - if len(results) > 1: - active_bans = [] - for ban in bans: - active_bans.append( - "{0} ({1}): {2}".format( - ban.user.name, ban.user.id, ban.reason - ) - ) - message = ( - "More than one result. " - + "Please use one of the following IDs:\n```" - + "\n".join(active_bans) - + "\n```" - ) - await ctx.send(message) - return - else: - discord_ban_info = results[0] - - # If we don't have the ban information in Discord, - # try to find the relevant information in the database. - # We take advantage of the previous checks to save CPU cycles - if not discord_ban_info: - if isinstance(user, int): - database_ban_info = Ban.get( - guild=ctx.guild.id, user=user, active=True - ) - else: - search = { - "guild": ctx.guild.id, - "username": user, - "active": True, - } - if discrim: - search["discrim"] = discrim - database_ban_info = Ban.get(**search) - - if not discord_ban_info and not database_ban_info: - await ctx.send(f"Unable to find user {orig_user}", hidden=True) - - elif discord_ban_info: - await self.discord_apply_unban(ctx, discord_ban_info.user, reason) - else: - discord_ban_info = find( - lambda x: x.user.id == database_ban_info["id"], bans - ) - if discord_ban_info: - await self.discord_apply_unban( - ctx, discord_ban_info.user, reason - ) - else: - database_ban_info.active = False - database_ban_info.update() - _ = Unban( - user=database_ban_info.user, - username=database_ban_info.username, - discrim=database_ban_info.discrim, - guild=ctx.guild.id, - admin=ctx.author.id, - reason=reason, - ).insert() - await ctx.send( - "Unable to find user in Discord, " - + "but removed entry from database." - ) - - @cog_ext.cog_subcommand( - base="bans", - name="list", - description="List bans", - options=[ - create_option( - name="type", - description="Ban type", - option_type=4, - required=False, - choices=[ - create_choice(value=0, name="All"), - create_choice(value=1, name="Permanent"), - create_choice(value=2, name="Temporary"), - create_choice(value=3, name="Soft"), - ], - ), - create_option( - name="active", - description="Active bans", - option_type=4, - required=False, - choices=[ - create_choice(value=1, name="Yes"), - create_choice(value=0, name="No"), - ], - ), - ], - ) - @admin_or_permissions(ban_members=True) - async def _bans_list( - self, ctx: SlashContext, type: int = 0, active: int = 1 - ): - active = bool(active) - exists = self.check_cache(ctx, type=type, active=active) - if exists: - await ctx.defer(hidden=True) - await ctx.send( - "Please use existing interaction: " - + f"{exists['paginator']._message.jump_url}", - hidden=True, - ) - return - types = [0, "perm", "temp", "soft"] - search = {"guild": ctx.guild.id} - if active: - search["active"] = True - if type > 0: - search["type"] = types[type] - bans = Ban.get_many(**search) - bans.sort(key=lambda x: x.created_at, reverse=True) - db_bans = [] - fields = [] - for ban in bans: - if not ban.username: - user = await self.bot.fetch_user(ban.user) - ban.username = user.name if user else "[deleted user]" - fields.append( - Field( - name=f"Username: {ban.username}#{ban.discrim}", - value=f"Date: {ban.created_at.strftime('%d-%m-%Y')}\n" - + f"User ID: {ban.user}\n" - + f"Reason: {ban.reason}\n" - + f"Type: {ban.type}\n\u200b", - inline=False, - ) - ) - db_bans.append(ban.user) - if type == 0: - bans = await ctx.guild.bans() - for ban in bans: - if ban.user.id not in db_bans: - fields.append( - Field( - name=f"Username: {ban.user.name}#" - + f"{ban.user.discriminator}", - value="Date: [unknown]\n" - + f"User ID: {ban.user.id}\n" - + f"Reason: {ban.reason}\n" - + "Type: manual\n\u200b", - inline=False, - ) - ) - - pages = [] - title = "Active " if active else "Inactive " - if type > 0: - title += types[type] - if type == 1: - title += "a" - title += "bans" - if len(fields) == 0: - embed = build_embed( - title=title, - description=f"No {'in' if not active else ''}active bans", - fields=[], - ) - embed.set_thumbnail(url=ctx.guild.icon_url) - pages.append(embed) - else: - for i in range(0, len(bans), 5): - embed = build_embed( - title=title, description="", fields=fields[i : i + 5] - ) - embed.set_thumbnail(url=ctx.guild.icon_url) - pages.append(embed) - - paginator = Paginator( - bot=self.bot, - ctx=ctx, - embeds=pages, - only=ctx.author, - timeout=60 * 5, # 5 minute timeout - disable_after_timeout=True, - use_extend=len(pages) > 2, - left_button_style=ButtonStyle.grey, - right_button_style=ButtonStyle.grey, - basic_buttons=["◀", "▶"], - ) - - self.cache[hash(paginator)] = { - "guild": ctx.guild.id, - "user": ctx.author.id, - "timeout": datetime.utcnow() + timedelta(minutes=5), - "command": ctx.subcommand_name, - "type": type, - "active": active, - "paginator": paginator, - } - - await paginator.start() - - @cog_ext.cog_slash( - name="kick", - description="Kick a user", - options=[ - create_option( - name="user", - description="User to kick", - option_type=6, - required=True, - ), - create_option( - name="reason", - description="Kick reason", - required=False, - option_type=3, - ), - ], - ) - @admin_or_permissions(kick_members=True) - async def _kick(self, ctx: SlashContext, user: User, reason=None): - if not user or user == ctx.author: - await ctx.send("You cannot kick yourself.", hidden=True) - return - if user == self.bot.user: - await ctx.send("I'm afraid I can't let you do that", hidden=True) - return - if len(reason) > 100: - await ctx.send("Reason must be < 100 characters", hidden=True) - return - if not reason: - reason = ( - "Mr. Stark is displeased with your presence. Please leave." - ) - guild_name = ctx.guild.name - embed = build_embed( - title=f"You have been kicked from {guild_name}", - description=f"Reason: {reason}", - fields=[], - ) - - embed.set_author( - name=ctx.author.name + "#" + ctx.author.discriminator, - icon_url=ctx.author.avatar_url, - ) - embed.set_thumbnail(ctx.guild.icon_url) - - send_failed = False - try: - await user.send(embed=embed) - except Exception: - send_failed = True - await ctx.guild.kick(user, reason=reason) - - fields = [Field(name="DM Sent?", value=str(not send_failed))] - embed = build_embed( - title="User Kicked", - description=f"Reason: {reason}", - fields=fields, - ) - - embed.set_author( - name=user.nick if user.nick else user.name, - icon_url=user.avatar_url, - ) - embed.set_thumbnail(url=user.avatar_url) - embed.set_footer(text=f"{user.name}#{user.discriminator} | {user.id}") - - await ctx.send(embed=embed) - _ = Kick( - user=user.id, - reason=reason, - admin=ctx.author.id, - guild=ctx.guild.id, - ).insert() - - @cog_ext.cog_slash( - name="purge", - description="Purge messages from channel", - options=[ - create_option( - name="amount", - description="Amount of messages to purge", - required=False, - option_type=4, - ) - ], - ) - @admin_or_permissions(manage_messages=True) - async def _purge(self, ctx: SlashContext, amount: int = 10): - if amount < 1: - await ctx.send("Amount must be >= 1", hidden=True) - return - await ctx.defer() - channel = ctx.channel - messages = [] - async for message in channel.history(limit=amount + 1): - messages.append(message) - await channel.delete_messages(messages) - _ = Purge( - channel=ctx.channel.id, - guild=ctx.guild.id, - admin=ctx.author.id, - count=amount, - ).insert() - - @cog_ext.cog_slash( - name="mute", - description="Mute a user", - options=[ - create_option( - name="user", - description="User to mute", - option_type=6, - required=True, - ), - create_option( - name="reason", - description="Reason for mute", - option_type=3, - required=True, - ), - create_option( - name="duration", - description="Mute duration", - option_type=4, - required=False, - ), - ], - ) - @admin_or_permissions(mute_members=True) - async def _mute( - self, ctx: SlashContext, user: Member, reason: str, duration: int = 30 - ): - if user == ctx.author: - await ctx.send("You cannot mute yourself.", hidden=True) - return - if user == self.bot.user: - await ctx.send("I'm afraid I can't let you do that", hidden=True) - return - if len(reason) > 100: - await ctx.send("Reason must be < 100 characters", hidden=True) - return - mute_setting = Setting.get(guild=ctx.guild.id, setting="mute") - if not mute_setting: - await ctx.send( - "Please configure a mute role " - + "with /settings mute first", - hidden=True, - ) - return - role = get(ctx.guild.roles, id=mute_setting.value) - if role in user.roles: - await ctx.send("User already muted", hidden=True) - return - await user.add_roles(role, reason=reason) - if duration < 0 or duration > 300: - duration = -1 - _ = Mute( - user=user.id, - reason=reason, - admin=ctx.author.id, - guild=ctx.guild.id, - duration=duration, - active=True if duration >= 0 else False, - ).insert() - - embed = build_embed( - title="User Muted", - description=f"{user.mention} has been muted", - fields=[Field(name="Reason", value=reason)], - ) - embed.set_author( - name=user.nick if user.nick else user.name, - icon_url=user.avatar_url, - ) - embed.set_thumbnail(url=user.avatar_url) - embed.set_footer(text=f"{user.name}#{user.discriminator} | {user.id}") - await ctx.send(embed=embed) - - @cog_ext.cog_slash( - name="unmute", - description="Unmute a user", - options=[ - create_option( - name="user", - description="User to unmute", - option_type=6, - required=True, - ) - ], - ) - @admin_or_permissions(mute_members=True) - async def _unmute(self, ctx: SlashContext, user: Member): - mute_setting = Setting.get(guild=ctx.guild.id, setting="mute") - if not mute_setting: - await ctx.send( - "Please configure a mute role with " - + "/settings mute first.", - hidden=True, - ) - return - - role = get(ctx.guild.roles, id=mute_setting.value) - if role in user.roles: - await user.remove_roles(role, reason="Unmute") - else: - await ctx.send("User is not muted.", hidden=True) - return - - mutes = Mute.get_many(guild=ctx.guild.id, user=user.id) - for mute in mutes: - mute.active = False - mute.update() - embed = build_embed( - title="User Unmwaruted", - description=f"{user.mention} has been unmuted", - fields=[], - ) - embed.set_author( - name=user.nick if user.nick else user.name, - icon_url=user.avatar_url, - ) - embed.set_thumbnail(url=user.avatar_url) - embed.set_footer(text=f"{user.name}#{user.discriminator} | {user.id}") - await ctx.send(embed=embed) - - async def _lock_channel( - self, - channel: Union[TextChannel, VoiceChannel], - role: Role, - admin: User, - reason: str, - allow_send=False, - ): - overrides = channel.overwrites_for(role) - if isinstance(channel, TextChannel): - overrides.send_messages = allow_send - elif isinstance(channel, VoiceChannel): - overrides.speak = allow_send - await channel.set_permissions(role, overwrite=overrides, reason=reason) - - async def _unlock_channel( - self, - channel: Union[TextChannel, VoiceChannel], - role: Role, - admin: User, - ): - overrides = channel.overwrites_for(role) - if isinstance(channel, TextChannel): - overrides.send_messages = None - elif isinstance(channel, VoiceChannel): - overrides.speak = None - await channel.set_permissions(role, overwrite=overrides) - - @cog_ext.cog_slash( - name="lock", - description="Locks a channel", - options=[ - create_option( - name="reason", - description="Lock Reason", - option_type=3, - required=True, - ), - create_option( - name="duration", - description="Lock duration in minutes (default 10)", - option_type=4, - required=False, - ), - create_option( - name="channel", - description="Channel to lock", - option_type=7, - required=False, - ), - ], - ) - @commands.has_permissions(administrator=True) - async def _lock( - self, - ctx: SlashContext, - reason: str, - duration: int = 10, - channel: Union[TextChannel, VoiceChannel] = None, - ): - await ctx.defer(hidden=True) - if duration <= 0: - await ctx.send("Duration must be > 0", hidden=True) - return - elif duration >= 300: - await ctx.send("Duration must be < 5 hours", hidden=True) - return - if len(reason) > 100: - await ctx.send("Reason must be < 100 characters", hidden=True) - return - if not channel: - channel = ctx.channel - for role in ctx.guild.roles: - try: - await self._lock_channel(channel, role, ctx.author, reason) - except Exception: - continue # Just continue on error - _ = Lock( - channel=channel.id, - guild=ctx.guild.id, - admin=ctx.author.id, - reason=reason, - duration=duration, - ).insert() - await ctx.send(f"{channel.mention} locked for {duration} minute(s)") - - @cog_ext.cog_slash( - name="unlock", - description="Unlocks a channel", - options=[ - create_option( - name="channel", - description="Channel to lock", - option_type=7, - required=False, - ), - ], - ) - @commands.has_permissions(administrator=True) - async def _unlock( - self, - ctx: SlashContext, - channel: Union[TextChannel, VoiceChannel] = None, - ): - if not channel: - channel = ctx.channel - lock = Lock.get(guild=ctx.guild.id, channel=channel.id, active=True) - if not lock: - await ctx.send(f"{channel.mention} not locked.", hidden=True) - return - for role in ctx.guild.roles: - try: - await self._unlock_channel(channel, role, ctx.author) - except Exception: - continue # Just continue on error - lock.active = False - lock.update() - await ctx.send(f"{channel.mention} unlocked") - - @cog_ext.cog_subcommand( - base="lockdown", - name="start", - description="Locks a server", - options=[ - create_option( - name="reason", - description="Lockdown Reason", - option_type=3, - required=True, - ), - create_option( - name="duration", - description="Lockdown duration in minutes (default 10)", - option_type=4, - required=False, - ), - ], - ) - @commands.has_permissions(administrator=True) - async def _lockdown_start( - self, - ctx: SlashContext, - reason: str, - duration: int = 10, - ): - await ctx.defer(hidden=True) - if duration <= 0: - await ctx.send("Duration must be > 0", hidden=True) - return - elif duration >= 300: - await ctx.send("Duration must be < 5 hours", hidden=True) - return - channels = ctx.guild.channels - roles = ctx.guild.roles - updates = [] - for channel in channels: - for role in roles: - try: - await self._lock_channel(channel, role, ctx.author, reason) - except Exception: - continue # Just continue on error - updates.append( - pymongo.InsertOne( - { - "channel": channel.id, - "guild": ctx.guild.id, - "admin": ctx.author.id, - "reason": reason, - "duration": duration, - "active": True, - "created_at": datetime.utcnow(), - } - ) - ) - if updates: - self.db.locks.bulk_write(updates) - await ctx.send(f"Server locked for {duration} minute(s)") - - @cog_ext.cog_subcommand( - base="lockdown", - name="end", - description="Unlocks a server", - ) - @commands.has_permissions(administrator=True) - async def _lockdown_end( - self, - ctx: SlashContext, - ): - channels = ctx.guild.channels - roles = ctx.guild.roles - updates = [] - locks = Lock.get_many(guild=ctx.guild.id, active=True) - if not locks: - await ctx.send("No lockdown detected.", hidden=True) - return - await ctx.defer() - for channel in channels: - for role in roles: - try: - await self._unlock_channel(channel, role, ctx.author) - except Exception: - continue # Just continue on error - updates.append( - pymongo.UpdateOne( - { - "channel": channel.id, - "guild": ctx.guild.id, - "admin": ctx.author.id, - }, - {"$set": {"active": False}}, - ) - ) - if updates: - self.db.locks.bulk_write(updates) - await ctx.send("Server unlocked") - - @cog_ext.cog_slash( - name="warn", - description="Warn a user", - options=[ - create_option( - name="user", - description="User to warn", - option_type=6, - required=True, - ), - create_option( - name="reason", - description="Reason for warning", - option_type=3, - required=True, - ), - create_option( - name="duration", - description="Duration of warning in hours, default 24", - option_type=4, - required=False, - ), - ], - ) - @commands.has_permissions(administrator=True) - async def _warn( - self, ctx: SlashContext, user: User, reason: str, duration: int = 24 - ): - if len(reason) > 100: - await ctx.send("Reason must be < 100 characters", hidden=True) - return - if duration <= 0: - await ctx.send("Duration must be > 0", hidden=True) - return - elif duration >= 120: - await ctx.send("Duration must be < 5 days", hidden=True) - return - await ctx.defer() - _ = Warning( - user=user.id, - reason=reason, - admin=ctx.author.id, - guild=ctx.guild.id, - duration=duration, - active=True, - ).insert() - fields = [Field("Reason", reason, False)] - embed = build_embed( - title="Warning", - description=f"{user.mention} has been warned", - fields=fields, - ) - embed.set_author( - name=user.nick if user.nick else user.name, - icon_url=user.avatar_url, - ) - embed.set_footer(text=f"{user.name}#{user.discriminator} | {user.id}") - - await ctx.send(embed=embed) - - @cog_ext.cog_slash( - name="warnings", - description="Get count of user warnings", - options=[ - create_option( - name="user", - description="User to view", - option_type=6, - required=True, - ), - create_option( - name="active", - description="View only active", - option_type=4, - required=False, - choices=[ - create_choice(name="Yes", value=1), - create_choice(name="No", value=0), - ], - ), - ], - ) - @commands.has_permissions(administrator=True) - async def _warnings(self, ctx: SlashContext, user: User, active: bool = 1): - active = bool(active) - exists = self.check_cache(ctx, user_id=user.id, active=active) - if exists: - await ctx.defer(hidden=True) - await ctx.send( - "Please use existing interaction: " - + f"{exists['paginator']._message.jump_url}", - hidden=True, - ) - return - warnings = Warning.get_many( - user=user.id, - guild=ctx.guild.id, - sort=MongoSort(direction="desc", key="created_at"), - ) - active_warns = list(filter(lambda x: x.active, warnings)) - - pages = [] - if active: - if len(active_warns) == 0: - embed = build_embed( - title="Warnings", - description=f"{len(warnings)} total | 0 currently active", - fields=[], - ) - embed.set_author(name=user.name, icon_url=user.avatar_url) - embed.set_thumbnail(url=ctx.guild.icon_url) - pages.append(embed) - else: - fields = [] - for warn in active_warns: - admin = ctx.guild.get(warn.admin) - admin_name = "||`[redacted]`||" - if admin: - admin_name = f"{admin.name}#{admin.discriminator}" - fields.append( - Field( - name=warn.created_at.strftime( - "%Y-%m-%d %H:%M:%S UTC" - ), - value=f"{warn.reason}\n" - + f"Admin: {admin_name}\n" - + "\u200b", - inline=False, - ) - ) - for i in range(0, len(fields), 5): - embed = build_embed( - title="Warnings", - description=f"{len(warnings)} total | " - + f"{len(active_warns)} currently active", - fields=fields[i : i + 5], - ) - embed.set_author( - name=user.name + "#" + user.discriminator, - icon_url=user.avatar_url, - ) - embed.set_thumbnail(url=ctx.guild.icon_url) - embed.set_footer( - text=f"{user.name}#{user.discriminator} | {user.id}" - ) - pages.append(embed) - else: - fields = [] - for warn in warnings: - title = "[A] " if warn.active else "[I] " - title += warn.created_at.strftime("%Y-%m-%d %H:%M:%S UTC") - fields.append( - Field( - name=title, - value=warn.reason + "\n\u200b", - inline=False, - ) - ) - for i in range(0, len(fields), 5): - embed = build_embed( - title="Warnings", - description=f"{len(warnings)} total | " - + f"{len(active_warns)} currently active", - fields=fields[i : i + 5], - ) - embed.set_author( - name=user.name + "#" + user.discriminator, - icon_url=user.avatar_url, - ) - embed.set_thumbnail(url=ctx.guild.icon_url) - pages.append(embed) - - paginator = Paginator( - bot=self.bot, - ctx=ctx, - embeds=pages, - only=ctx.author, - timeout=60 * 5, # 5 minute timeout - disable_after_timeout=True, - use_extend=len(pages) > 2, - left_button_style=ButtonStyle.grey, - right_button_style=ButtonStyle.grey, - basic_buttons=["◀", "▶"], - ) - - self.cache[hash(paginator)] = { - "guild": ctx.guild.id, - "user": ctx.author.id, - "timeout": datetime.utcnow() + timedelta(minutes=5), - "command": ctx.subcommand_name, - "user_id": user.id, - "active": active, - "paginator": paginator, - } - - await paginator.start() - - @cog_ext.cog_subcommand( - base="roleping", - name="block", - description="Add a role to the roleping blocklist", - options=[ - create_option( - name="role", - description="Role to add to blocklist", - option_type=8, - required=True, - ) - ], - ) - @commands.has_permissions(administrator=True) - async def _roleping_block(self, ctx: SlashContext, role: Role): - roles = Setting.get(guild=ctx.guild.id, setting="roleping") - if not roles: - roles = Setting(guild=ctx.guild.id, setting="roleping", value=[]) - - if role.id in roles.value: - await ctx.send( - f"Role `{role.name}` already in blocklist.", hidden=True - ) - return - roles.value.append(role.id) - roles.update() - await ctx.send(f"Role `{role.name}` added to blocklist.") - - @cog_ext.cog_subcommand( - base="roleping", - name="allow", - description="Remove a role from the roleping blocklist", - options=[ - create_option( - name="role", - description="Role to remove from blocklist", - option_type=8, - required=True, - ) - ], - ) - @commands.has_permissions(administrator=True) - async def _roleping_allow(self, ctx: SlashContext, role: Role): - roles = Setting.get(guild=ctx.guild.id, setting="roleping") - if not roles: - await ctx.send("No blocklist configured.", hidden=True) - return - - if role.id not in roles.value: - await ctx.send( - f"Role `{role.name}` not in blocklist.", hidden=True - ) - return - roles.value.remove(role.id) - roles.update() - await ctx.send(f"Role `{role.name}` removed blocklist.") - - @cog_ext.cog_subcommand( - base="roleping", - name="list", - description="List all blocklisted roles", - ) - async def _roleping_list(self, ctx: SlashContext): - roles = Setting.get(guild=ctx.guild.id, setting="roleping") - if not roles: - await ctx.send("No blocklist configured.", hidden=True) - return - - message = "Blocklisted Roles:\n```\n" - if not roles.value: - await ctx.send("No roles blocklisted.", hidden=True) - return - for role in roles.value: - role = ctx.guild.get_role(role) - if not role: - continue - message += role.name + "\n" - message += "```" - await ctx.send(message) - - @cog_ext.cog_subcommand( - base="autopurge", - name="add", - description="Automatically purge messages after x seconds", - options=[ - create_option( - name="channel", - description="Channel to autopurge", - option_type=7, - required=True, - ), - create_option( - name="delay", - description="Seconds to keep message before purge, default 30", - option_type=4, - required=False, - ), - ], - ) - @admin_or_permissions(manage_messages=True) - async def _autopurge_add( - self, ctx: SlashContext, channel: TextChannel, delay: int = 30 - ): - if not isinstance(channel, TextChannel): - await ctx.send("Channel must be a TextChannel", hidden=True) - return - if delay <= 0: - await ctx.send("Delay must be > 0", hidden=True) - return - elif delay > 300: - await ctx.send("Delay must be < 5 minutes", hidden=True) - return - autopurge = Autopurge.get(guild=ctx.guild.id, channel=channel.id) - if autopurge: - await ctx.send("Autopurge already exists.", hidden=True) - return - autopurge = Autopurge( - guild=ctx.guild.id, - channel=channel.id, - admin=ctx.author.id, - delay=delay, - ) - autopurge.insert() - await ctx.send( - f"Autopurge set up on {channel.mention}, " - + f"delay is {delay} seconds" - ) - - @cog_ext.cog_subcommand( - base="autopurge", - name="remove", - description="Remove an autopurge", - options=[ - create_option( - name="channel", - description="Channel to remove from autopurge", - option_type=7, - required=True, - ), - ], - ) - @admin_or_permissions(manage_messages=True) - async def _autopurge_remove(self, ctx: SlashContext, channel: TextChannel): - autopurge = Autopurge.get(guild=ctx.guild.id, channel=channel.id) - if not autopurge: - await ctx.send("Autopurge does not exist.", hidden=True) - return - autopurge.delete() - await ctx.send(f"Autopurge removed from {channel.mention}.") - - @cog_ext.cog_subcommand( - base="autopurge", - name="update", - description="Update autopurge on a channel", - options=[ - create_option( - name="channel", - description="Channel to update", - option_type=7, - required=True, - ), - create_option( - name="delay", - description="New time to save", - option_type=4, - required=True, - ), - ], - ) - @admin_or_permissions(manage_messages=True) - async def _autopurge_update( - self, ctx: SlashContext, channel: TextChannel, delay: int - ): - autopurge = Autopurge.get(guild=ctx.guild.id, channel=channel.id) - if not autopurge: - await ctx.send("Autopurge does not exist.", hidden=True) - return - autopurge.delay = delay - autopurge.update() - await ctx.send( - f"Autopurge delay updated to {delay} seconds on {channel.mention}." - ) - - @loop(minutes=1) - async def _expire_interaction(self): - keys = list(self.cache.keys()) - for key in keys: - if self.cache[key]["timeout"] <= datetime.utcnow() + timedelta( - minutes=1 - ): - del self.cache[key] - - -def setup(bot): - bot.add_cog(AdminCog(bot)) diff --git a/jarvis/cogs/admin/__init__.py b/jarvis/cogs/admin/__init__.py new file mode 100644 index 0000000..4ab637e --- /dev/null +++ b/jarvis/cogs/admin/__init__.py @@ -0,0 +1,21 @@ +from jarvis.cogs.admin import ( + ban, + kick, + lock, + lockdown, + mute, + purge, + roleping, + warning, +) + + +def setup(bot): + bot.add_cog(ban.BanCog(bot)) + bot.add_cog(kick.KickCog(bot)) + bot.add_cog(lock.LockCog(bot)) + bot.add_cog(lockdown.LockdownCog(bot)) + bot.add_cog(mute.MuteCog(bot)) + bot.add_cog(purge.PurgeCog(bot)) + bot.add_cog(roleping.RolepingCog(bot)) + bot.add_cog(warning.WarningCog(bot)) diff --git a/jarvis/cogs/admin/ban.py b/jarvis/cogs/admin/ban.py new file mode 100644 index 0000000..bf6b7d6 --- /dev/null +++ b/jarvis/cogs/admin/ban.py @@ -0,0 +1,455 @@ +import re +from datetime import datetime, timedelta + +from ButtonPaginator import Paginator +from discord import User +from discord.ext import commands +from discord.utils import find +from discord_slash import SlashContext, cog_ext +from discord_slash.model import ButtonStyle +from discord_slash.utils.manage_commands import create_choice, create_option + +from jarvis.db.types import Ban, Unban +from jarvis.utils import build_embed +from jarvis.utils.cachecog import CacheCog +from jarvis.utils.field import Field +from jarvis.utils.permissions import admin_or_permissions + + +class BanCog(CacheCog): + def __init__(self, bot: commands.Bot): + super().__init__(bot) + + async def discord_apply_ban( + self, + ctx: SlashContext, + reason: str, + user: User, + duration: int, + active: bool, + fields: list, + ): + await ctx.guild.ban(user, reason=reason) + _ = Ban( + user=user.id, + username=user.name, + discrim=user.discriminator, + reason=reason, + admin=ctx.author.id, + guild=ctx.guild.id, + type=type, + duration=duration, + active=active, + ).insert() + + embed = build_embed( + title="User Banned", + description=f"Reason: {reason}", + fields=fields, + ) + + embed.set_author( + name=user.nick if user.nick else user.name, + icon_url=user.avatar_url, + ) + embed.set_thumbnail(url=user.avatar_url) + embed.set_footer(text=f"{user.name}#{user.discriminator} | {user.id}") + + await ctx.send(embed=embed) + + async def discord_apply_unban( + self, ctx: SlashContext, user: User, reason: str + ): + await ctx.guild.unban(user, reason=reason) + _ = Unban( + user=user.id, + username=user.name, + discrim=user.discriminator, + guild=ctx.guild.id, + admin=ctx.author.id, + reason=reason, + ).insert() + + embed = build_embed( + title="User Unbanned", + description=f"<@{user.id}> was unbanned", + fields=[Field(name="Reason", value=reason)], + ) + embed.set_author( + name=user.name, + icon_url=user.avatar_url, + ) + embed.set_thumbnail(url=user.avatar_url) + embed.set_footer(text=f"{user.name}#{user.discriminator} | {user.id}") + await ctx.send(embed=embed) + + @cog_ext.cog_slash( + name="ban", + description="Ban a user", + options=[ + create_option( + name="user", + description="User to ban", + option_type=6, + required=True, + ), + create_option( + name="reason", + description="Ban reason", + required=True, + option_type=3, + ), + create_option( + name="type", + description="Ban type", + option_type=3, + required=False, + choices=[ + create_choice(value="perm", name="Permanent"), + create_choice(value="temp", name="Temporary"), + create_choice(value="soft", name="Soft"), + ], + ), + create_option( + name="duration", + description="Ban duration in hours if temporary", + required=False, + option_type=4, + ), + ], + ) + @admin_or_permissions(ban_members=True) + async def _ban( + self, + ctx: SlashContext, + user: User = None, + reason: str = None, + type: str = "perm", + duration: int = 4, + ): + if not user or user == ctx.author: + await ctx.send("You cannot ban yourself.", hidden=True) + return + if user == self.bot.user: + await ctx.send("I'm afraid I can't let you do that", hidden=True) + return + if type == "temp" and duration < 0: + await ctx.send( + "You cannot set a temp ban to < 0 hours.", hidden=True + ) + return + elif type == "temp" and duration > 744: + await ctx.send( + "You cannot set a temp ban to > 1 month", hidden=True + ) + return + if len(reason) > 100: + await ctx.send("Reason must be < 100 characters", hidden=True) + return + if not reason: + reason = ( + "Mr. Stark is displeased with your presence. Please leave." + ) + + mtype = type + if mtype == "perm": + mtype = "perma" + + guild_name = ctx.guild.name + user_message = ( + f"You have been {mtype}banned from {guild_name}." + + f" Reason:\n{reason}" + ) + if mtype == "temp": + user_message += f"\nDuration: {duration} hours" + + fields = [Field(name="Type", value=mtype)] + + if mtype == "temp": + fields.append(Field(name="Duration", value=f"{duration} hour(s)")) + + user_embed = build_embed( + title="You have been banned", + description=f"Reason: {reason}", + fields=fields, + ) + + user_embed.set_author( + name=ctx.author.name + "#" + ctx.author.discriminator, + icon_url=ctx.author.avatar_url, + ) + user_embed.set_thumbnail(url=ctx.guild.icon_url) + + try: + await user.send(embed=user_embed) + except Exception: + send_failed = True + try: + await ctx.guild.ban(user, reason=reason) + except Exception as e: + await ctx.send(f"Failed to ban user:\n```\n{e}\n```", hidden=True) + return + send_failed = False + if mtype == "soft": + await ctx.guild.unban(user, reason="Ban was softban") + + fields.append(Field(name="DM Sent?", value=str(not send_failed))) + if type != "temp": + duration = None + active = True + if type == "soft": + active = False + + self.discord_apply_ban(ctx, reason, user, duration, active, fields) + + @cog_ext.cog_slash( + name="unban", + description="Unban a user", + options=[ + create_option( + name="user", + description="User to unban", + option_type=3, + required=True, + ), + create_option( + name="reason", + description="Unban reason", + required=True, + option_type=3, + ), + ], + ) + @admin_or_permissions(ban_members=True) + async def _unban( + self, + ctx: SlashContext, + user: str, + reason: str, + ): + if len(reason) > 100: + await ctx.send("Reason must be < 100 characters", hidden=True) + return + + orig_user = user + discrim = None + discord_ban_info = None + database_ban_info = None + + bans = await ctx.guild.bans() + + # Try to get ban information out of Discord + if re.match("^[0-9]{1,}$", user): # User ID + user = int(user) + discord_ban_info = find(lambda x: x.user.id == user, bans) + else: # User name + if re.match("#[0-9]{4}$", user): # User name has discrim + user, discrim = user.split("#") + if discrim: + discord_ban_info = find( + lambda x: x.user.name == user + and x.user.discriminator == discrim, + bans, + ) + else: + results = [ + x for x in filter(lambda x: x.user.name == user, bans) + ] + if results: + if len(results) > 1: + active_bans = [] + for ban in bans: + active_bans.append( + "{0} ({1}): {2}".format( + ban.user.name, ban.user.id, ban.reason + ) + ) + message = ( + "More than one result. " + + "Please use one of the following IDs:\n```" + + "\n".join(active_bans) + + "\n```" + ) + await ctx.send(message) + return + else: + discord_ban_info = results[0] + + # If we don't have the ban information in Discord, + # try to find the relevant information in the database. + # We take advantage of the previous checks to save CPU cycles + if not discord_ban_info: + if isinstance(user, int): + database_ban_info = Ban.get( + guild=ctx.guild.id, user=user, active=True + ) + else: + search = { + "guild": ctx.guild.id, + "username": user, + "active": True, + } + if discrim: + search["discrim"] = discrim + database_ban_info = Ban.get(**search) + + if not discord_ban_info and not database_ban_info: + await ctx.send(f"Unable to find user {orig_user}", hidden=True) + + elif discord_ban_info: + await self.discord_apply_unban(ctx, discord_ban_info.user, reason) + else: + discord_ban_info = find( + lambda x: x.user.id == database_ban_info["id"], bans + ) + if discord_ban_info: + await self.discord_apply_unban( + ctx, discord_ban_info.user, reason + ) + else: + database_ban_info.active = False + database_ban_info.update() + _ = Unban( + user=database_ban_info.user, + username=database_ban_info.username, + discrim=database_ban_info.discrim, + guild=ctx.guild.id, + admin=ctx.author.id, + reason=reason, + ).insert() + await ctx.send( + "Unable to find user in Discord, " + + "but removed entry from database." + ) + + @cog_ext.cog_subcommand( + base="bans", + name="list", + description="List bans", + options=[ + create_option( + name="type", + description="Ban type", + option_type=4, + required=False, + choices=[ + create_choice(value=0, name="All"), + create_choice(value=1, name="Permanent"), + create_choice(value=2, name="Temporary"), + create_choice(value=3, name="Soft"), + ], + ), + create_option( + name="active", + description="Active bans", + option_type=4, + required=False, + choices=[ + create_choice(value=1, name="Yes"), + create_choice(value=0, name="No"), + ], + ), + ], + ) + @admin_or_permissions(ban_members=True) + async def _bans_list( + self, ctx: SlashContext, type: int = 0, active: int = 1 + ): + active = bool(active) + exists = self.check_cache(ctx, type=type, active=active) + if exists: + await ctx.defer(hidden=True) + await ctx.send( + "Please use existing interaction: " + + f"{exists['paginator']._message.jump_url}", + hidden=True, + ) + return + types = [0, "perm", "temp", "soft"] + search = {"guild": ctx.guild.id} + if active: + search["active"] = True + if type > 0: + search["type"] = types[type] + bans = Ban.get_many(**search) + bans.sort(key=lambda x: x.created_at, reverse=True) + db_bans = [] + fields = [] + for ban in bans: + if not ban.username: + user = await self.bot.fetch_user(ban.user) + ban.username = user.name if user else "[deleted user]" + fields.append( + Field( + name=f"Username: {ban.username}#{ban.discrim}", + value=f"Date: {ban.created_at.strftime('%d-%m-%Y')}\n" + + f"User ID: {ban.user}\n" + + f"Reason: {ban.reason}\n" + + f"Type: {ban.type}\n\u200b", + inline=False, + ) + ) + db_bans.append(ban.user) + if type == 0 and active: + bans = await ctx.guild.bans() + for ban in bans: + if ban.user.id not in db_bans: + fields.append( + Field( + name=f"Username: {ban.user.name}#" + + f"{ban.user.discriminator}", + value="Date: [unknown]\n" + + f"User ID: {ban.user.id}\n" + + f"Reason: {ban.reason}\n" + + "Type: manual\n\u200b", + inline=False, + ) + ) + + pages = [] + title = "Active " if active else "Inactive " + if type > 0: + title += types[type] + if type == 1: + title += "a" + title += "bans" + if len(fields) == 0: + embed = build_embed( + title=title, + description=f"No {'in' if not active else ''}active bans", + fields=[], + ) + embed.set_thumbnail(url=ctx.guild.icon_url) + pages.append(embed) + else: + for i in range(0, len(bans), 5): + embed = build_embed( + title=title, description="", fields=fields[i : i + 5] + ) + embed.set_thumbnail(url=ctx.guild.icon_url) + pages.append(embed) + + paginator = Paginator( + bot=self.bot, + ctx=ctx, + embeds=pages, + only=ctx.author, + timeout=60 * 5, # 5 minute timeout + disable_after_timeout=True, + use_extend=len(pages) > 2, + left_button_style=ButtonStyle.grey, + right_button_style=ButtonStyle.grey, + basic_buttons=["◀", "▶"], + ) + + self.cache[hash(paginator)] = { + "guild": ctx.guild.id, + "user": ctx.author.id, + "timeout": datetime.utcnow() + timedelta(minutes=5), + "command": ctx.subcommand_name, + "type": type, + "active": active, + "paginator": paginator, + } + + await paginator.start() diff --git a/jarvis/cogs/admin/kick.py b/jarvis/cogs/admin/kick.py new file mode 100644 index 0000000..32184d2 --- /dev/null +++ b/jarvis/cogs/admin/kick.py @@ -0,0 +1,88 @@ +from discord import User +from discord_slash import SlashContext, cog_ext +from discord_slash.utils.manage_commands import create_option + +from jarvis.db.types import Kick +from jarvis.utils import build_embed +from jarvis.utils.cachecog import CacheCog +from jarvis.utils.field import Field +from jarvis.utils.permissions import admin_or_permissions + + +class KickCog(CacheCog): + def __init__(self, bot): + super().__init__(bot) + + +@cog_ext.cog_slash( + name="kick", + description="Kick a user", + options=[ + create_option( + name="user", + description="User to kick", + option_type=6, + required=True, + ), + create_option( + name="reason", + description="Kick reason", + required=False, + option_type=3, + ), + ], +) +@admin_or_permissions(kick_members=True) +async def _kick(self, ctx: SlashContext, user: User, reason=None): + if not user or user == ctx.author: + await ctx.send("You cannot kick yourself.", hidden=True) + return + if user == self.bot.user: + await ctx.send("I'm afraid I can't let you do that", hidden=True) + return + if len(reason) > 100: + await ctx.send("Reason must be < 100 characters", hidden=True) + return + if not reason: + reason = "Mr. Stark is displeased with your presence. Please leave." + guild_name = ctx.guild.name + embed = build_embed( + title=f"You have been kicked from {guild_name}", + description=f"Reason: {reason}", + fields=[], + ) + + embed.set_author( + name=ctx.author.name + "#" + ctx.author.discriminator, + icon_url=ctx.author.avatar_url, + ) + embed.set_thumbnail(ctx.guild.icon_url) + + send_failed = False + try: + await user.send(embed=embed) + except Exception: + send_failed = True + await ctx.guild.kick(user, reason=reason) + + fields = [Field(name="DM Sent?", value=str(not send_failed))] + embed = build_embed( + title="User Kicked", + description=f"Reason: {reason}", + fields=fields, + ) + + embed.set_author( + name=user.nick if user.nick else user.name, + icon_url=user.avatar_url, + ) + embed.set_thumbnail(url=user.avatar_url) + embed.set_footer(text=f"{user.name}#{user.discriminator} | {user.id}") + + await ctx.send(embed=embed) + _ = Kick( + user=user.id, + reason=reason, + admin=ctx.author.id, + guild=ctx.guild.id, + ).insert() diff --git a/jarvis/cogs/admin/lock.py b/jarvis/cogs/admin/lock.py new file mode 100644 index 0000000..1ee896b --- /dev/null +++ b/jarvis/cogs/admin/lock.py @@ -0,0 +1,133 @@ +from typing import Union + +from discord import Role, TextChannel, User, VoiceChannel +from discord.ext import commands +from discord_slash import SlashContext, cog_ext +from discord_slash.utils.manage_commands import create_option + +from jarvis.db.types import Lock +from jarvis.utils.cachecog import CacheCog + + +class LockCog(CacheCog): + def __init__(self, bot: commands.Bot): + super().__init__(bot) + + async def _lock_channel( + self, + channel: Union[TextChannel, VoiceChannel], + role: Role, + admin: User, + reason: str, + allow_send=False, + ): + overrides = channel.overwrites_for(role) + if isinstance(channel, TextChannel): + overrides.send_messages = allow_send + elif isinstance(channel, VoiceChannel): + overrides.speak = allow_send + await channel.set_permissions(role, overwrite=overrides, reason=reason) + + async def _unlock_channel( + self, + channel: Union[TextChannel, VoiceChannel], + role: Role, + admin: User, + ): + overrides = channel.overwrites_for(role) + if isinstance(channel, TextChannel): + overrides.send_messages = None + elif isinstance(channel, VoiceChannel): + overrides.speak = None + await channel.set_permissions(role, overwrite=overrides) + + @cog_ext.cog_slash( + name="lock", + description="Locks a channel", + options=[ + create_option( + name="reason", + description="Lock Reason", + option_type=3, + required=True, + ), + create_option( + name="duration", + description="Lock duration in minutes (default 10)", + option_type=4, + required=False, + ), + create_option( + name="channel", + description="Channel to lock", + option_type=7, + required=False, + ), + ], + ) + @commands.has_permissions(administrator=True) + async def _lock( + self, + ctx: SlashContext, + reason: str, + duration: int = 10, + channel: Union[TextChannel, VoiceChannel] = None, + ): + await ctx.defer(hidden=True) + if duration <= 0: + await ctx.send("Duration must be > 0", hidden=True) + return + elif duration >= 300: + await ctx.send("Duration must be < 5 hours", hidden=True) + return + if len(reason) > 100: + await ctx.send("Reason must be < 100 characters", hidden=True) + return + if not channel: + channel = ctx.channel + for role in ctx.guild.roles: + try: + await self._lock_channel(channel, role, ctx.author, reason) + except Exception: + continue # Just continue on error + _ = Lock( + channel=channel.id, + guild=ctx.guild.id, + admin=ctx.author.id, + reason=reason, + duration=duration, + ).insert() + await ctx.send(f"{channel.mention} locked for {duration} minute(s)") + + @cog_ext.cog_slash( + name="unlock", + description="Unlocks a channel", + options=[ + create_option( + name="channel", + description="Channel to lock", + option_type=7, + required=False, + ), + ], + ) + @commands.has_permissions(administrator=True) + async def _unlock( + self, + ctx: SlashContext, + channel: Union[TextChannel, VoiceChannel] = None, + ): + if not channel: + channel = ctx.channel + lock = Lock.get(guild=ctx.guild.id, channel=channel.id, active=True) + if not lock: + await ctx.send(f"{channel.mention} not locked.", hidden=True) + return + for role in ctx.guild.roles: + try: + await self._unlock_channel(channel, role, ctx.author) + except Exception: + continue # Just continue on error + lock.active = False + lock.update() + await ctx.send(f"{channel.mention} unlocked") diff --git a/jarvis/cogs/admin/lockdown.py b/jarvis/cogs/admin/lockdown.py new file mode 100644 index 0000000..c023039 --- /dev/null +++ b/jarvis/cogs/admin/lockdown.py @@ -0,0 +1,114 @@ +from datetime import datetime + +import pymongo +from discord.ext import commands +from discord_slash import SlashContext, cog_ext +from discord_slash.utils.manage_commands import create_option + +from jarvis.config import get_config +from jarvis.db import DBManager +from jarvis.db.types import Lock +from jarvis.utils.cachecog import CacheCog + + +class LockdownCog(CacheCog): + def __init__(self, bot: commands.Bot): + super().__init__(bot) + self.db = DBManager(get_config().mongo).mongo.jarvis + + @cog_ext.cog_subcommand( + base="lockdown", + name="start", + description="Locks a server", + options=[ + create_option( + name="reason", + description="Lockdown Reason", + option_type=3, + required=True, + ), + create_option( + name="duration", + description="Lockdown duration in minutes (default 10)", + option_type=4, + required=False, + ), + ], + ) + @commands.has_permissions(administrator=True) + async def _lockdown_start( + self, + ctx: SlashContext, + reason: str, + duration: int = 10, + ): + await ctx.defer(hidden=True) + if duration <= 0: + await ctx.send("Duration must be > 0", hidden=True) + return + elif duration >= 300: + await ctx.send("Duration must be < 5 hours", hidden=True) + return + channels = ctx.guild.channels + roles = ctx.guild.roles + updates = [] + for channel in channels: + for role in roles: + try: + await self._lock_channel(channel, role, ctx.author, reason) + except Exception: + continue # Just continue on error + updates.append( + pymongo.InsertOne( + { + "channel": channel.id, + "guild": ctx.guild.id, + "admin": ctx.author.id, + "reason": reason, + "duration": duration, + "active": True, + "created_at": datetime.utcnow(), + } + ) + ) + if updates: + self.db.locks.bulk_write(updates) + await ctx.send(f"Server locked for {duration} minute(s)") + + @cog_ext.cog_subcommand( + base="lockdown", + name="end", + description="Unlocks a server", + ) + @commands.has_permissions(administrator=True) + async def _lockdown_end( + self, + ctx: SlashContext, + ): + channels = ctx.guild.channels + roles = ctx.guild.roles + updates = [] + locks = Lock.get_many(guild=ctx.guild.id, active=True) + if not locks: + await ctx.send("No lockdown detected.", hidden=True) + return + await ctx.defer() + for channel in channels: + for role in roles: + try: + await self._unlock_channel(channel, role, ctx.author) + except Exception: + continue # Just continue on error + updates.append( + pymongo.UpdateOne( + { + "channel": channel.id, + "guild": ctx.guild.id, + "admin": ctx.author.id, + }, + {"$set": {"active": False}}, + ) + ) + if updates: + self.db.locks.bulk_write(updates) + await ctx.send("Server unlocked") diff --git a/jarvis/cogs/admin/mute.py b/jarvis/cogs/admin/mute.py new file mode 100644 index 0000000..9a4361c --- /dev/null +++ b/jarvis/cogs/admin/mute.py @@ -0,0 +1,136 @@ +from discord import Member +from discord.ext import commands +from discord.utils import get +from discord_slash import SlashContext, cog_ext +from discord_slash.utils.manage_commands import create_option + +from jarvis.db.types import Mute, Setting +from jarvis.utils import build_embed +from jarvis.utils.field import Field +from jarvis.utils.permissions import admin_or_permissions + + +class MuteCog(commands.Cog): + def __init__(self, bot): + self.bot = bot + + @cog_ext.cog_slash( + name="mute", + description="Mute a user", + options=[ + create_option( + name="user", + description="User to mute", + option_type=6, + required=True, + ), + create_option( + name="reason", + description="Reason for mute", + option_type=3, + required=True, + ), + create_option( + name="duration", + description="Mute duration", + option_type=4, + required=False, + ), + ], + ) + @admin_or_permissions(mute_members=True) + async def _mute( + self, ctx: SlashContext, user: Member, reason: str, duration: int = 30 + ): + if user == ctx.author: + await ctx.send("You cannot mute yourself.", hidden=True) + return + if user == self.bot.user: + await ctx.send("I'm afraid I can't let you do that", hidden=True) + return + if len(reason) > 100: + await ctx.send("Reason must be < 100 characters", hidden=True) + return + mute_setting = Setting.get(guild=ctx.guild.id, setting="mute") + if not mute_setting: + await ctx.send( + "Please configure a mute role " + + "with /settings mute first", + hidden=True, + ) + return + role = get(ctx.guild.roles, id=mute_setting.value) + if role in user.roles: + await ctx.send("User already muted", hidden=True) + return + await user.add_roles(role, reason=reason) + if duration < 0 or duration > 300: + duration = -1 + _ = Mute( + user=user.id, + reason=reason, + admin=ctx.author.id, + guild=ctx.guild.id, + duration=duration, + active=True if duration >= 0 else False, + ).insert() + + embed = build_embed( + title="User Muted", + description=f"{user.mention} has been muted", + fields=[Field(name="Reason", value=reason)], + ) + embed.set_author( + name=user.nick if user.nick else user.name, + icon_url=user.avatar_url, + ) + embed.set_thumbnail(url=user.avatar_url) + embed.set_footer(text=f"{user.name}#{user.discriminator} | {user.id}") + await ctx.send(embed=embed) + + @cog_ext.cog_slash( + name="unmute", + description="Unmute a user", + options=[ + create_option( + name="user", + description="User to unmute", + option_type=6, + required=True, + ) + ], + ) + @admin_or_permissions(mute_members=True) + async def _unmute(self, ctx: SlashContext, user: Member): + mute_setting = Setting.get(guild=ctx.guild.id, setting="mute") + if not mute_setting: + await ctx.send( + "Please configure a mute role with " + + "/settings mute first.", + hidden=True, + ) + return + + role = get(ctx.guild.roles, id=mute_setting.value) + if role in user.roles: + await user.remove_roles(role, reason="Unmute") + else: + await ctx.send("User is not muted.", hidden=True) + return + + mutes = Mute.get_many(guild=ctx.guild.id, user=user.id) + for mute in mutes: + mute.active = False + mute.update() + embed = build_embed( + title="User Unmwaruted", + description=f"{user.mention} has been unmuted", + fields=[], + ) + embed.set_author( + name=user.nick if user.nick else user.name, + icon_url=user.avatar_url, + ) + embed.set_thumbnail(url=user.avatar_url) + embed.set_footer(text=f"{user.name}#{user.discriminator} | {user.id}") + await ctx.send(embed=embed) diff --git a/jarvis/cogs/admin/purge.py b/jarvis/cogs/admin/purge.py new file mode 100644 index 0000000..3fc6b30 --- /dev/null +++ b/jarvis/cogs/admin/purge.py @@ -0,0 +1,145 @@ +from discord import TextChannel +from discord.ext import commands +from discord_slash import SlashContext, cog_ext +from discord_slash.utils.manage_commands import create_option + +from jarvis.db.types import Autopurge, Purge +from jarvis.utils.permissions import admin_or_permissions + + +class PurgeCog(commands.Cog): + def __init__(self, bot): + self.bot = bot + + @cog_ext.cog_slash( + name="purge", + description="Purge messages from channel", + options=[ + create_option( + name="amount", + description="Amount of messages to purge", + required=False, + option_type=4, + ) + ], + ) + @admin_or_permissions(manage_messages=True) + async def _purge(self, ctx: SlashContext, amount: int = 10): + if amount < 1: + await ctx.send("Amount must be >= 1", hidden=True) + return + await ctx.defer() + channel = ctx.channel + messages = [] + async for message in channel.history(limit=amount + 1): + messages.append(message) + await channel.delete_messages(messages) + _ = Purge( + channel=ctx.channel.id, + guild=ctx.guild.id, + admin=ctx.author.id, + count=amount, + ).insert() + + @cog_ext.cog_subcommand( + base="autopurge", + name="add", + description="Automatically purge messages after x seconds", + options=[ + create_option( + name="channel", + description="Channel to autopurge", + option_type=7, + required=True, + ), + create_option( + name="delay", + description="Seconds to keep message before purge, default 30", + option_type=4, + required=False, + ), + ], + ) + @admin_or_permissions(manage_messages=True) + async def _autopurge_add( + self, ctx: SlashContext, channel: TextChannel, delay: int = 30 + ): + if not isinstance(channel, TextChannel): + await ctx.send("Channel must be a TextChannel", hidden=True) + return + if delay <= 0: + await ctx.send("Delay must be > 0", hidden=True) + return + elif delay > 300: + await ctx.send("Delay must be < 5 minutes", hidden=True) + return + autopurge = Autopurge.get(guild=ctx.guild.id, channel=channel.id) + if autopurge: + await ctx.send("Autopurge already exists.", hidden=True) + return + autopurge = Autopurge( + guild=ctx.guild.id, + channel=channel.id, + admin=ctx.author.id, + delay=delay, + ) + autopurge.insert() + await ctx.send( + f"Autopurge set up on {channel.mention}, " + + f"delay is {delay} seconds" + ) + + @cog_ext.cog_subcommand( + base="autopurge", + name="remove", + description="Remove an autopurge", + options=[ + create_option( + name="channel", + description="Channel to remove from autopurge", + option_type=7, + required=True, + ), + ], + ) + @admin_or_permissions(manage_messages=True) + async def _autopurge_remove(self, ctx: SlashContext, channel: TextChannel): + autopurge = Autopurge.get(guild=ctx.guild.id, channel=channel.id) + if not autopurge: + await ctx.send("Autopurge does not exist.", hidden=True) + return + autopurge.delete() + await ctx.send(f"Autopurge removed from {channel.mention}.") + + @cog_ext.cog_subcommand( + base="autopurge", + name="update", + description="Update autopurge on a channel", + options=[ + create_option( + name="channel", + description="Channel to update", + option_type=7, + required=True, + ), + create_option( + name="delay", + description="New time to save", + option_type=4, + required=True, + ), + ], + ) + @admin_or_permissions(manage_messages=True) + async def _autopurge_update( + self, ctx: SlashContext, channel: TextChannel, delay: int + ): + autopurge = Autopurge.get(guild=ctx.guild.id, channel=channel.id) + if not autopurge: + await ctx.send("Autopurge does not exist.", hidden=True) + return + autopurge.delay = delay + autopurge.update() + await ctx.send( + f"Autopurge delay updated to {delay} seconds on {channel.mention}." + ) diff --git a/jarvis/cogs/admin/roleping.py b/jarvis/cogs/admin/roleping.py new file mode 100644 index 0000000..49786a6 --- /dev/null +++ b/jarvis/cogs/admin/roleping.py @@ -0,0 +1,91 @@ +from discord import Role +from discord.ext import commands +from discord_slash import SlashContext, cog_ext +from discord_slash.utils.manage_commands import create_option + +from jarvis.db.types import Setting + + +class RolepingCog(commands.Cog): + def __init__(self, bot): + self.bot = bot + + @cog_ext.cog_subcommand( + base="roleping", + name="block", + description="Add a role to the roleping blocklist", + options=[ + create_option( + name="role", + description="Role to add to blocklist", + option_type=8, + required=True, + ) + ], + ) + @commands.has_permissions(administrator=True) + async def _roleping_block(self, ctx: SlashContext, role: Role): + roles = Setting.get(guild=ctx.guild.id, setting="roleping") + if not roles: + roles = Setting(guild=ctx.guild.id, setting="roleping", value=[]) + + if role.id in roles.value: + await ctx.send( + f"Role `{role.name}` already in blocklist.", hidden=True + ) + return + roles.value.append(role.id) + roles.update() + await ctx.send(f"Role `{role.name}` added to blocklist.") + + @cog_ext.cog_subcommand( + base="roleping", + name="allow", + description="Remove a role from the roleping blocklist", + options=[ + create_option( + name="role", + description="Role to remove from blocklist", + option_type=8, + required=True, + ) + ], + ) + @commands.has_permissions(administrator=True) + async def _roleping_allow(self, ctx: SlashContext, role: Role): + roles = Setting.get(guild=ctx.guild.id, setting="roleping") + if not roles: + await ctx.send("No blocklist configured.", hidden=True) + return + + if role.id not in roles.value: + await ctx.send( + f"Role `{role.name}` not in blocklist.", hidden=True + ) + return + roles.value.remove(role.id) + roles.update() + await ctx.send(f"Role `{role.name}` removed blocklist.") + + @cog_ext.cog_subcommand( + base="roleping", + name="list", + description="List all blocklisted roles", + ) + async def _roleping_list(self, ctx: SlashContext): + roles = Setting.get(guild=ctx.guild.id, setting="roleping") + if not roles: + await ctx.send("No blocklist configured.", hidden=True) + return + + message = "Blocklisted Roles:\n```\n" + if not roles.value: + await ctx.send("No roles blocklisted.", hidden=True) + return + for role in roles.value: + role = ctx.guild.get_role(role) + if not role: + continue + message += role.name + "\n" + message += "```" + await ctx.send(message) diff --git a/jarvis/cogs/admin/warning.py b/jarvis/cogs/admin/warning.py new file mode 100644 index 0000000..bd7bd12 --- /dev/null +++ b/jarvis/cogs/admin/warning.py @@ -0,0 +1,215 @@ +from datetime import datetime, timedelta + +from ButtonPaginator import Paginator +from discord import User +from discord.ext import commands +from discord_slash import SlashContext, cog_ext +from discord_slash.model import ButtonStyle +from discord_slash.utils.manage_commands import create_choice, create_option + +from jarvis.db.types import MongoSort, Warning +from jarvis.utils import build_embed +from jarvis.utils.cachecog import CacheCog +from jarvis.utils.field import Field + + +class WarningCog(CacheCog): + def __init__(self, bot): + super().__init__(bot) + + @cog_ext.cog_slash( + name="warn", + description="Warn a user", + options=[ + create_option( + name="user", + description="User to warn", + option_type=6, + required=True, + ), + create_option( + name="reason", + description="Reason for warning", + option_type=3, + required=True, + ), + create_option( + name="duration", + description="Duration of warning in hours, default 24", + option_type=4, + required=False, + ), + ], + ) + @commands.has_permissions(administrator=True) + async def _warn( + self, ctx: SlashContext, user: User, reason: str, duration: int = 24 + ): + if len(reason) > 100: + await ctx.send("Reason must be < 100 characters", hidden=True) + return + if duration <= 0: + await ctx.send("Duration must be > 0", hidden=True) + return + elif duration >= 120: + await ctx.send("Duration must be < 5 days", hidden=True) + return + await ctx.defer() + _ = Warning( + user=user.id, + reason=reason, + admin=ctx.author.id, + guild=ctx.guild.id, + duration=duration, + active=True, + ).insert() + fields = [Field("Reason", reason, False)] + embed = build_embed( + title="Warning", + description=f"{user.mention} has been warned", + fields=fields, + ) + embed.set_author( + name=user.nick if user.nick else user.name, + icon_url=user.avatar_url, + ) + embed.set_footer(text=f"{user.name}#{user.discriminator} | {user.id}") + + await ctx.send(embed=embed) + + @cog_ext.cog_slash( + name="warnings", + description="Get count of user warnings", + options=[ + create_option( + name="user", + description="User to view", + option_type=6, + required=True, + ), + create_option( + name="active", + description="View only active", + option_type=4, + required=False, + choices=[ + create_choice(name="Yes", value=1), + create_choice(name="No", value=0), + ], + ), + ], + ) + @commands.has_permissions(administrator=True) + async def _warnings(self, ctx: SlashContext, user: User, active: bool = 1): + active = bool(active) + exists = self.check_cache(ctx, user_id=user.id, active=active) + if exists: + await ctx.defer(hidden=True) + await ctx.send( + "Please use existing interaction: " + + f"{exists['paginator']._message.jump_url}", + hidden=True, + ) + return + warnings = Warning.get_many( + user=user.id, + guild=ctx.guild.id, + sort=MongoSort(direction="desc", key="created_at"), + ) + active_warns = list(filter(lambda x: x.active, warnings)) + + pages = [] + if active: + if len(active_warns) == 0: + embed = build_embed( + title="Warnings", + description=f"{len(warnings)} total | 0 currently active", + fields=[], + ) + embed.set_author(name=user.name, icon_url=user.avatar_url) + embed.set_thumbnail(url=ctx.guild.icon_url) + pages.append(embed) + else: + fields = [] + for warn in active_warns: + admin = ctx.guild.get(warn.admin) + admin_name = "||`[redacted]`||" + if admin: + admin_name = f"{admin.name}#{admin.discriminator}" + fields.append( + Field( + name=warn.created_at.strftime( + "%Y-%m-%d %H:%M:%S UTC" + ), + value=f"{warn.reason}\n" + + f"Admin: {admin_name}\n" + + "\u200b", + inline=False, + ) + ) + for i in range(0, len(fields), 5): + embed = build_embed( + title="Warnings", + description=f"{len(warnings)} total | " + + f"{len(active_warns)} currently active", + fields=fields[i : i + 5], + ) + embed.set_author( + name=user.name + "#" + user.discriminator, + icon_url=user.avatar_url, + ) + embed.set_thumbnail(url=ctx.guild.icon_url) + embed.set_footer( + text=f"{user.name}#{user.discriminator} | {user.id}" + ) + pages.append(embed) + else: + fields = [] + for warn in warnings: + title = "[A] " if warn.active else "[I] " + title += warn.created_at.strftime("%Y-%m-%d %H:%M:%S UTC") + fields.append( + Field( + name=title, + value=warn.reason + "\n\u200b", + inline=False, + ) + ) + for i in range(0, len(fields), 5): + embed = build_embed( + title="Warnings", + description=f"{len(warnings)} total | " + + f"{len(active_warns)} currently active", + fields=fields[i : i + 5], + ) + embed.set_author( + name=user.name + "#" + user.discriminator, + icon_url=user.avatar_url, + ) + embed.set_thumbnail(url=ctx.guild.icon_url) + pages.append(embed) + + paginator = Paginator( + bot=self.bot, + ctx=ctx, + embeds=pages, + only=ctx.author, + timeout=60 * 5, # 5 minute timeout + disable_after_timeout=True, + use_extend=len(pages) > 2, + left_button_style=ButtonStyle.grey, + right_button_style=ButtonStyle.grey, + basic_buttons=["◀", "▶"], + ) + + self.cache[hash(paginator)] = { + "guild": ctx.guild.id, + "user": ctx.author.id, + "timeout": datetime.utcnow() + timedelta(minutes=5), + "command": ctx.subcommand_name, + "user_id": user.id, + "active": active, + "paginator": paginator, + } + + await paginator.start() diff --git a/jarvis/cogs/modlog/__init__.py b/jarvis/cogs/modlog/__init__.py new file mode 100644 index 0000000..5a9b6de --- /dev/null +++ b/jarvis/cogs/modlog/__init__.py @@ -0,0 +1,7 @@ +from jarvis.cogs.modlog import command, member, message + + +def setup(bot): + bot.add_cog(command.ModlogCommandCog(bot)) + bot.add_cog(member.ModlogMemberCog(bot)) + bot.add_cog(message.ModlogMessageCog(bot)) diff --git a/jarvis/cogs/modlog/command.py b/jarvis/cogs/modlog/command.py new file mode 100644 index 0000000..a1d0f61 --- /dev/null +++ b/jarvis/cogs/modlog/command.py @@ -0,0 +1,58 @@ +from discord import DMChannel +from discord.ext import commands +from discord_slash import SlashContext + +from jarvis.db.types import Setting +from jarvis.utils import build_embed +from jarvis.utils.field import Field + + +class ModlogCommandCog(commands.Cog): + def __init__(self, bot): + self.bot = bot + + @commands.Cog.listener() + async def on_slash_command(self, ctx: SlashContext): + if not isinstance(ctx.channel, DMChannel): + modlog = Setting.get(guild=ctx.guild.id, setting="modlog") + if modlog: + channel = ctx.guild.get_channel(modlog.value) + fields = [ + Field("Command", ctx.name), + ] + if ctx.args: + fields.append( + Field( + "Args", + " ".join(ctx.args), + False, + ) + ) + if ctx.kwargs: + kwargs_string = " ".join( + f"{k}: {ctx.kwargs[k]}" for k in ctx.kwargs + ) + fields.append( + Field( + "Keyword Args", + kwargs_string, + False, + ) + ) + if ctx.subcommand_name: + fields.insert(1, Field("Subcommand", ctx.subcommand_name)) + embed = build_embed( + title="Command Invoked", + description=f"{ctx.author.mention} invoked a command", + fields=fields, + color="#fc9e3f", + ) + embed.set_author( + name=ctx.author.name, + icon_url=ctx.author.avatar_url, + ) + embed.set_footer( + text=f"{ctx.author.name}#{ctx.author.discriminator}" + + f" | {ctx.author.id}" + ) + await channel.send(embed=embed) diff --git a/jarvis/cogs/modlog.py b/jarvis/cogs/modlog/member.py similarity index 60% rename from jarvis/cogs/modlog.py rename to jarvis/cogs/modlog/member.py index c91a2b8..859f282 100644 --- a/jarvis/cogs/modlog.py +++ b/jarvis/cogs/modlog/member.py @@ -2,65 +2,20 @@ import asyncio from datetime import datetime, timedelta import discord -from discord import DMChannel from discord.ext import commands from discord.utils import find -from discord_slash import SlashContext +from jarvis.cogs.modlog.utils import get_latest_log, modlog_embed from jarvis.config import get_config from jarvis.db.types import Ban, Kick, MongoSort, Mute, Setting from jarvis.utils import build_embed from jarvis.utils.field import Field -class ModlogCog(commands.Cog): - """ - A hybrid user/modlog functionality for J.A.R.V.I.S. - """ - - def __init__(self, bot: discord.ext.commands.Bot): +class ModlogMemberCog(commands.Cog): + def __init__(self, bot): self.bot = bot - def get_latest_log(self, auditlog, target): - before = datetime.utcnow() - timedelta(seconds=10) - return find( - lambda x: x.target.id == target.id and x.created_at > before, - auditlog, - ) - - async def modlog_embed( - self, - member: discord.Member, - admin: discord.Member, - log: discord.AuditLogEntry, - title: str, - desc: str, - ) -> discord.Embed: - fields = [ - Field( - name="Moderator", - value=f"{admin.mention} ({admin.name}" - + f"#{admin.discriminator})", - ), - ] - if log.reason: - fields.append(Field(name="Reason", value=log.reason, inline=False)) - embed = build_embed( - title=title, - description=desc, - color="#fc9e3f", - fields=fields, - timestamp=log.created_at, - ) - embed.set_author( - name=f"{member.name}", - icon_url=member.avatar_url, - ) - embed.set_footer( - text=f"{member.name}#{member.discriminator} | {member.id}" - ) - return embed - @commands.Cog.listener() async def on_member_ban(self, guild: discord.Guild, user: discord.User): modlog = Setting.get(guild=guild.id, setting="modlog") @@ -73,7 +28,7 @@ class ModlogCog(commands.Cog): after=datetime.utcnow() - timedelta(seconds=15), oldest_first=False, ).flatten() - log: discord.AuditLogEntry = self.get_latest_log(auditlog, user) + log: discord.AuditLogEntry = get_latest_log(auditlog, user) admin: discord.User = log.user if admin.id == get_config().client_id: ban = Ban.get( @@ -83,7 +38,7 @@ class ModlogCog(commands.Cog): sort=MongoSort(key="created_at", type="desc"), ) admin = guild.get_member(ban.admin) - embed = await self.modlog_embed( + embed = await modlog_embed( user, admin, log, @@ -105,7 +60,7 @@ class ModlogCog(commands.Cog): after=datetime.utcnow() - timedelta(seconds=15), oldest_first=False, ).flatten() - log: discord.AuditLogEntry = self.get_latest_log(auditlog, user) + log: discord.AuditLogEntry = get_latest_log(auditlog, user) admin: discord.User = log.user if admin.id == get_config().client_id: ban = Ban.get( @@ -115,7 +70,7 @@ class ModlogCog(commands.Cog): sort=MongoSort(key="created_at", type="desc"), ) admin = guild.get_member(ban.admin) - embed = await self.modlog_embed( + embed = await modlog_embed( user, admin, log, @@ -137,7 +92,7 @@ class ModlogCog(commands.Cog): after=datetime.utcnow() - timedelta(seconds=15), oldest_first=False, ).flatten() - log: discord.AuditLogEntry = self.get_latest_log(auditlog, user) + log: discord.AuditLogEntry = get_latest_log(auditlog, user) admin: discord.User = log.user if admin.id == get_config().client_id: kick = Kick.get( @@ -146,7 +101,7 @@ class ModlogCog(commands.Cog): sort=MongoSort(key="created_at", type="desc"), ) admin = user.guild.get_member(kick.admin) - embed = await self.modlog_embed( + embed = await modlog_embed( user, admin, log, @@ -163,7 +118,7 @@ class ModlogCog(commands.Cog): after=datetime.utcnow() - timedelta(seconds=15), oldest_first=False, ).flatten() - log: discord.AuditLogEntry = self.get_latest_log(auditlog, before) + log: discord.AuditLogEntry = get_latest_log(auditlog, before) admin: discord.User = log.user if admin.id == get_config().client_id: mute = Mute.get( @@ -173,7 +128,7 @@ class ModlogCog(commands.Cog): sort=MongoSort(key="created_at", type="desc"), ) admin = before.guild.get_member(mute.admin) - return await self.modlog_embed( + return await modlog_embed( member=before, admin=admin, log=log, @@ -188,7 +143,7 @@ class ModlogCog(commands.Cog): after=datetime.utcnow() - timedelta(seconds=15), oldest_first=False, ).flatten() - log: discord.AuditLogEntry = self.get_latest_log(auditlog, before) + log: discord.AuditLogEntry = get_latest_log(auditlog, before) admin: discord.User = log.user if admin.id == get_config().client_id: mute = Mute.get( @@ -199,7 +154,7 @@ class ModlogCog(commands.Cog): ) mute = Mute(**mute) admin = before.guild.get_member(mute.admin) - return await self.modlog_embed( + return await modlog_embed( member=before, admin=admin, log=log, @@ -214,9 +169,9 @@ class ModlogCog(commands.Cog): after=datetime.utcnow() - timedelta(seconds=15), oldest_first=False, ).flatten() - log: discord.AuditLogEntry = self.get_latest_log(auditlog, before) + log: discord.AuditLogEntry = get_latest_log(auditlog, before) admin: discord.User = log.user - return await self.modlog_embed( + return await modlog_embed( member=before, admin=admin, log=log, @@ -231,7 +186,7 @@ class ModlogCog(commands.Cog): after=datetime.utcnow() - timedelta(seconds=15), oldest_first=False, ).flatten() - log: discord.AuditLogEntry = self.get_latest_log(auditlog, before) + log: discord.AuditLogEntry = get_latest_log(auditlog, before) admin: discord.User = log.user role = None title = "User Given Role" @@ -243,7 +198,7 @@ class ModlogCog(commands.Cog): elif len(before.roles) < len(after.roles): role = find(lambda x: x not in before.roles, after.roles) role_text = role.mention if role else "||`[redacted]`||" - return await self.modlog_embed( + return await modlog_embed( member=before, admin=admin, log=log, @@ -279,9 +234,7 @@ class ModlogCog(commands.Cog): after=datetime.utcnow() - timedelta(seconds=15), oldest_first=False, ).flatten() - log: discord.AuditLogEntry = self.get_latest_log( - auditlog, before - ) + log: discord.AuditLogEntry = get_latest_log(auditlog, before) bname = before.nick if before.nick else before.name aname = after.nick if after.nick else after.name fields = [ @@ -327,117 +280,3 @@ class ModlogCog(commands.Cog): embed = await self.process_rolechange(before, after) if embed: await channel.send(embed=embed) - - @commands.Cog.listener() - async def on_message_edit( - self, before: discord.Message, after: discord.Message - ): - if before.author != get_config().client_id: - modlog = Setting.get(guild=after.guild.id, setting="modlog") - if modlog: - if before.content == after.content or before.content is None: - return - channel = before.guild.get_channel(modlog.value) - fields = [ - Field( - "Original Message", - before.content if before.content else "N/A", - False, - ), - Field( - "New Message", - after.content if after.content else "N/A", - False, - ), - ] - embed = build_embed( - title="Message Edited", - description=f"{before.author.mention} edited a message", - fields=fields, - color="#fc9e3f", - timestamp=after.edited_at, - url=after.jump_url, - ) - embed.set_author( - name=before.author.name, - icon_url=before.author.avatar_url, - url=after.jump_url, - ) - embed.set_footer( - text=f"{before.author.name}#{before.author.discriminator}" - + f" | {before.author.id}" - ) - await channel.send(embed=embed) - - @commands.Cog.listener() - async def on_message_delete(self, message: discord.Message): - modlog = Setting.get(guild=message.guild.id, setting="modlog") - if modlog: - fields = [Field("Original Message", message.content, False)] - channel = message.guild.get_channel(modlog.value) - embed = build_embed( - title="Message Deleted", - description=f"{message.author.mention}'s message was deleted", - fields=fields, - color="#fc9e3f", - ) - embed.set_author( - name=message.author.name, - icon_url=message.author.avatar_url, - url=message.jump_url, - ) - embed.set_footer( - text=f"{message.author.name}#{message.author.discriminator}" - + f" | {message.author.id}" - ) - await channel.send(embed=embed) - - @commands.Cog.listener() - async def on_slash_command(self, ctx: SlashContext): - if not isinstance(ctx.channel, DMChannel): - modlog = Setting.get(guild=ctx.guild.id, setting="modlog") - if modlog: - channel = ctx.guild.get_channel(modlog.value) - fields = [ - Field("Command", ctx.name), - ] - if ctx.args: - fields.append( - Field( - "Args", - " ".join(ctx.args), - False, - ) - ) - if ctx.kwargs: - kwargs_string = " ".join( - f"{k}: {ctx.kwargs[k]}" for k in ctx.kwargs - ) - fields.append( - Field( - "Keyword Args", - kwargs_string, - False, - ) - ) - if ctx.subcommand_name: - fields.insert(1, Field("Subcommand", ctx.subcommand_name)) - embed = build_embed( - title="Command Invoked", - description=f"{ctx.author.mention} invoked a command", - fields=fields, - color="#fc9e3f", - ) - embed.set_author( - name=ctx.author.name, - icon_url=ctx.author.avatar_url, - ) - embed.set_footer( - text=f"{ctx.author.name}#{ctx.author.discriminator}" - + f" | {ctx.author.id}" - ) - await channel.send(embed=embed) - - -def setup(bot): - bot.add_cog(ModlogCog(bot)) diff --git a/jarvis/cogs/modlog/message.py b/jarvis/cogs/modlog/message.py new file mode 100644 index 0000000..0cc130f --- /dev/null +++ b/jarvis/cogs/modlog/message.py @@ -0,0 +1,75 @@ +import discord +from discord.ext import commands + +from jarvis.db.types import Setting +from jarvis.utils import build_embed +from jarvis.utils.field import Field + + +class ModlogMessageCog(commands.Cog): + def __init__(self, bot): + self.bot = bot + + @commands.Cog.listener() + async def on_message_edit( + self, before: discord.Message, after: discord.Message + ): + if not before.author.bot: + modlog = Setting.get(guild=after.guild.id, setting="modlog") + if modlog: + if before.content == after.content or before.content is None: + return + channel = before.guild.get_channel(modlog.value) + fields = [ + Field( + "Original Message", + before.content if before.content else "N/A", + False, + ), + Field( + "New Message", + after.content if after.content else "N/A", + False, + ), + ] + embed = build_embed( + title="Message Edited", + description=f"{before.author.mention} edited a message", + fields=fields, + color="#fc9e3f", + timestamp=after.edited_at, + url=after.jump_url, + ) + embed.set_author( + name=before.author.name, + icon_url=before.author.avatar_url, + url=after.jump_url, + ) + embed.set_footer( + text=f"{before.author.name}#{before.author.discriminator}" + + f" | {before.author.id}" + ) + await channel.send(embed=embed) + + @commands.Cog.listener() + async def on_message_delete(self, message: discord.Message): + modlog = Setting.get(guild=message.guild.id, setting="modlog") + if modlog: + fields = [Field("Original Message", message.content, False)] + channel = message.guild.get_channel(modlog.value) + embed = build_embed( + title="Message Deleted", + description=f"{message.author.mention}'s message was deleted", + fields=fields, + color="#fc9e3f", + ) + embed.set_author( + name=message.author.name, + icon_url=message.author.avatar_url, + url=message.jump_url, + ) + embed.set_footer( + text=f"{message.author.name}#{message.author.discriminator}" + + f" | {message.author.id}" + ) + await channel.send(embed=embed) diff --git a/jarvis/cogs/modlog/utils.py b/jarvis/cogs/modlog/utils.py new file mode 100644 index 0000000..64e3771 --- /dev/null +++ b/jarvis/cogs/modlog/utils.py @@ -0,0 +1,49 @@ +from datetime import datetime, timedelta + +import discord +from discord.utils import find + +from jarvis.utils import build_embed +from jarvis.utils.field import Field + + +def modlog_embed( + self, + member: discord.Member, + admin: discord.Member, + log: discord.AuditLogEntry, + title: str, + desc: str, +) -> discord.Embed: + fields = [ + Field( + name="Moderator", + value=f"{admin.mention} ({admin.name}" + + f"#{admin.discriminator})", + ), + ] + if log.reason: + fields.append(Field(name="Reason", value=log.reason, inline=False)) + embed = build_embed( + title=title, + description=desc, + color="#fc9e3f", + fields=fields, + timestamp=log.created_at, + ) + embed.set_author( + name=f"{member.name}", + icon_url=member.avatar_url, + ) + embed.set_footer( + text=f"{member.name}#{member.discriminator} | {member.id}" + ) + return embed + + +def get_latest_log(self, auditlog, target): + before = datetime.utcnow() - timedelta(seconds=10) + return find( + lambda x: x.target.id == target.id and x.created_at > before, + auditlog, + ) diff --git a/jarvis/events/guild.py b/jarvis/events/guild.py new file mode 100644 index 0000000..15528ce --- /dev/null +++ b/jarvis/events/guild.py @@ -0,0 +1,26 @@ +import asyncio + +from discord.utils import find + +from jarvis import jarvis +from jarvis.db.types import Setting + + +@jarvis.event +async def on_guild_join(guild): + general = find(lambda x: x.name == "general", guild.channels) + if general and general.permissions_for(guild.me).send_messages: + await general.send( + "Allow me to introduce myself. I am J.A.R.V.I.S., a virtual " + + "artificial intelligence, and I'm here to assist you with a " + + "variety of tasks as best I can, " + + "24 hours a day, seven days a week." + ) + await asyncio.sleep(1) + await general.send("Importing all preferences from home interface...") + + # Set some default settings + setting = Setting(guild=guild.id, setting="massmention", value=5) + setting.insert() + + await general.send("Systems are now fully operational") diff --git a/jarvis/events/member.py b/jarvis/events/member.py new file mode 100644 index 0000000..d706f62 --- /dev/null +++ b/jarvis/events/member.py @@ -0,0 +1,20 @@ +from discord import Member + +from jarvis import jarvis +from jarvis.db.types import Mute, Setting + + +@jarvis.event +async def on_member_join(user: Member): + guild = user.guild + mutes = Mute.get_active(guild=guild.id) + if mutes and len(mutes) >= 1: + mute_role = Setting.get(guild=guild.id, setting="mute") + role = guild.get_role(mute_role.value) + await user.add_roles( + role, reason="User is muted still muted from prior mute" + ) + unverified = Setting.get(guild=guild.id, setting="unverified") + if unverified: + role = guild.get_role(unverified.value) + await user.add_roles(role, reason="User just joined and is unverified") diff --git a/jarvis/events/message.py b/jarvis/events/message.py new file mode 100644 index 0000000..d148b57 --- /dev/null +++ b/jarvis/events/message.py @@ -0,0 +1,188 @@ +import re + +from discord import DMChannel, Message +from discord.utils import find + +from jarvis import jarvis +from jarvis.config import get_config +from jarvis.db.types import Autopurge, Autoreact, Setting +from jarvis.utils import build_embed +from jarvis.utils.field import Field + +invites = re.compile( + r"(?:https?://)?(?:www.)?(?:discord.(?:gg|io|me|li)|discord(?:app)?.com/invite)/([^\s/]+?)(?=\b)", + flags=re.IGNORECASE, +) + + +async def autopurge(message): + autopurge = Autopurge.get( + guild=message.guild.id, channel=message.channel.id + ) + if autopurge: + await message.delete(delay=autopurge.delay) + + +async def autoreact(message): + autoreact = Autoreact.get( + guild=message.guild.id, + channel=message.channel.id, + ) + if autoreact: + for reaction in autoreact.reactions: + await message.add_reaction(reaction) + + +async def checks(message): + # #tech + channel = find( + lambda x: x.id == 599068193339736096, message.channel_mentions + ) + if channel and message.author.id == 293795462752894976: + await channel.send( + content="https://cdn.discordapp.com/attachments/" + + "664621130044407838/805218508866453554/tech.gif" + ) + content = re.sub(r"\s+", "", message.content) + match = invites.search(content) + if match: + guild_invites = await message.guild.invites() + allowed = [x.code for x in guild_invites] + [ + "dbrand", + "VtgZntXcnZ", + ] + if match.group(1) not in allowed: + await message.delete() + warning = Warning( + active=True, + admin=get_config().client_id, + duration=24, + guild=message.guild.id, + reason="Sent an invite link", + user=message.author.id, + ) + warning.insert() + fields = [ + Field( + "Reason", + "Sent an invite link", + False, + ) + ] + embed = build_embed( + title="Warning", + description=f"{message.author.mention} has been warned", + fields=fields, + ) + embed.set_author( + name=message.author.nick + if message.author.nick + else message.author.name, + icon_url=message.author.avatar_url, + ) + embed.set_footer( + text=f"{message.author.name}#" + + f"{message.author.discriminator} " + + f"| {message.author.id}" + ) + await message.channel.send(embed=embed) + + +async def massmention(message): + massmention = Setting.get( + guild=message.guild.id, + setting="massmention", + ) + if ( + massmention.value > 0 + and len(message.mentions) + - (1 if message.author in message.mentions else 0) + > massmention.value + ): + warning = Warning( + active=True, + admin=get_config().client_id, + duration=24, + guild=message.guild.id, + reason="Mass Mention", + user=message.author.id, + ) + warning.insert() + fields = [Field("Reason", "Mass Mention", False)] + embed = build_embed( + title="Warning", + description=f"{message.author.mention} has been warned", + fields=fields, + ) + embed.set_author( + name=message.author.nick + if message.author.nick + else message.author.name, + icon_url=message.author.avatar_url, + ) + embed.set_footer( + text=f"{message.author.name}#{message.author.discriminator} " + + f"| {message.author.id}" + ) + await message.channel.send(embed=embed) + + +async def roleping(message): + roleping = Setting.get(guild=message.guild.id, setting="roleping") + roles = [] + for mention in message.role_mentions: + roles.append(mention.id) + for mention in message.mentions: + for role in mention.roles: + roles.append(role.id) + if ( + roleping + and any(x in roleping.value for x in roles) + and not any(x.id in roleping.value for x in message.author.roles) + ): + warning = Warning( + active=True, + admin=get_config().client_id, + duration=24, + guild=message.guild.id, + reason="Pinged a blocked role/user with a blocked role", + user=message.author.id, + ) + warning.insert() + fields = [ + Field( + "Reason", + "Pinged a blocked role/user with a blocked role", + False, + ) + ] + embed = build_embed( + title="Warning", + description=f"{message.author.mention} has been warned", + fields=fields, + ) + embed.set_author( + name=message.author.nick + if message.author.nick + else message.author.name, + icon_url=message.author.avatar_url, + ) + embed.set_footer( + text=f"{message.author.name}#{message.author.discriminator} " + + f"| {message.author.id}" + ) + await message.channel.send(embed=embed) + + +@jarvis.event +async def on_message(message: Message): + if ( + not isinstance(message.channel, DMChannel) + and message.author.id != jarvis.user.id + ): + await autoreact(message) + await massmention(message) + await roleping(message) + await autopurge(message) + await checks(message) + await jarvis.process_commands(message) diff --git a/jarvis/tasks/__init__.py b/jarvis/tasks/__init__.py new file mode 100644 index 0000000..bbde0a8 --- /dev/null +++ b/jarvis/tasks/__init__.py @@ -0,0 +1,8 @@ +from jarvis.tasks import unban, unlock, unmute, unwarn + + +def init(): + unban.unban.start() + unlock.unlock.start() + unmute.unmute.start() + unwarn.unwarn.start() diff --git a/jarvis/tasks/unban.py b/jarvis/tasks/unban.py new file mode 100644 index 0000000..597edca --- /dev/null +++ b/jarvis/tasks/unban.py @@ -0,0 +1,34 @@ +from datetime import datetime, timedelta + +import pymongo +from discord.ext.tasks import loop + +import jarvis +from jarvis.db.types import Ban + + +@loop(minutes=10) +async def unban(): + bans = Ban.get_active(type="temp") + updates = [] + for ban in bans: + if ban.created_at + timedelta( + hours=ban.duration + ) < datetime.utcnow() + timedelta(minutes=10): + guild = await jarvis.jarvis.fetch_guild(ban.guild) + user = await jarvis.jarvis.fetch_user(ban.user) + if user: + guild.unban(user) + updates.append( + pymongo.UpdateOne( + { + "user": user.id, + "guild": guild.id, + "created_at": ban.created_at, + "type": "temp", + }, + {"$set": {"active": False}}, + ) + ) + if updates: + jarvis.jarvis_db.bans.bulk_write(updates) diff --git a/jarvis/tasks/unlock.py b/jarvis/tasks/unlock.py new file mode 100644 index 0000000..26078a9 --- /dev/null +++ b/jarvis/tasks/unlock.py @@ -0,0 +1,40 @@ +from datetime import datetime, timedelta + +import pymongo +from discord.ext.tasks import loop + +import jarvis +from jarvis.db.types import Lock + + +@loop(minutes=1) +async def unlock(): + locks = Lock.get_active() + updates = [] + for lock in locks: + if ( + lock.created_at + timedelta(minutes=lock.duration) + < datetime.utcnow() + ): + guild = await jarvis.jarvis.fetch_guild(lock.guild) + channel = await jarvis.jarvis.fetch_channel(lock.channel) + if channel: + roles = await guild.fetch_roles() + for role in roles: + overrides = channel.overwrites_for(role) + overrides.send_messages = None + await channel.set_permissions( + role, overwrite=overrides, reason="Lock expired" + ) + updates.append( + pymongo.UpdateOne( + { + "channel": channel.id, + "guild": guild.id, + "created_at": lock.created_at, + }, + {"$set": {"active": False}}, + ) + ) + if updates: + jarvis.jarvis_db.locks.bulk_write(updates) diff --git a/jarvis/tasks/unmute.py b/jarvis/tasks/unmute.py new file mode 100644 index 0000000..064112d --- /dev/null +++ b/jarvis/tasks/unmute.py @@ -0,0 +1,42 @@ +from datetime import datetime, timedelta + +import pymongo +from discord.ext.tasks import loop + +import jarvis +from jarvis.db.types import Mute, Setting + + +@loop(minutes=1) +async def unmute(): + mutes = Mute.get_active(duration={"$gt": 0}) + mute_roles = Setting.get_many(setting="mute") + updates = [] + for mute in mutes: + if ( + mute.created_at + timedelta(minutes=mute.duration) + < datetime.utcnow() + ): + mute_role = [x.value for x in mute_roles if x.guild == mute.guild][ + 0 + ] + guild = await jarvis.jarvis.fetch_guild(mute.guild) + role = guild.get_role(mute_role) + user = await guild.fetch_member(mute.user) + if user: + if role in user.roles: + await user.remove_roles(role, reason="Mute expired") + + # Objects can't handle bulk_write, so handle it via raw methods + updates.append( + pymongo.UpdateOne( + { + "user": user.id, + "guild": guild.id, + "created_at": mute.created_at, + }, + {"$set": {"active": False}}, + ) + ) + if updates: + jarvis.jarvis_db.mutes.bulk_write(updates) diff --git a/jarvis/tasks/unwarn.py b/jarvis/tasks/unwarn.py new file mode 100644 index 0000000..423e43f --- /dev/null +++ b/jarvis/tasks/unwarn.py @@ -0,0 +1,25 @@ +from datetime import datetime, timedelta + +import pymongo +from discord.ext.tasks import loop + +import jarvis +from jarvis.db.types import Warning + + +@loop(hours=1) +async def unwarn(): + warns = Warning.get_active() + updates = [] + for warn in warns: + if ( + warn.created_at + timedelta(hours=warn.duration) + < datetime.utcnow() + ): + updates.append( + pymongo.UpdateOne( + {"_id": warn._id}, {"$set": {"active": False}} + ) + ) + if updates: + jarvis.jarvis_db.warns.bulk_write(updates) diff --git a/jarvis/utils/__init__.py b/jarvis/utils/__init__.py index a0b1b68..5461c17 100644 --- a/jarvis/utils/__init__.py +++ b/jarvis/utils/__init__.py @@ -9,7 +9,7 @@ import jarvis.cogs import jarvis.config import jarvis.db -__all__ = ["field", "db"] +__all__ = ["field", "db", "cachecog", "permissions"] def convert_bytesize(bytes: int) -> str: diff --git a/jarvis/utils/cachecog.py b/jarvis/utils/cachecog.py new file mode 100644 index 0000000..d5355f8 --- /dev/null +++ b/jarvis/utils/cachecog.py @@ -0,0 +1,33 @@ +from datetime import datetime, timedelta + +from discord.ext import commands +from discord.ext.tasks import loop +from discord.utils import find +from discord_slash import SlashContext + + +class CacheCog(commands.Cog): + def __init__(self, bot: commands.Bot): + self.bot = bot + self.cache = {} + self._expire_interaction.start() + + def check_cache(self, ctx: SlashContext, **kwargs): + if not kwargs: + kwargs = {} + return find( + lambda x: x["command"] == ctx.subcommand_name + and x["user"] == ctx.author.id + and x["guild"] == ctx.guild.id + and all(x[k] == v for k, v in kwargs.items()), + self.cache.values(), + ) + + @loop(minutes=1) + async def _expire_interaction(self): + keys = list(self.cache.keys()) + for key in keys: + if self.cache[key]["timeout"] <= datetime.utcnow() + timedelta( + minutes=1 + ): + del self.cache[key] diff --git a/requirements.txt b/requirements.txt index 88ec417..1916f9a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,6 +2,10 @@ discord-py>=1.7, <2 psutil>=5.8, <6 GitPython>=3.1, <4 PyYaml>=5.4, <6 -discord-py-slash-command>=2.3, <3 +discord-py-slash-command>=2.3.2, <3 pymongo>=3.12.0, <4 opencv-python>=4.5, <5 +ButtonPaginator>=0.0.3 +Pillow>=8.2.0, <9 +python-gitlab>=2.9.0, <3 +ulid-py>=1.1.0, <2