323 lines
10 KiB
Python
323 lines
10 KiB
Python
"""JARVIS Starboard Cog."""
|
|
import logging
|
|
|
|
from jarvis_core.db import q
|
|
from jarvis_core.db.models import Star, Starboard
|
|
from naff import Client, Extension, InteractionContext, Permissions
|
|
from naff.models.discord.channel import GuildText
|
|
from naff.models.discord.components import ActionRow, Select, SelectOption
|
|
from naff.models.discord.message import Message
|
|
from naff.models.naff.application_commands import (
|
|
CommandTypes,
|
|
OptionTypes,
|
|
SlashCommand,
|
|
context_menu,
|
|
slash_option,
|
|
)
|
|
from naff.models.naff.command import check
|
|
|
|
from jarvis.utils import build_embed
|
|
from jarvis.utils.permissions import admin_or_permissions
|
|
|
|
# MIME types accepted when picking an attachment to show as a star's embed image.
supported_images = [
    f"image/{subtype}" for subtype in ("png", "gif", "jpeg", "webp", "svg")
]
|
|
|
|
|
|
class StarboardCog(Extension):
    """JARVIS Starboard Cog.

    Exposes the ``/starboard`` command group (list/create/delete starboard
    channels), the ``/star`` command group (add/delete starred messages) and
    the "Star Message" message context menu. Every command is gated behind
    ``admin_or_permissions(Permissions.MANAGE_GUILD)``.
    """

    def __init__(self, bot: Client):
        self.bot = bot
        self.logger = logging.getLogger(__name__)

    starboard = SlashCommand(name="starboard", description="Extra pins! Manage starboards")

    @starboard.subcommand(
        sub_cmd_name="list",
        sub_cmd_description="List all starboards",
    )
    @check(admin_or_permissions(Permissions.MANAGE_GUILD))
    async def _list(self, ctx: InteractionContext) -> None:
        """Reply with a channel mention for every starboard stored for this guild."""
        starboards = await Starboard.find(q(guild=ctx.guild.id)).to_list(None)
        if not starboards:
            await ctx.send("No Starboards available.")
            return

        mentions = "".join(f"<#{s.channel}>\n" for s in starboards)
        await ctx.send("Available Starboards:\n" + mentions)

    @starboard.subcommand(sub_cmd_name="create", sub_cmd_description="Create a starboard")
    @slash_option(
        name="channel",
        description="Starboard channel",
        opt_type=OptionTypes.CHANNEL,
        required=True,
    )
    @check(admin_or_permissions(Permissions.MANAGE_GUILD))
    async def _create(self, ctx: InteractionContext, channel: GuildText) -> None:
        """Register ``channel`` as a starboard for the current guild.

        Refuses (with an ephemeral reply) when the channel is not part of the
        guild, is not a ``GuildText``, is already a starboard, or the guild
        already has 25 starboards.
        """
        if channel not in ctx.guild.channels:
            await ctx.send(
                "Channel not in guild. Choose an existing channel.",
                ephemeral=True,
            )
            return
        if not isinstance(channel, GuildText):
            await ctx.send("Channel must be a GuildText", ephemeral=True)
            return

        exists = await Starboard.find_one(q(channel=channel.id, guild=ctx.guild.id))
        if exists:
            await ctx.send(f"Starboard already exists at {channel.mention}.", ephemeral=True)
            return

        # NOTE(review): the cap presumably mirrors the 25-option limit of the
        # select menu used to pick a starboard in _star_add -- confirm.
        count = await Starboard.count_documents(q(guild=ctx.guild.id))
        if count >= 25:
            await ctx.send("25 starboard limit reached", ephemeral=True)
            return

        await Starboard(
            guild=ctx.guild.id,
            channel=channel.id,
            admin=ctx.author.id,
        ).commit()
        await ctx.send(f"Starboard created. Check it out at {channel.mention}.")

    @starboard.subcommand(sub_cmd_name="delete", sub_cmd_description="Delete a starboard")
    @slash_option(
        name="channel",
        description="Starboard channel",
        opt_type=OptionTypes.CHANNEL,
        required=True,
    )
    @check(admin_or_permissions(Permissions.MANAGE_GUILD))
    async def _delete(self, ctx: InteractionContext, channel: GuildText) -> None:
        """Remove the starboard record for ``channel``, if one exists in this guild."""
        found = await Starboard.find_one(q(channel=channel.id, guild=ctx.guild.id))
        if not found:
            await ctx.send(f"Starboard not found in {channel.mention}.", ephemeral=True)
            return

        await found.delete()
        await ctx.send(f"Starboard deleted from {channel.mention}.")

    # Resolve stored starboards to live text channels, pruning records that no longer resolve.
    async def _resolve_starboard_channels(self, ctx: InteractionContext, starboards: list) -> list:
        channel_list = []
        stale = []
        for starboard in starboards:
            resolved = await ctx.guild.fetch_channel(starboard.channel)
            if resolved and isinstance(resolved, GuildText):
                channel_list.append(resolved)
            else:
                self.logger.warning(
                    "Starboard %s no longer valid in %s", starboard.channel, ctx.guild.name
                )
                stale.append(starboard)

        for starboard in stale:
            try:
                await starboard.delete()
            except Exception:
                # Best-effort cleanup: keep going, but leave a trace of what failed.
                self.logger.debug("Ignoring deletion error", exc_info=True)

        return channel_list

    # Return the URL of the first attachment whose content type is a supported image, else None.
    @staticmethod
    def _first_image_url(attachments: list) -> str:
        for attachment in attachments or []:
            if attachment.content_type in supported_images:
                return attachment.url
        return None

    async def _star_add(
        self,
        ctx: InteractionContext,
        message: str,
        channel: GuildText = None,
    ) -> None:
        """Copy a message into a starboard chosen by the invoking user.

        Args:
            ctx: Interaction that triggered the command.
            message: Message ID, or an ``https://`` URL whose last path
                segment is the message ID. A ``Message`` instance is used
                as-is.
            channel: Channel containing the message; defaults to
                ``ctx.channel``.

        The user picks the target starboard from a select menu; the message is
        then posted there as an embed and recorded as a ``Star`` document.
        """
        if not channel:
            channel = ctx.channel
        starboards = await Starboard.find(q(guild=ctx.guild.id)).to_list(None)
        if not starboards:
            await ctx.send("No starboards exist.", ephemeral=True)
            return

        await ctx.defer()

        if not isinstance(message, Message):
            if message.startswith("https://"):
                # Tolerate a trailing slash so the last segment is still the ID.
                message = message.rstrip("/").split("/")[-1]
            try:
                message_id = int(message)
            except ValueError:
                # Not a numeric ID: report it instead of crashing the command.
                await ctx.send("Message not found", ephemeral=True)
                return
            message = await channel.fetch_message(message_id)

        if not message:
            await ctx.send("Message not found", ephemeral=True)
            return

        channel_list = await self._resolve_starboard_channels(ctx, starboards)
        if not channel_list:
            # Every stored starboard was stale; a select menu needs at least one option.
            await ctx.send("No starboards exist.", ephemeral=True)
            return

        select_channels = [
            SelectOption(label=x.name, value=str(idx)) for idx, x in enumerate(channel_list)
        ]
        select = Select(
            options=select_channels,
            min_values=1,
            max_values=1,
        )
        components = [ActionRow(select)]

        msg = await ctx.send(content="Choose a starboard", components=components)

        # Only the user who invoked the command may make the selection.
        com_ctx = await self.bot.wait_for_component(
            messages=msg,
            components=components,
            check=lambda x: ctx.author.id == x.context.author.id,
        )

        # The option value is the index into channel_list.
        starboard = channel_list[int(com_ctx.context.values[0])]

        exists = await Star.find_one(
            q(
                message=message.id,
                channel=channel.id,
                guild=ctx.guild.id,
                starboard=starboard.id,
            )
        )
        if exists:
            await ctx.send(
                f"Message already sent to Starboard {starboard.mention}",
                ephemeral=True,
            )
            return

        # The current number of stars on this board becomes the new star's index.
        count = await Star.count_documents(q(guild=ctx.guild.id, starboard=starboard.id))
        content = message.content

        image_url = self._first_image_url(message.attachments)
        if not content and image_url:
            # Image-only message: use a zero-width space as the description.
            content = "\u200b"

        embed = build_embed(
            title=f"[#{count}] Click Here to view context",
            description=content,
            fields=[],
            url=message.jump_url,
            timestamp=message.created_at,
        )
        embed.set_author(
            name=message.author.display_name,
            url=message.jump_url,
            icon_url=message.author.avatar.url,
        )
        embed.set_footer(text=ctx.guild.name + " | " + channel.name)
        if image_url:
            embed.set_image(url=image_url)

        star = await starboard.send(embed=embed)

        await Star(
            index=count,
            message=message.id,
            channel=channel.id,
            guild=ctx.guild.id,
            starboard=starboard.id,
            admin=ctx.author.id,
            star=star.id,
            active=True,
        ).commit()

        # Disable the picker so it cannot be used again.
        components[0].components[0].disabled = True

        await com_ctx.context.edit_origin(
            content=f"Message saved to Starboard.\nSee it in {starboard.mention}",
            components=components,
        )

    @context_menu(name="Star Message", context_type=CommandTypes.MESSAGE)
    @check(admin_or_permissions(Permissions.MANAGE_GUILD))
    async def _star_message(self, ctx: InteractionContext) -> None:
        """Star the message the context menu was invoked on (looked up in ``ctx.channel``)."""
        await self._star_add(ctx, message=str(ctx.target_id))

    star = SlashCommand(
        name="star",
        description="Manage stars",
    )

    @star.subcommand(
        sub_cmd_name="add",
        sub_cmd_description="Star a message",
    )
    @slash_option(
        name="message", description="Message to star", opt_type=OptionTypes.STRING, required=True
    )
    @slash_option(
        name="channel",
        description="Channel that has the message, not required if used in same channel",
        opt_type=OptionTypes.CHANNEL,
        required=False,
    )
    @check(admin_or_permissions(Permissions.MANAGE_GUILD))
    async def _star_message_slash(
        self, ctx: InteractionContext, message: str, channel: GuildText = None
    ) -> None:
        """Slash-command entry point for starring a message by ID or URL.

        ``channel`` is an optional option, so it defaults to ``None`` and
        ``_star_add`` falls back to the channel the command was used in.
        """
        await self._star_add(ctx, message, channel)

    @star.subcommand(sub_cmd_name="delete", sub_cmd_description="Delete a starred message")
    @slash_option(
        name="id", description="Star ID to delete", opt_type=OptionTypes.INTEGER, required=True
    )
    @slash_option(
        name="starboard",
        description="Starboard to delete star from",
        opt_type=OptionTypes.CHANNEL,
        required=True,
    )
    @check(admin_or_permissions(Permissions.MANAGE_GUILD))
    async def _star_delete(
        self,
        ctx: InteractionContext,
        id: int,
        starboard: GuildText,
    ) -> None:
        """Delete the active star with index ``id`` from ``starboard``.

        Removes the starboard's copy of the message when it can still be
        fetched, then deletes the ``Star`` document. The parameter is named
        ``id`` because it must match the slash option name.
        """
        if not isinstance(starboard, GuildText):
            await ctx.send("Channel must be a GuildText channel", ephemeral=True)
            return

        exists = await Starboard.find_one(q(channel=starboard.id, guild=ctx.guild.id))
        if not exists:
            # TODO: automagically create starboard
            await ctx.send(
                f"Starboard does not exist in {starboard.mention}. Please create it first",
                ephemeral=True,
            )
            return

        star = await Star.find_one(
            q(
                starboard=starboard.id,
                index=id,
                guild=ctx.guild.id,
                active=True,
            )
        )
        if not star:
            await ctx.send(f"No star exists with id {id}", ephemeral=True)
            return

        # star.star holds the ID of the message posted in the starboard channel.
        message = await starboard.fetch_message(star.star)
        if message:
            await message.delete()

        await star.delete()

        await ctx.send(f"Star {id} deleted from {starboard.mention}")
|
|
|
|
|
|
def setup(bot: Client) -> None:
    """Extension entry point: construct a StarboardCog for ``bot``.

    The instance is intentionally not kept or returned.
    NOTE(review): registration with the client presumably happens inside
    naff's ``Extension`` construction -- confirm.
    """
    StarboardCog(bot)
|