284 lines
9.4 KiB
Python
284 lines
9.4 KiB
Python
"""JARVIS Starboard Cog."""
|
|
import asyncio
|
|
import logging
|
|
|
|
from interactions import Client, Extension, InteractionContext, Permissions
|
|
from interactions.client import errors
|
|
from interactions.models.discord.channel import GuildText
|
|
from interactions.models.discord.components import (
|
|
ActionRow,
|
|
Button,
|
|
StringSelectMenu,
|
|
StringSelectOption,
|
|
)
|
|
from interactions.models.discord.enums import ButtonStyle
|
|
from interactions.models.discord.message import Message
|
|
from interactions.models.internal.application_commands import (
|
|
CommandType,
|
|
OptionType,
|
|
SlashCommand,
|
|
context_menu,
|
|
slash_option,
|
|
)
|
|
from interactions.models.internal.command import check
|
|
from jarvis_core.db.models import Pin, Pinboard
|
|
|
|
from jarvis.utils import build_embed
|
|
from jarvis.utils.permissions import admin_or_permissions
|
|
|
|
# Attachment content types that are forwarded into the pin embed as images.
# Attachments with any other content type are left out of the embed.
supported_images = [
    "image/png",
    "image/gif",
    "image/jpeg",
    "image/webp",
    "image/svg",
]
|
|
|
|
|
|
class PinboardCog(Extension):
    """JARVIS Pinboard Cog.

    Registers the ``/pinboard`` command group (``list``, ``create``,
    ``delete``) and the "Pin Message" message context menu. Pinboards and
    pins are persisted through the ``Pinboard`` and ``Pin`` models.
    """

    def __init__(self, bot: Client):
        self.bot = bot
        self.logger = logging.getLogger(__name__)
        # The event loop only keeps weak references to tasks, so background
        # purge tasks are parked here until they complete.
        self._purge_tasks: set = set()

    async def _purge_starboard(self, ctx: InteractionContext, board: Pinboard) -> None:
        """Remove every pin recorded for ``board``.

        For each ``Pin`` stored for the board's channel in this guild, the
        post in the pinboard channel is deleted (best effort) and the ``Pin``
        document is removed.

        Args:
            ctx: Interaction context; supplies the guild.
            board: Pinboard whose pins should be purged.
        """
        # fetch_channel yields nothing when the channel is gone; the stored
        # pins must still be cleaned up in that case.
        channel = await ctx.guild.fetch_channel(board.channel)
        async for pin in Pin.find(
            Pin.pinboard == board.channel, Pin.guild == ctx.guild.id
        ):
            if channel is not None:
                try:
                    # ``pin.pin`` is the ID of the post created in the
                    # pinboard channel (see ``_star_add``); ``pin.message``
                    # is the original message, which lives in ``pin.channel``.
                    if message := await channel.fetch_message(pin.pin):
                        await message.delete()
                        await asyncio.sleep(1)  # Avoid rate limits
                except (errors.Forbidden, errors.NotFound):
                    self.logger.debug(f"Failed to delete star {pin.id}'s message.")
            await pin.delete()

    pinboard = SlashCommand(name="pinboard", description="Extra pins! Manage pinboards")

    @pinboard.subcommand(
        sub_cmd_name="list",
        sub_cmd_description="List all pinboards",
    )
    @check(admin_or_permissions(Permissions.MANAGE_GUILD, Permissions.MANAGE_MESSAGES))
    async def _list(self, ctx: InteractionContext) -> None:
        """Send the list of pinboard channels configured for this guild."""
        pinboards = await Pinboard.find(Pinboard.guild == ctx.guild.id).to_list()
        if not pinboards:
            await ctx.send("No Pinboards available.")
            return

        mentions = "".join(f"<#{board.channel}>\n" for board in pinboards)
        await ctx.send("Available Pinboards:\n" + mentions)

    @pinboard.subcommand(sub_cmd_name="create", sub_cmd_description="Create a Pinboard")
    @slash_option(
        name="channel",
        description="Pinboard channel",
        opt_type=OptionType.CHANNEL,
        required=True,
    )
    @check(admin_or_permissions(Permissions.MANAGE_GUILD, Permissions.MANAGE_MESSAGES))
    async def _create(self, ctx: InteractionContext, channel: GuildText) -> None:
        """Register ``channel`` as a pinboard for this guild.

        Rejects channels outside the guild, non-text channels, channels that
        are already pinboards, and guilds that already have 25 pinboards.
        """
        if channel not in ctx.guild.channels:
            await ctx.send(
                "Channel not in guild. Choose an existing channel.",
                ephemeral=True,
            )
            return
        if not isinstance(channel, GuildText):
            await ctx.send("Channel must be a GuildText", ephemeral=True)
            return

        exists = await Pinboard.find_one(
            Pinboard.channel == channel.id, Pinboard.guild == ctx.guild.id
        )
        if exists:
            await ctx.send(
                f"Pinboard already exists at {channel.mention}.", ephemeral=True
            )
            return

        # NOTE(review): 25 presumably mirrors the select-menu option limit
        # used when choosing a pinboard in ``_star_add`` - confirm.
        count = await Pinboard.find(Pinboard.guild == ctx.guild.id).count()
        if count >= 25:
            await ctx.send("25 pinboard limit reached", ephemeral=True)
            return

        await Pinboard(
            guild=ctx.guild.id,
            channel=channel.id,
            admin=ctx.author.id,
        ).save()
        await ctx.send(f"Pinboard created. Check it out at {channel.mention}.")

    @pinboard.subcommand(sub_cmd_name="delete", sub_cmd_description="Delete a pinboard")
    @slash_option(
        name="channel",
        description="Pinboard channel",
        opt_type=OptionType.CHANNEL,
        required=True,
    )
    @check(admin_or_permissions(Permissions.MANAGE_GUILD, Permissions.MANAGE_MESSAGES))
    async def _delete(self, ctx: InteractionContext, channel: GuildText) -> None:
        """Delete the pinboard bound to ``channel`` and purge its pins.

        The purge runs as a background task so the command can answer
        immediately.
        """
        found = await Pinboard.find_one(
            Pinboard.channel == channel.id, Pinboard.guild == ctx.guild.id
        )
        if not found:
            await ctx.send(f"Pinboard not found in {channel.mention}.", ephemeral=True)
            return

        await found.delete()
        # Keep a strong reference until the task finishes; otherwise it may
        # be garbage-collected mid-execution.
        task = asyncio.create_task(self._purge_starboard(ctx, found))
        self._purge_tasks.add(task)
        task.add_done_callback(self._purge_tasks.discard)
        await ctx.send(f"Pinboard deleted from {channel.mention}.")

    async def _star_add(
        self,
        ctx: InteractionContext,
        message: str,
        channel: GuildText = None,
    ) -> None:
        """Copy a message into a pinboard chosen by the invoking user.

        Args:
            ctx: Interaction context of the invoking command.
            message: Message ID or ``https://`` message link (the last path
                segment is used as the ID). An already-resolved ``Message``
                is also accepted.
            channel: Channel the message lives in; defaults to ``ctx.channel``.
        """
        if not channel:
            channel = ctx.channel
        pinboards = await Pinboard.find(Pinboard.guild == ctx.guild.id).to_list()
        if not pinboards:
            await ctx.send("No pinboards exist.", ephemeral=True)
            return

        await ctx.defer()

        if not isinstance(message, Message):
            if message.startswith("https://"):
                message = message.split("/")[-1]
            message = await channel.fetch_message(int(message))

            if not message:
                await ctx.send("Message not found", ephemeral=True)
                return

        # Resolve every stored pinboard to a live text channel; stale
        # entries are collected and dropped from the database afterwards.
        channel_list = []
        to_delete: list[Pinboard] = []
        channel_to_pinboard = {}

        for board in pinboards:
            c = await ctx.guild.fetch_channel(board.channel)
            if c and isinstance(c, GuildText):
                channel_list.append(c)
                channel_to_pinboard[c.id] = board
            else:
                self.logger.warning(
                    f"Pinboard {board.channel} no longer valid in {ctx.guild.name}"
                )
                to_delete.append(board)

        for board in to_delete:
            try:
                await board.delete()
            except Exception:
                # Deliberately best-effort: a failed cleanup must not block
                # the pin itself.
                self.logger.debug("Ignoring deletion error", exc_info=True)

        # Every stored pinboard may have been stale; a select menu needs at
        # least one option.
        if not channel_list:
            await ctx.send("No pinboards exist.", ephemeral=True)
            return

        # The option value is the index into ``channel_list``.
        select_channels = [
            StringSelectOption(label=x.name, value=str(idx))
            for idx, x in enumerate(channel_list)
        ]
        select = StringSelectMenu(
            *select_channels,
            min_values=1,
            max_values=1,
        )
        components = [ActionRow(select)]

        msg = await ctx.send(content="Choose a pinboard", components=components)

        # Only the invoking user may answer. No timeout is set, so this
        # waits until a selection is made.
        com_ctx = await self.bot.wait_for_component(
            messages=msg,
            components=components,
            check=lambda x: ctx.author.id == x.ctx.author.id,
        )

        # ``starboard`` is the pinboard *channel* the user selected; all
        # channel operations below must go through it.
        starboard = channel_list[int(com_ctx.ctx.values[0])]

        # NOTE(review): these queries compare ``Pin.pinboard`` with the
        # channel ID, matching ``_purge_starboard`` - confirm against the
        # ``Pin`` model's field type.
        exists = await Pin.find_one(
            Pin.message == int(message.id),
            Pin.channel == int(channel.id),
            Pin.guild == int(ctx.guild.id),
            Pin.pinboard == int(starboard.id),
        )

        if exists:
            await ctx.send(
                f"Message already sent to Pinboard {starboard.mention}",
                ephemeral=True,
            )
            return

        count = await Pin.find(
            Pin.guild == ctx.guild.id, Pin.pinboard == starboard.id
        ).count()
        content = message.content

        image_urls = [
            attachment.url
            for attachment in (message.attachments or [])
            if attachment.content_type in supported_images
        ]
        if not content and image_urls:
            # Zero-width space stands in for the description of an
            # image-only message.
            content = "\u200b"

        embed = build_embed(
            title=f"[#{count}] Click Here to view context",
            description=content,
            fields=[],
            url=message.jump_url,
            timestamp=message.created_at,
        )
        embed.set_author(
            name=message.author.display_name,
            url=message.jump_url,
            icon_url=message.author.avatar.url,
        )
        embed.set_footer(text=ctx.guild.name + " | " + channel.name)
        if image_urls:
            embed.set_images(*image_urls)
        star_components = Button(
            style=ButtonStyle.DANGER,
            emoji="🗑️",
            custom_id=f"delete|{ctx.author.id}",
        )
        pin = await starboard.send(embeds=embed, components=star_components)

        await Pin(
            index=count,
            message=int(message.id),
            channel=int(channel.id),
            guild=int(ctx.guild.id),
            # ``channel_to_pinboard`` is keyed by channel ID.
            pinboard=channel_to_pinboard[starboard.id],
            admin=int(ctx.author.id),
            pin=int(pin.id),
            active=True,
        ).save()

        # Disable the select menu so it cannot be used a second time.
        components[0].components[0].disabled = True

        await com_ctx.ctx.edit_origin(
            content=f"Message saved to Pinboard.\nSee it in {starboard.mention}",
            components=components,
        )

    @context_menu(name="Pin Message", context_type=CommandType.MESSAGE)
    @check(admin_or_permissions(Permissions.MANAGE_GUILD, Permissions.MANAGE_MESSAGES))
    async def _star_message(self, ctx: InteractionContext) -> None:
        """Pin the message the context menu was invoked on."""
        await self._star_add(ctx, message=str(ctx.target_id))
|
|
|
|
|
|
def setup(bot: Client) -> None:
    """Add PinboardCog to JARVIS.

    Instantiates the extension with the given ``bot`` client.
    """
    PinboardCog(bot)
|