Merge branch 'dev'

This commit is contained in:
Zeva Rose 2022-10-17 00:59:19 +00:00
commit cc5456457b
4 changed files with 365 additions and 73 deletions

View file

@ -102,6 +102,7 @@ class ErrorMixin:
)
await ctx.send("Whoops! Encountered an error. The error has been logged.", ephemeral=True)
try:
await ctx.defer(ephemeral=True)
return await super().on_command_error(ctx, error, *args, **kwargs)
except Exception as e:
self.logger.error("Uncaught exception", exc_info=e)

263
jarvis/cogs/ltx.py Normal file
View file

@ -0,0 +1,263 @@
"""JARVIS LTX commands for dbrand dishpits."""
from typing import Dict, List
from dateparser import parse
from dateparser_data.settings import default_parsers
from jarvis_core.db import q
from jarvis_core.db.models import Event
from naff import Client, Extension, InteractionContext
from naff.models.discord.embed import EmbedField
from naff.models.discord.user import Member
from naff.models.naff.application_commands import (
OptionTypes,
SlashCommand,
slash_option,
)
from jarvis.utils import build_embed
dipshit_id = 552574845078994967
class EventCog(Extension):
def __init__(self, bot: Client):
self.bot = bot
self.cache: Dict[int, List[str]] = {}
self.add_ext_check(self.is_dipshit)
async def is_dipshit(self, ctx: InteractionContext) -> bool:
"""Checks if author is bot owner."""
return dipshit_id in ctx.author._role_ids
ltx = SlashCommand(name="ltx", description="LTX Meetup management", scopes=[520021794380447745])
@ltx.subcommand(sub_cmd_name="register", sub_cmd_description="Register for LTX")
@slash_option(name="going", description="Are you going?", opt_type=OptionTypes.BOOLEAN)
async def _ltx_register(self, ctx: InteractionContext, going: bool) -> None:
event = await Event.find_one(q(user=ctx.author.id, event="ltx"))
if not event:
event = Event(user=ctx.author.id, going=going)
event.going = going
msg = "going" if going else "not going"
await ctx.send(f"Registration updated! You are now {msg}", ephemeral=True)
@ltx.subcommand(sub_cmd_name="show", sub_cmd_description="Show registration info")
@slash_option(
name="user", description="User to show", opt_type=OptionTypes.USER, required=False
)
async def _ltx_show(self, ctx: InteractionContext, user: Member) -> None:
user = user or ctx.author
event = await Event.find_one(q(user=user.id, event="ltx"))
if not event:
await ctx.send("That user hasn't registered", ephemeral=False)
return
if not event.going:
await ctx.send("That user isn't going", ephemeral=False)
return
fields = []
if event.travel_method == "flying":
before_flight = "N/A"
if event.before_flight:
dts = int(event.before_departure_time.timestamp())
ats = int(event.before_arrival_time.timestamp())
before_flight = f"🛫 {event.before_flight} 🛬\n<t:{dts}:f> -> <t:{ats}:f>"
after_flight = "N/A"
if event.after_flight:
dts = int(event.after_departure_time.timestamp())
ats = int(event.after_arrival_time.timestamp())
after_flight = f"🛫 {event.after_flight} 🛬\n<t:{dts}:f> -> <t:{ats}:f>"
fields += [
EmbedField(name="Before LTX flight", value=before_flight),
EmbedField(name="After LTX flight", value=after_flight),
]
fields.append(EmbedField(name="Hotel", value=event.hotel or "N/A"))
embed = build_embed(title="Your LTX Details", description=None, fields=fields)
embed.set_author(name=user.display_name, icon_url=user.display_avatar.url)
embed.set_footer(text="LTX Dates: July 29-30, 2023")
await ctx.send(embeds=embed)
@ltx.subcommand(sub_cmd_name="before_flight", sub_cmd_description="Update pre-LTX flight info")
@slash_option(
name="departure", description="Departure Time", opt_type=OptionTypes.STRING, required=True
)
@slash_option(
name="arrival", description="Arrival Time", opt_type=OptionTypes.STRING, required=True
)
@slash_option(
name="from_airport",
description="Departure Airport",
opt_type=OptionTypes.STRING,
required=True,
)
@slash_option(
name="to_airport", description="Arrival Airport", opt_type=OptionTypes.STRING, required=True
)
@slash_option(
name="flight", description="Flight Number", opt_type=OptionTypes.STRING, required=True
)
async def _ltx_before_flight(
self,
ctx: InteractionContext,
departure: str,
arrival: str,
from_airport: str,
to_airport: str,
flight: str,
) -> None:
event = await Event.find_one(q(user=ctx.author.id, event="ltx"))
if not event:
event = Event(user=ctx.author.id, event="ltx", going=True)
base_settings = {
"PREFER_DATES_FROM": "future",
"TIMEZONE": "UTC",
"RETURN_AS_TIMEZONE_AWARE": True,
}
rt_settings = base_settings.copy()
rt_settings["PARSERS"] = [
x for x in default_parsers if x not in ["absolute-time", "timestamp"]
]
rt_depart_at = parse(departure, settings=rt_settings)
rt_arrive_at = parse(arrival, settings=rt_settings)
at_settings = base_settings.copy()
at_settings["PARSERS"] = [x for x in default_parsers if x != "relative-time"]
at_depart_at = parse(departure, settings=at_settings)
at_arrive_at = parse(arrival, settings=at_settings)
if rt_depart_at:
departure = rt_depart_at
elif at_depart_at:
departure = at_depart_at
else:
await ctx.send("Invalid departure time", ephemeral=True)
return
if rt_arrive_at:
arrival = rt_arrive_at
elif at_arrive_at:
arrival = at_arrive_at
else:
await ctx.send("Invalid arrival time", ephemeral=True)
return
event.before_arrival_time = arrival
event.before_departure_time = departure
event.before_flight = f"{from_airport} -> {to_airport} {flight}"
dts = int(departure.timestamp())
ats = int(arrival.timestamp())
fields = (
EmbedField(name="Departure", value=f"<t:{dts}:F> (<t:{dts}:R>)"),
EmbedField(name="Arrival", value=f"<t:{ats}:F> (<t:{ats}:R>)"),
)
embed = build_embed(
title="Your Pre-LTX Flight Information",
description=f"🛫 {from_airport} -> {to_airport} 🛬",
fields=fields,
)
embed.set_author(name=ctx.author.display_name, icon_url=ctx.author.display_avatar.url)
await ctx.send(embeds=embed)
@ltx.subcommand(sub_cmd_name="after_flight", sub_cmd_description="Update post-LTX flight info")
@slash_option(
name="departure", description="Departure Time", opt_type=OptionTypes.STRING, required=True
)
@slash_option(
name="arrival", description="Arrival Time", opt_type=OptionTypes.STRING, required=True
)
@slash_option(
name="from_airport",
description="Departure Airport",
opt_type=OptionTypes.STRING,
required=True,
)
@slash_option(
name="to_airport", description="Arrival Airport", opt_type=OptionTypes.STRING, required=True
)
@slash_option(
name="flight", description="Flight Number", opt_type=OptionTypes.STRING, required=True
)
async def _ltx_after_flight(
self,
ctx: InteractionContext,
departure: str,
arrival: str,
from_airport: str,
to_airport: str,
flight: str,
) -> None:
event = await Event.find_one(q(user=ctx.author.id, event="ltx"))
if not event:
event = Event(user=ctx.author.id, event="ltx", going=True)
base_settings = {
"PREFER_DATES_FROM": "future",
"TIMEZONE": "UTC",
"RETURN_AS_TIMEZONE_AWARE": True,
}
rt_settings = base_settings.copy()
rt_settings["PARSERS"] = [
x for x in default_parsers if x not in ["absolute-time", "timestamp"]
]
rt_depart_at = parse(departure, settings=rt_settings)
rt_arrive_at = parse(arrival, settings=rt_settings)
at_settings = base_settings.copy()
at_settings["PARSERS"] = [x for x in default_parsers if x != "relative-time"]
at_depart_at = parse(departure, settings=at_settings)
at_arrive_at = parse(arrival, settings=at_settings)
if rt_depart_at:
departure = rt_depart_at
elif at_depart_at:
departure = at_depart_at
else:
await ctx.send("Invalid departure time", ephemeral=True)
return
if rt_arrive_at:
arrival = rt_arrive_at
elif at_arrive_at:
arrival = at_arrive_at
else:
await ctx.send("Invalid arrival time", ephemeral=True)
return
event.after_arrival_time = arrival
event.after_departure_time = departure
event.after_flight = f"{from_airport} -> {to_airport} {flight}"
dts = int(departure.timestamp())
ats = int(arrival.timestamp())
fields = (
EmbedField(name="Departure", value=f"<t:{dts}:F> (<t:{dts}:R>)"),
EmbedField(name="Arrival", value=f"<t:{ats}:F> (<t:{ats}:R>)"),
)
embed = build_embed(
title="Your Post-LTX Flight Information",
description=f"🛫 {from_airport} -> {to_airport} 🛬",
fields=fields,
)
embed.set_author(name=ctx.author.display_name, icon_url=ctx.author.display_avatar.url)
await ctx.send(embeds=embed)

View file

@ -1,10 +1,18 @@
"""JARVIS Role Giver Cog."""
import asyncio
import logging
from typing import Dict
from jarvis_core.db import q
from jarvis_core.db.models import Rolegiver
from naff import Client, Extension, InteractionContext, Permissions
from naff import (
AutocompleteContext,
Client,
Extension,
InteractionContext,
Permissions,
listen,
)
from naff.client.utils.misc_utils import get
from naff.models.discord.components import ActionRow, Button, Select, SelectOption
from naff.models.discord.embed import EmbedField
@ -17,6 +25,7 @@ from naff.models.naff.application_commands import (
)
from naff.models.naff.command import check, cooldown
from naff.models.naff.cooldowns import Buckets
from thefuzz import process
from jarvis.utils import build_embed
from jarvis.utils.permissions import admin_or_permissions
@ -28,6 +37,24 @@ class RolegiverCog(Extension):
def __init__(self, bot: Client):
self.bot = bot
self.logger = logging.getLogger(__name__)
self.cache: Dict[int, Dict[str, int]] = {}
@listen()
async def on_ready(self) -> None:
"""NAFF on_ready hook for loading cache."""
all_rolegivers = await Rolegiver.find({}).to_list(None)
for rolegiver in all_rolegivers:
guild = await self.bot.fetch_guild(rolegiver.guild)
if not guild:
await rolegiver.delete()
continue
role = await guild.fetch_role(rolegiver.role)
if not role:
await rolegiver.delete()
continue
if guild.id not in self.cache:
self.cache[guild.id] = {}
self.cache[guild.id][role.name] = role.id
rolegiver = SlashCommand(name="rolegiver", description="Allow users to choose their own roles")
@ -59,10 +86,6 @@ class RolegiverCog(Extension):
setting.roles = setting.roles or []
if len(setting.roles) >= 20:
await ctx.send("You can only have 20 roles in the rolegiver", ephemeral=True)
return
setting.roles.append(role.id)
await setting.commit()
@ -97,87 +120,68 @@ class RolegiverCog(Extension):
)
await ctx.send(embeds=embed, components=components)
if ctx.guild.id not in self.cache:
self.cache[ctx.guild.id] = {}
self.cache[ctx.guild.id][role.name] = role.id
@rolegiver.subcommand(sub_cmd_name="remove", sub_cmd_description="Remove a role from rolegiver")
@slash_option(
name="role",
description="Name of role to add",
opt_type=OptionTypes.STRING,
required=True,
autocomplete=True,
)
@check(admin_or_permissions(Permissions.MANAGE_GUILD))
async def _rolegiver_remove(self, ctx: InteractionContext) -> None:
async def _rolegiver_remove(self, ctx: InteractionContext, role: str) -> None:
setting = await Rolegiver.find_one(q(guild=ctx.guild.id))
if not setting or (setting and not setting.roles):
await ctx.send("Rolegiver has no roles", ephemeral=True)
return
options = []
for role in setting.roles:
role: Role = await ctx.guild.fetch_role(role)
option = SelectOption(label=role.name, value=str(role.id))
options.append(option)
cache = self.cache.get(ctx.guild.id)
if cache:
role_id = cache.get(role)
else:
await ctx.send("Something went wrong, please try a different role", ephemeral=True)
return
select = Select(
options=options,
custom_id="to_delete",
placeholder="Select roles to remove",
min_values=1,
max_values=len(options),
setting.value.remove(role_id)
await setting.commit()
role = await ctx.guild.fetch_role(role_id)
if not role:
await ctx.send("Role not found in guild", ephemeral=True)
remaining = []
to_remove = []
for id_ in setting.value:
if role := await ctx.guild.fetch_role(id_):
remaining.append(role)
else:
to_remove.append(id_)
setting.value = [x for x in setting.value if x not in to_remove]
await setting.commit()
fields = [
EmbedField(name="Removed Role", value=role.mention),
EmbedField(name="Remaining Role(s)", value="\n".join([x.mention for x in remaining])),
]
embed = build_embed(
title="Rolegiver Updated", description="Role removed from rolegiver", fields=fields
)
components = [ActionRow(select)]
message = await ctx.send(content="\u200b", components=components)
try:
context = await self.bot.wait_for_component(
check=lambda x: ctx.author.id == x.context.author.id,
messages=message,
timeout=60 * 1,
)
removed_roles = []
for to_delete in context.context.values:
role = await ctx.guild.fetch_role(to_delete)
if role:
removed_roles.append(role)
setting.roles.remove(int(to_delete))
await setting.commit()
embed.set_thumbnail(url=ctx.guild.icon.url)
for row in components:
for component in row.components:
component.disabled = True
embed.set_footer(text=f"{ctx.author.username}#{ctx.author.discriminator} | {ctx.author.id}")
roles = []
for role_id in setting.roles:
e_role = await ctx.guild.fetch_role(role_id)
if not e_role:
continue
roles.append(e_role)
await ctx.send(
embeds=embed,
)
if roles:
roles.sort(key=lambda x: -x.position)
value = "\n".join([r.mention for r in roles]) if roles else "None"
rvalue = "\n".join([r.mention for r in removed_roles]) if removed_roles else "None"
fields = [
EmbedField(name="Removed Role(s)", value=rvalue),
EmbedField(name="Remaining Role(s)", value=value),
]
embed = build_embed(
title="Rolegiver Updated",
description="Role removed from rolegiver",
fields=fields,
)
embed.set_thumbnail(url=ctx.guild.icon.url)
embed.set_footer(
text=f"{ctx.author.username}#{ctx.author.discriminator} | {ctx.author.id}"
)
await context.context.edit_origin(
content=f"Removed {len(context.context.values)} role(s)",
embeds=embed,
components=components,
)
except asyncio.TimeoutError:
for row in components:
for component in row.components:
component.disabled = True
await message.edit(components=components)
if ctx.guild.id in self.cache:
self.cache[ctx.guild.id].pop(role.name)
@rolegiver.subcommand(sub_cmd_name="list", sub_cmd_description="List rolegiver roles")
async def _rolegiver_list(self, ctx: InteractionContext) -> None:
@ -382,6 +386,23 @@ class RolegiverCog(Extension):
await setting.commit()
await ctx.send("Rolegiver cleanup finished")
self.cache.pop(ctx.guild.id, None)
@_rolegiver_remove.autocomplete("role")
async def _autocomplete(self, ctx: AutocompleteContext, role: str) -> None:
if not self.cache.get(ctx.guild.id):
rolegivers = await Rolegiver.find(q(guild=ctx.guild.id)).to_list(None)
for rolegiver in rolegivers:
role = await ctx.guild.fetch_role(rolegiver.role)
if not role:
await rolegiver.delete()
continue
if ctx.guild.id not in self.cache:
self.cache[ctx.guild.id] = {}
self.cache[ctx.guild.id][role.name] = role.id
results = process.extract(role, self.cache.get(ctx.guild.id).keys(), limit=25)
choices = [{"name": r[0], "value": r[0]} for r in results]
await ctx.send(choices=choices)
def setup(bot: Client) -> None:

View file

@ -52,6 +52,13 @@ class UtilCog(Extension):
bot = SlashCommand(name="bot", description="Bot commands")
@bot.subcommand(sub_cmd_name="sex", sub_cmd_description="Have sex with JARVIS")
async def _sex(self, ctx: InteractionContext) -> None:
if ctx.author.id == 264072583987593217:
await ctx.send("Oh fuck no, go fuck yourself")
else:
await ctx.send("Not at this time, thank you for offering")
@bot.subcommand(sub_cmd_name="status", sub_cmd_description="Retrieve JARVIS status")
@cooldown(bucket=Buckets.CHANNEL, rate=1, interval=30)
async def _status(self, ctx: InteractionContext) -> None: