372 lines
13 KiB
Python
372 lines
13 KiB
Python
"""JARVIS BanCog."""
|
|
import re
|
|
from datetime import timedelta
|
|
|
|
from jarvis_core.db import q
|
|
from jarvis_core.db.models import Ban, Unban
|
|
from naff import InteractionContext, Permissions
|
|
from naff.client.utils.misc_utils import find, find_all
|
|
from naff.ext.paginators import Paginator
|
|
from naff.models.discord.embed import EmbedField
|
|
from naff.models.discord.user import User
|
|
from naff.models.naff.application_commands import (
|
|
OptionTypes,
|
|
SlashCommand,
|
|
SlashCommandChoice,
|
|
slash_command,
|
|
slash_option,
|
|
)
|
|
from naff.models.naff.command import check
|
|
|
|
from jarvis.branding import get_command_color
|
|
from jarvis.embeds.admin import ban_embed, unban_embed
|
|
from jarvis.utils import build_embed
|
|
from jarvis.utils.cogs import ModcaseCog
|
|
from jarvis.utils.permissions import admin_or_permissions
|
|
|
|
# Matches a single duration token such as "20s", "1.5h" or "1w"
# (case-insensitive) and captures the number together with its unit letter
# (s, m, h, d or w). The fractional part is optional as a whole, so
# single-digit values like "7h" are accepted, and "|" is not a valid unit.
time_pattern = re.compile(r"(\d+(?:\.\d+)?[smhdw])\s?", re.I)
|
|
|
|
|
|
class BanCog(ModcaseCog):
    """Slash commands for banning, unbanning and listing bans in a guild."""

    async def discord_apply_ban(
        self,
        ctx: InteractionContext,
        reason: str,
        user: User,
        duration: int,
        active: bool,
        mtype: str,
        delete_history: int = 0,
    ) -> None:
        """Ban a user in Discord, store a Ban record and send the ban embed.

        Args:
            ctx: Interaction the ban was requested from
            reason: Ban reason, forwarded to Discord and stored on the record
            user: User to ban
            duration: Value stored as the ban duration and passed to the embed
            active: Value stored in the record's ``active`` field
            mtype: Ban type label stored on the record and passed to the embed
            delete_history: Passed to Discord as ``delete_message_seconds``
        """
        await ctx.guild.ban(user, reason=reason, delete_message_seconds=delete_history)
        b = Ban(
            user=user.id,
            username=user.username,
            discrim=user.discriminator,
            reason=reason,
            admin=ctx.author.id,
            guild=ctx.guild.id,
            type=mtype,
            duration=duration,
            active=active,
        )
        await b.commit()

        embed = ban_embed(
            user=user,
            admin=ctx.author,
            reason=reason,
            guild=ctx.guild,
            duration=duration,
            type=mtype,
        )

        await ctx.send(embeds=embed)

    async def discord_apply_unban(self, ctx: InteractionContext, user: User, reason: str) -> None:
        """Unban a user in Discord, store an Unban record and send the unban embed.

        Args:
            ctx: Interaction the unban was requested from
            user: User to unban
            reason: Unban reason, forwarded to Discord and stored on the record
        """
        await ctx.guild.unban(user, reason=reason)
        u = Unban(
            user=user.id,
            username=user.username,
            discrim=user.discriminator,
            guild=ctx.guild.id,
            admin=ctx.author.id,
            reason=reason,
        )
        await u.commit()

        embed = unban_embed(user=user, admin=ctx.author, reason=reason)

        await ctx.send(embeds=embed)

    @slash_command(name="ban", description="Ban a user")
    @slash_option(name="user", description="User to ban", opt_type=OptionTypes.USER, required=True)
    @slash_option(name="reason", description="Ban reason", opt_type=OptionTypes.STRING, required=True)
    @slash_option(
        name="btype",
        description="Ban type",
        opt_type=OptionTypes.STRING,
        required=True,
        choices=[
            SlashCommandChoice(name="Permanent", value="perm"),
            SlashCommandChoice(name="Temporary", value="temp"),
            SlashCommandChoice(name="Soft", value="soft"),
        ],
    )
    @slash_option(
        name="duration",
        description="Temp ban duration in hours",
        opt_type=OptionTypes.INTEGER,
        required=False,
    )
    @slash_option(
        name="delete_history",
        description="Delete message history, format: 1w 3d 7h 5m 20s",
        opt_type=OptionTypes.STRING,
        required=False,
    )
    @check(admin_or_permissions(Permissions.BAN_MEMBERS))
    async def _ban(
        self,
        ctx: InteractionContext,
        user: User,
        reason: str,
        btype: str = "perm",
        duration: int = 4,
        delete_history: str = None,
    ) -> None:
        """Validate a ban request, DM the user, then apply the ban.

        Args:
            ctx: Interaction the command was invoked from
            user: User to ban
            reason: Ban reason (at most 100 characters)
            btype: One of "perm", "temp" or "soft"
            duration: Temp ban duration in hours (0-744); ignored for other types
            delete_history: Optional time string such as "1w 3d 7h 5m 20s"
                describing how much message history to delete (max 7 days)
        """
        if user.id == ctx.author.id:
            await ctx.send("You cannot ban yourself.", ephemeral=True)
            return
        if user.id == self.bot.user.id:
            await ctx.send("I'm afraid I can't let you do that", ephemeral=True)
            return
        if btype == "temp" and duration < 0:
            await ctx.send("You cannot set a temp ban to < 0 hours.", ephemeral=True)
            return
        elif btype == "temp" and duration > 744:
            await ctx.send("You cannot set a temp ban to > 1 month", ephemeral=True)
            return

        # The option is not required, so only validate it when it was supplied.
        if delete_history is not None:
            delete_history = delete_history.strip().lower()
            if not time_pattern.match(delete_history):
                await ctx.send("Invalid time string, please follow example: 1w 3d 7h 5m 20s", ephemeral=True)
                return
        if len(reason) > 100:
            await ctx.send("Reason must be < 100 characters", ephemeral=True)
            return

        delete_seconds = 0
        if delete_history:
            units = {"w": "weeks", "d": "days", "h": "hours", "m": "minutes", "s": "seconds"}
            delta = dict.fromkeys(units.values(), 0.0)
            for token in time_pattern.findall(delete_history):
                unit = units.get(token[-1])
                if unit is None:
                    # Guards against a unit character the pattern lets through
                    # but that has no timedelta equivalent.
                    await ctx.send("Invalid time string, please follow example: 1w 3d 7h 5m 20s", ephemeral=True)
                    return
                delta[unit] += float(token[:-1])
            delete_seconds = int(timedelta(**delta).total_seconds())

            if delete_seconds > 604800:
                await ctx.send("Delete history cannot be greater than 7 days (604800 seconds)", ephemeral=True)
                return

        await ctx.defer()

        mtype = "perma" if btype == "perm" else btype

        # Normalise before anything consumes these values: only temp bans
        # carry a duration, and a softban is lifted immediately so it is
        # never stored as active.
        if btype != "temp":
            duration = None
        active = btype != "soft"

        user_embed = ban_embed(
            user=user,
            admin=ctx.author,
            reason=reason,
            type=mtype,
            guild=ctx.guild,
            duration=duration,
            dm=True,
        )

        # Best effort: the ban must go ahead even if the DM cannot be sent.
        try:
            await user.send(embed=user_embed)
        except Exception:
            self.logger.warning(f"Failed to send ban embed to {user.id}")
        try:
            await self.discord_apply_ban(ctx, reason, user, duration, active, mtype, delete_seconds)
        except Exception as e:
            await ctx.send(f"Failed to ban user:\n```\n{e}\n```", ephemeral=True)
            return

        if mtype == "soft":
            await ctx.guild.unban(user, reason="Ban was softban")

    @slash_command(name="unban", description="Unban a user")
    @slash_option(name="user", description="User to unban", opt_type=OptionTypes.STRING, required=True)
    @slash_option(name="reason", description="Unban reason", opt_type=OptionTypes.STRING, required=True)
    @check(admin_or_permissions(Permissions.BAN_MEMBERS))
    async def _unban(
        self,
        ctx: InteractionContext,
        user: str,
        reason: str,
    ) -> None:
        """Unban a user identified by ID, ``name#discrim`` or plain username.

        The guild's Discord ban list is searched first; if nothing matches,
        the active Ban records in the database are searched instead.

        Args:
            ctx: Interaction the command was invoked from
            user: User ID, ``name#1234`` or username, as typed by the admin
            reason: Unban reason (at most 100 characters)
        """
        if len(reason) > 100:
            await ctx.send("Reason must be < 100 characters", ephemeral=True)
            return

        orig_user = user
        discrim = None
        discord_ban_info = None
        database_ban_info = None

        bans = await ctx.guild.fetch_bans()

        # Try to get ban information out of Discord
        self.logger.debug(f"{user}")
        if re.match(r"^[0-9]{1,}$", user):  # User ID
            user = int(user)
            discord_ban_info = find(lambda x: x.user.id == user, bans)
        else:  # User name
            # search, not match: the discriminator is at the END of the string
            if re.search(r"#[0-9]{4}$", user):  # User name has discrim
                user, discrim = user.rsplit("#", 1)
            if discrim:
                discord_ban_info = find(
                    lambda x: x.user.username == user and x.user.discriminator == discrim,
                    bans,
                )
            else:
                results = find_all(lambda x: x.user.username == user, bans)
                if len(results) > 1:
                    # Only list the ambiguous matches, not every ban in the guild
                    active_bans = [
                        "{0} ({1}): {2}".format(ban.user.username, ban.user.id, ban.reason) for ban in results
                    ]
                    ab_message = "\n".join(active_bans)
                    message = "More than one result. " f"Please use one of the following IDs:\n```{ab_message}\n```"
                    await ctx.send(message)
                    return
                elif results:
                    discord_ban_info = results[0]

        # If we don't have the ban information in Discord,
        # try to find the relevant information in the database.
        if not discord_ban_info:
            search = {"guild": ctx.guild.id, "active": True}
            if isinstance(user, int):
                # A numeric argument was parsed as a user ID above
                search["user"] = user
            else:
                search["username"] = user
                if discrim:
                    search["discrim"] = discrim
            database_ban_info = await Ban.find_one(q(**search))

        if not discord_ban_info and not database_ban_info:
            await ctx.send(f"Unable to find user {orig_user}", ephemeral=True)

        elif discord_ban_info and not database_ban_info:
            await self.discord_apply_unban(ctx, discord_ban_info.user, reason)

        else:
            # Retry the Discord lookup with the user ID stored on the record
            discord_ban_info = find(lambda x: x.user.id == database_ban_info.user, bans)
            if discord_ban_info:
                await self.discord_apply_unban(ctx, discord_ban_info.user, reason)
            else:
                database_ban_info.active = False
                await database_ban_info.commit()
                await Unban(
                    user=database_ban_info.user,
                    username=database_ban_info.username,
                    discrim=database_ban_info.discrim,
                    guild=ctx.guild.id,
                    admin=ctx.author.id,
                    reason=reason,
                ).commit()
                await ctx.send("Unable to find user in Discord, but removed entry from database.")

    bans = SlashCommand(name="bans", description="User bans")

    @bans.subcommand(sub_cmd_name="list", sub_cmd_description="List bans")
    @slash_option(
        name="btype",
        description="Ban type",
        opt_type=OptionTypes.INTEGER,
        required=False,
        choices=[
            SlashCommandChoice(name="All", value=0),
            SlashCommandChoice(name="Permanent", value=1),
            SlashCommandChoice(name="Temporary", value=2),
            SlashCommandChoice(name="Soft", value=3),
        ],
    )
    @slash_option(
        name="active",
        description="Active bans",
        opt_type=OptionTypes.BOOLEAN,
        required=False,
    )
    @check(admin_or_permissions(Permissions.BAN_MEMBERS))
    async def _bans_list(self, ctx: InteractionContext, btype: int = 0, active: bool = True) -> None:
        """Show a paginated list of the guild's bans, five per page.

        Args:
            ctx: Interaction the command was invoked from
            btype: 0 = all, 1 = permanent, 2 = temporary, 3 = soft
            active: When True, only records flagged active are queried and
                Discord bans missing from the database are appended
                (the latter only for ``btype == 0``)
        """
        # Label used in the title for each btype choice (index 0 = "All")
        labels = ["", "perma", "temp", "soft"]
        search = {"guild": ctx.guild.id}
        if active:
            search["active"] = True
        if btype == 1:
            # `_ban` stores permanent bans as "perma"; "perm" (the value this
            # filter used previously) is kept in case records carry that label.
            search["type"] = {"$in": ["perm", "perma"]}
        elif btype > 0:
            search["type"] = labels[btype]
        bans = await Ban.find(search).sort([("created_at", -1)]).to_list(None)
        db_bans = set()
        fields = []
        for ban in bans:
            if not ban.username:
                user = await self.bot.fetch_user(ban.user)
                ban.username = user.username if user else "[deleted user]"
            fields.append(
                EmbedField(
                    name=f"Username: {ban.username}#{ban.discrim}",
                    value=(
                        f"Date: {ban.created_at.strftime('%d-%m-%Y')}\n"
                        f"User ID: {ban.user}\n"
                        f"Reason: {ban.reason}\n"
                        f"Type: {ban.type}\n\u200b"
                    ),
                    inline=False,
                )
            )
            db_bans.add(ban.user)
        if btype == 0 and active:
            # Append bans that exist in Discord but have no database record
            for ban in await ctx.guild.fetch_bans():
                if ban.user.id in db_bans:
                    continue
                fields.append(
                    EmbedField(
                        name=f"Username: {ban.user.username}#{ban.user.discriminator}",
                        value=(
                            f"Date: [unknown]\n"
                            f"User ID: {ban.user.id}\n"
                            f"Reason: {ban.reason}\n"
                            "Type: manual\n\u200b"
                        ),
                        inline=False,
                    )
                )

        title = f"{'Active' if active else 'Inactive'} {labels[btype]}bans"
        # Not every guild has an icon set
        icon = ctx.guild.icon

        pages = []
        if not fields:
            embed = build_embed(
                title=title,
                description=f"No {'in' if not active else ''}active bans",
                fields=[],
                color=get_command_color("bans_list"),
            )
            if icon:
                embed.set_thumbnail(url=icon.url)
            pages.append(embed)
        else:
            # Page over the rendered fields, which may include manual bans
            for i in range(0, len(fields), 5):
                embed = build_embed(title=title, description="", fields=fields[i : i + 5])
                if icon:
                    embed.set_thumbnail(url=icon.url)
                pages.append(embed)

        paginator = Paginator.create_from_embeds(self.bot, *pages, timeout=300)

        await paginator.send(ctx)
|