132 lines
4.6 KiB
Python
132 lines
4.6 KiB
Python
"""JARVIS PurgeCog."""
|
|
import logging
|
|
|
|
from jarvis_core.db import q
|
|
from jarvis_core.db.models import Autopurge, Purge
|
|
from naff import Client, Extension, InteractionContext, Permissions
|
|
from naff.models.discord.channel import GuildText
|
|
from naff.models.naff.application_commands import (
|
|
OptionTypes,
|
|
slash_command,
|
|
slash_option,
|
|
)
|
|
from naff.models.naff.command import check
|
|
|
|
from jarvis.utils.permissions import admin_or_permissions
|
|
|
|
|
|
class PurgeCog(Extension):
    """Slash commands for purging messages and managing autopurge records.

    ``/purge`` deletes recent messages from the invoking channel and commits a
    ``Purge`` record. The ``/autopurge add|remove|update`` subcommands only
    create, delete and modify ``Autopurge`` records; nothing in this class
    deletes messages on a schedule.
    """

    # Upper bound (in seconds) accepted for an autopurge delay.
    _MAX_AUTOPURGE_DELAY = 300

    def __init__(self, bot: Client):
        """Store the client and create a module-scoped logger."""
        self.bot = bot
        self.logger = logging.getLogger(__name__)

    def _delay_error(self, delay: int) -> str:
        """Return the user-facing error for an invalid delay, or "" if valid.

        Shared by the ``add`` and ``update`` subcommands so both enforce the
        same range (``0 < delay <= _MAX_AUTOPURGE_DELAY``).
        """
        if delay <= 0:
            return "Delay must be > 0"
        if delay > self._MAX_AUTOPURGE_DELAY:
            return "Delay must be < 5 minutes"
        return ""

    @slash_command(name="purge", description="Purge messages from channel")
    @slash_option(
        name="amount",
        description="Amount of messages to purge, default 10",
        opt_type=OptionTypes.INTEGER,
        required=False,
    )
    @check(admin_or_permissions(Permissions.MANAGE_MESSAGES))
    async def _purge(self, ctx: InteractionContext, amount: int = 10) -> None:
        """Delete the most recent messages in the invoking channel.

        Args:
            ctx: Interaction context of the command invocation.
            amount: Number of messages requested for deletion; must be >= 1.
        """
        if amount < 1:
            await ctx.send("Amount must be >= 1", ephemeral=True)
            return
        await ctx.defer()

        # One more message than requested is fetched; presumably the extra one
        # accounts for the deferred interaction response itself - TODO confirm.
        # NOTE(review): Discord's bulk-delete endpoint is documented as taking
        # at most 100 messages per request; verify how large amounts behave.
        messages = [message async for message in ctx.channel.history(limit=amount + 1)]
        await ctx.channel.delete_messages(messages, reason=f"Purge by {ctx.author.username}")

        # Record the purge; `count` stores the requested amount.
        await Purge(
            channel=ctx.channel.id,
            guild=ctx.guild.id,
            admin=ctx.author.id,
            count=amount,
        ).commit()

    @slash_command(
        name="autopurge",
        sub_cmd_name="add",
        sub_cmd_description="Automatically purge messages",
    )
    @slash_option(
        name="channel",
        description="Channel to autopurge",
        opt_type=OptionTypes.CHANNEL,
        required=True,
    )
    @slash_option(
        name="delay",
        description="Seconds to keep message before purge, default 30",
        opt_type=OptionTypes.INTEGER,
        required=False,
    )
    @check(admin_or_permissions(Permissions.MANAGE_MESSAGES))
    async def _autopurge_add(
        self, ctx: InteractionContext, channel: GuildText, delay: int = 30
    ) -> None:
        """Create an ``Autopurge`` record for a text channel.

        Args:
            ctx: Interaction context of the command invocation.
            channel: Target channel; rejected unless it is a ``GuildText``.
            delay: Seconds to keep a message; must satisfy 0 < delay <= 300.
        """
        if not isinstance(channel, GuildText):
            await ctx.send("Channel must be a GuildText channel", ephemeral=True)
            return

        error = self._delay_error(delay)
        if error:
            await ctx.send(error, ephemeral=True)
            return

        # Only one autopurge record is allowed per guild/channel pair.
        autopurge = await Autopurge.find_one(q(guild=ctx.guild.id, channel=channel.id))
        if autopurge:
            await ctx.send("Autopurge already exists.", ephemeral=True)
            return

        await Autopurge(
            guild=ctx.guild.id,
            channel=channel.id,
            admin=ctx.author.id,
            delay=delay,
        ).commit()

        await ctx.send(f"Autopurge set up on {channel.mention}, delay is {delay} seconds")

    @slash_command(
        name="autopurge",
        sub_cmd_name="remove",
        sub_cmd_description="Remove an autopurge",
    )
    @slash_option(
        name="channel",
        description="Channel to remove from autopurge",
        opt_type=OptionTypes.CHANNEL,
        required=True,
    )
    @check(admin_or_permissions(Permissions.MANAGE_MESSAGES))
    async def _autopurge_remove(self, ctx: InteractionContext, channel: GuildText) -> None:
        """Delete the ``Autopurge`` record for a channel, if one exists.

        Args:
            ctx: Interaction context of the command invocation.
            channel: Channel whose autopurge record should be removed.
        """
        autopurge = await Autopurge.find_one(q(guild=ctx.guild.id, channel=channel.id))
        if not autopurge:
            await ctx.send("Autopurge does not exist.", ephemeral=True)
            return
        await autopurge.delete()
        await ctx.send(f"Autopurge removed from {channel.mention}.")

    @slash_command(
        name="autopurge",
        sub_cmd_name="update",
        sub_cmd_description="Update autopurge on a channel",
    )
    @slash_option(
        name="channel",
        description="Channel to update",
        opt_type=OptionTypes.CHANNEL,
        required=True,
    )
    @slash_option(
        name="delay",
        description="New time to save",
        opt_type=OptionTypes.INTEGER,
        required=True,
    )
    @check(admin_or_permissions(Permissions.MANAGE_MESSAGES))
    async def _autopurge_update(
        self, ctx: InteractionContext, channel: GuildText, delay: int
    ) -> None:
        """Change the delay stored on an existing ``Autopurge`` record.

        Args:
            ctx: Interaction context of the command invocation.
            channel: Channel whose autopurge record should be updated.
            delay: New delay in seconds; must satisfy 0 < delay <= 300.
        """
        # Enforce the same bounds as the `add` subcommand so an update cannot
        # store a delay that creation would have rejected.
        error = self._delay_error(delay)
        if error:
            await ctx.send(error, ephemeral=True)
            return

        autopurge = await Autopurge.find_one(q(guild=ctx.guild.id, channel=channel.id))
        if not autopurge:
            await ctx.send("Autopurge does not exist.", ephemeral=True)
            return

        autopurge.delay = delay
        await autopurge.commit()

        await ctx.send(f"Autopurge delay updated to {delay} seconds on {channel.mention}.")