Added pagination features to /warnings, /bans, added /ctc2 guesses
This commit is contained in:
parent
b2753cdd04
commit
a97f474525
4 changed files with 371 additions and 70 deletions
|
@ -36,7 +36,7 @@ jarvis = commands.Bot(
|
|||
)
|
||||
slash = SlashCommand(jarvis, sync_commands=True, sync_on_cog_reload=True)
|
||||
jarvis_self = Process()
|
||||
__version__ = "1.3.0"
|
||||
__version__ = "1.4.0"
|
||||
|
||||
|
||||
db = DBManager(get_config().mongo).mongo
|
||||
|
|
|
@ -1,12 +1,15 @@
|
|||
import re
|
||||
from datetime import datetime
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Union
|
||||
|
||||
import pymongo
|
||||
from ButtonPaginator import Paginator
|
||||
from discord import Member, Role, TextChannel, User, VoiceChannel
|
||||
from discord.ext import commands
|
||||
from discord.ext.tasks import loop
|
||||
from discord.utils import find, get
|
||||
from discord_slash import SlashContext, cog_ext
|
||||
from discord_slash.model import ButtonStyle
|
||||
from discord_slash.utils.manage_commands import create_choice, create_option
|
||||
|
||||
import jarvis
|
||||
|
@ -16,6 +19,7 @@ from jarvis.db.types import (
|
|||
Ban,
|
||||
Kick,
|
||||
Lock,
|
||||
MongoSort,
|
||||
Mute,
|
||||
Purge,
|
||||
Setting,
|
||||
|
@ -38,6 +42,19 @@ class AdminCog(commands.Cog):
|
|||
self.bot = bot
|
||||
config = jarvis.config.get_config()
|
||||
self.db = DBManager(config.mongo).mongo.jarvis
|
||||
self.cache = {}
|
||||
self._expire_interaction.start()
|
||||
|
||||
def check_cache(self, ctx: SlashContext, **kwargs):
|
||||
if not kwargs:
|
||||
kwargs = {}
|
||||
return find(
|
||||
lambda x: x["command"] == ctx.subcommand_name
|
||||
and x["user"] == ctx.author.id
|
||||
and x["guild"] == ctx.guild.id
|
||||
and all(x[k] == v for k, v in kwargs.items()),
|
||||
self.cache.values(),
|
||||
)
|
||||
|
||||
@cog_ext.cog_slash(
|
||||
name="ban",
|
||||
|
@ -316,6 +333,15 @@ class AdminCog(commands.Cog):
|
|||
async def _bans_list(
|
||||
self, ctx: SlashContext, type: int = 0, active: int = 1
|
||||
):
|
||||
exists = self.check_cache(ctx, type=type)
|
||||
if exists:
|
||||
await ctx.defer(hidden=True)
|
||||
await ctx.send(
|
||||
"Please use existing interaction: "
|
||||
+ f"{exists['paginator']._message.jump_url}",
|
||||
hidden=True,
|
||||
)
|
||||
return
|
||||
active = bool(active)
|
||||
types = [0, "perm", "temp", "soft"]
|
||||
search = {"guild": ctx.guild.id}
|
||||
|
@ -325,36 +351,85 @@ class AdminCog(commands.Cog):
|
|||
search["type"] = types[type]
|
||||
bans = Ban.get_many(**search)
|
||||
bans.sort(key=lambda x: x.created_at, reverse=True)
|
||||
ban_messages = []
|
||||
db_bans = []
|
||||
fields = []
|
||||
for ban in bans:
|
||||
if not ban.username:
|
||||
user = await self.bot.fetch_user(ban.user)
|
||||
ban.username = user.name if user else "[deleted user]"
|
||||
ban_messages.append(
|
||||
"[{0}] {1} ({2}): {3}".format(
|
||||
ban.created_at.strftime("%d-%m-%Y"),
|
||||
ban.username,
|
||||
ban.user,
|
||||
ban.reason,
|
||||
fields.append(
|
||||
Field(
|
||||
name=f"Username: {ban.username}#{ban.discrim}",
|
||||
value="Date: {ban.created_at.strftime('%d-%m-%Y')}\n"
|
||||
+ f"User ID: {ban.user}\n"
|
||||
+ f"Reason: {ban.reason}\n"
|
||||
+ f"Type: {ban.type}\n\u200b",
|
||||
inline=False,
|
||||
)
|
||||
)
|
||||
db_bans.append(ban.user)
|
||||
if type == 0:
|
||||
bans = await ctx.guild.bans()
|
||||
for ban in bans:
|
||||
if ban.user.id not in db_bans:
|
||||
ban_messages.append(
|
||||
"[unknown] {0} ({1}): {2}".format(
|
||||
ban.user.name, ban.user.id, ban.reason
|
||||
fields.append(
|
||||
Field(
|
||||
name=f"Username: {ban.user.name}#"
|
||||
+ f"{ban.user.discriminator}",
|
||||
value="Date: [unknown]\n"
|
||||
+ f"User ID: {ban.user.id}\n"
|
||||
+ f"Reason: {ban.reason}\n"
|
||||
+ "Type: manual\n\u200b",
|
||||
inline=False,
|
||||
)
|
||||
)
|
||||
message = ""
|
||||
if len(ban_messages) == 0:
|
||||
message = "No bans matched the criteria."
|
||||
|
||||
pages = []
|
||||
title = "Active " if active else "Inactive "
|
||||
if type > 0:
|
||||
title += types[type]
|
||||
if type == 1:
|
||||
title += "a"
|
||||
title += "bans"
|
||||
if len(fields) == 0:
|
||||
embed = build_embed(
|
||||
title=title,
|
||||
description=f"No {'in' if not active else ''}active bans",
|
||||
fields=[],
|
||||
)
|
||||
embed.set_thumbnail(url=ctx.guild.icon_url)
|
||||
pages.append(embed)
|
||||
else:
|
||||
message = "Active " if active else "Inactive "
|
||||
message += "Bans:\n```\n" + "\n".join(ban_messages) + "\n```"
|
||||
await ctx.send(message)
|
||||
for i in range(0, len(bans), 5):
|
||||
embed = build_embed(
|
||||
title=title, description="", fields=fields[i : i + 5]
|
||||
)
|
||||
embed.set_thumbnail(url=ctx.guild.icon_url)
|
||||
pages.append(embed)
|
||||
|
||||
paginator = Paginator(
|
||||
bot=self.bot,
|
||||
ctx=ctx,
|
||||
embeds=pages,
|
||||
only=ctx.author,
|
||||
timeout=60 * 5, # 5 minute timeout
|
||||
disable_after_timeout=True,
|
||||
use_extend=len(pages) > 2,
|
||||
left_button_style=ButtonStyle.grey,
|
||||
right_button_style=ButtonStyle.grey,
|
||||
basic_buttons=["◀", "▶"],
|
||||
)
|
||||
|
||||
self.cache[hash(paginator)] = {
|
||||
"guild": ctx.guild.id,
|
||||
"user": ctx.author.id,
|
||||
"timeout": datetime.utcnow() + timedelta(minutes=5),
|
||||
"command": ctx.subcommand_name,
|
||||
"type": type,
|
||||
"paginator": paginator,
|
||||
}
|
||||
|
||||
await paginator.start()
|
||||
|
||||
@cog_ext.cog_slash(
|
||||
name="kick",
|
||||
|
@ -822,41 +897,123 @@ class AdminCog(commands.Cog):
|
|||
option_type=6,
|
||||
required=True,
|
||||
),
|
||||
create_option(
|
||||
name="active",
|
||||
description="View only active",
|
||||
option_type=4,
|
||||
required=False,
|
||||
choices=[
|
||||
create_choice(name="Yes", value=1),
|
||||
create_choice(name="No", value=0),
|
||||
],
|
||||
),
|
||||
],
|
||||
)
|
||||
@commands.has_permissions(administrator=True)
|
||||
async def _warnings(self, ctx: SlashContext, user: User):
|
||||
async def _warnings(self, ctx: SlashContext, user: User, active: bool = 1):
|
||||
active = bool(active)
|
||||
exists = self.check_cache(ctx, user_id=user.id, active=active)
|
||||
if exists:
|
||||
await ctx.defer(hidden=True)
|
||||
warnings = Warning.get_many(user=user.id, guild=ctx.guild.id)
|
||||
active = [
|
||||
f'`{y.created_at.strftime("%Y-%m-%d %H:%M:%S")}` - {y.reason}'
|
||||
for y in list(filter(lambda x: x.active, warnings))
|
||||
]
|
||||
await ctx.send(
|
||||
"Please use existing interaction: "
|
||||
+ f"{exists['paginator']._message.jump_url}",
|
||||
hidden=True,
|
||||
)
|
||||
return
|
||||
warnings = Warning.get_many(
|
||||
user=user.id,
|
||||
guild=ctx.guild.id,
|
||||
sort=MongoSort(direction="desc", key="created_at"),
|
||||
)
|
||||
active_warns = list(filter(lambda x: x.active, warnings))
|
||||
|
||||
n_active = len(active)
|
||||
if len(active) > 10:
|
||||
active = active[:10]
|
||||
active.append("\n>10 active, results truncated")
|
||||
elif len(active) == 0:
|
||||
active = ["No active warnings"]
|
||||
|
||||
total = len(warnings)
|
||||
fields = [
|
||||
Field(f"{n_active} Active", "\n".join(active), False),
|
||||
Field("Total", total),
|
||||
]
|
||||
pages = []
|
||||
if active:
|
||||
if len(active_warns) == 0:
|
||||
embed = build_embed(
|
||||
title="Warnings",
|
||||
description=f"{user.mention} active and total warnings",
|
||||
fields=fields,
|
||||
description=f"{len(warnings)} total | 0 currently active",
|
||||
fields=[],
|
||||
)
|
||||
embed.set_author(name=user.name, icon_url=user.avatar_url)
|
||||
embed.set_thumbnail(url=ctx.guild.icon_url)
|
||||
pages.append(embed)
|
||||
else:
|
||||
fields = []
|
||||
for warn in active_warns:
|
||||
fields.append(
|
||||
Field(
|
||||
name=warn.created_at.strftime(
|
||||
"%Y-%m-%d %H:%M:%S UTC"
|
||||
),
|
||||
value=warn.reason + "\n\u200b",
|
||||
inline=False,
|
||||
)
|
||||
)
|
||||
for i in range(0, len(fields), 5):
|
||||
embed = build_embed(
|
||||
title="Warnings",
|
||||
description=f"{len(warnings)} total | "
|
||||
+ f"{len(active_warns)} currently active",
|
||||
fields=fields[i : i + 5],
|
||||
)
|
||||
embed.set_author(
|
||||
name=user.nick if user.nick else user.name,
|
||||
name=user.name + "#" + user.discriminator,
|
||||
icon_url=user.avatar_url,
|
||||
)
|
||||
embed.set_footer(text=f"{user.name}#{user.discriminator} | {user.id}")
|
||||
embed.set_thumbnail(url=user.avatar_url)
|
||||
await ctx.send(embed=embed)
|
||||
embed.set_thumbnail(url=ctx.guild.icon_url)
|
||||
pages.append(embed)
|
||||
else:
|
||||
fields = []
|
||||
for warn in warnings:
|
||||
title = "[A] " if warn.active else "[I] "
|
||||
title += warn.created_at.strftime("%Y-%m-%d %H:%M:%S UTC")
|
||||
fields.append(
|
||||
Field(
|
||||
name=title,
|
||||
value=warn.reason + "\n\u200b",
|
||||
inline=False,
|
||||
)
|
||||
)
|
||||
for i in range(0, len(fields), 5):
|
||||
embed = build_embed(
|
||||
title="Warnings",
|
||||
description=f"{len(warnings)} total | "
|
||||
+ f"{len(active_warns)} currently active",
|
||||
fields=fields[i : i + 5],
|
||||
)
|
||||
embed.set_author(
|
||||
name=user.name + "#" + user.discriminator,
|
||||
icon_url=user.avatar_url,
|
||||
)
|
||||
embed.set_thumbnail(url=ctx.guild.icon_url)
|
||||
pages.append(embed)
|
||||
|
||||
paginator = Paginator(
|
||||
bot=self.bot,
|
||||
ctx=ctx,
|
||||
embeds=pages,
|
||||
only=ctx.author,
|
||||
timeout=60 * 5, # 5 minute timeout
|
||||
disable_after_timeout=True,
|
||||
use_extend=len(pages) > 2,
|
||||
left_button_style=ButtonStyle.grey,
|
||||
right_button_style=ButtonStyle.grey,
|
||||
basic_buttons=["◀", "▶"],
|
||||
)
|
||||
|
||||
self.cache[hash(paginator)] = {
|
||||
"guild": ctx.guild.id,
|
||||
"user": ctx.author.id,
|
||||
"timeout": datetime.utcnow() + timedelta(minutes=5),
|
||||
"command": ctx.subcommand_name,
|
||||
"user_id": user.id,
|
||||
"active": active,
|
||||
"paginator": paginator,
|
||||
}
|
||||
|
||||
await paginator.start()
|
||||
|
||||
@cog_ext.cog_subcommand(
|
||||
base="roleping",
|
||||
|
@ -1041,6 +1198,15 @@ class AdminCog(commands.Cog):
|
|||
f"Autopurge delay updated to {delay} seconds on {channel.mention}."
|
||||
)
|
||||
|
||||
@loop(minutes=1)
|
||||
async def _expire_interaction(self):
|
||||
keys = list(self.cache.keys())
|
||||
for key in keys:
|
||||
if self.cache[key]["timeout"] <= datetime.utcnow() + timedelta(
|
||||
minutes=1
|
||||
):
|
||||
del self.cache[key]
|
||||
|
||||
|
||||
def setup(bot):
|
||||
bot.add_cog(AdminCog(bot))
|
||||
|
|
|
@ -1,12 +1,29 @@
|
|||
import re
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
import aiohttp
|
||||
import pymongo
|
||||
from ButtonPaginator import Paginator
|
||||
from discord import Member, User
|
||||
from discord.ext import commands
|
||||
from discord_slash import cog_ext
|
||||
from discord.ext.tasks import loop
|
||||
from discord.utils import find
|
||||
from discord_slash import SlashContext, cog_ext
|
||||
from discord_slash.model import ButtonStyle
|
||||
|
||||
from jarvis.config import get_config
|
||||
from jarvis.db import DBManager
|
||||
from jarvis.utils import build_embed
|
||||
from jarvis.utils.field import Field
|
||||
|
||||
guild_ids = [578757004059738142, 520021794380447745, 862402786116763668]
|
||||
|
||||
valid = re.compile(r"[\w\s\-\\/.!@#$%^*()+=<>,\u0080-\U000E0FFF]*")
|
||||
invites = re.compile(
|
||||
r"(?:https?://)?(?:www.)?(?:discord.(?:gg|io|me|li)|discord(?:app)?.com/invite)/([^\s/]+?)(?=\b)",
|
||||
flags=re.IGNORECASE,
|
||||
)
|
||||
|
||||
|
||||
class CTCCog(commands.Cog):
|
||||
def __init__(self, bot):
|
||||
|
@ -15,6 +32,19 @@ class CTCCog(commands.Cog):
|
|||
self.db = DBManager(mconf).mongo
|
||||
self._session = aiohttp.ClientSession()
|
||||
self.url = "https://completethecodetwo.cards/pw"
|
||||
self.cache = {}
|
||||
self._expire_interaction.start()
|
||||
|
||||
def check_cache(self, ctx: SlashContext, **kwargs):
|
||||
if not kwargs:
|
||||
kwargs = {}
|
||||
return find(
|
||||
lambda x: x["command"] == ctx.subcommand_name
|
||||
and x["user"] == ctx.author.id
|
||||
and x["guild"] == ctx.guild.id
|
||||
and all(x[k] == v for k, v in kwargs.items()),
|
||||
self.cache.values(),
|
||||
)
|
||||
|
||||
@cog_ext.cog_subcommand(
|
||||
base="ctc2",
|
||||
|
@ -33,23 +63,129 @@ class CTCCog(commands.Cog):
|
|||
guild_ids=guild_ids,
|
||||
)
|
||||
@commands.cooldown(1, 2, commands.BucketType.user)
|
||||
async def _pw(self, ctx, guess: str):
|
||||
async def _pw(self, ctx: SlashContext, guess: str):
|
||||
if len(guess) > 800:
|
||||
await ctx.send(
|
||||
"Listen here, dipshit. Don't be like "
|
||||
+ "<@256110768724901889>. Make your guesses < 800 characters.",
|
||||
hidden=True,
|
||||
)
|
||||
return
|
||||
elif not valid.fullmatch(guess):
|
||||
await ctx.send(
|
||||
"Listen here, dipshit. Don't be like "
|
||||
+ "<@256110768724901889>. Make your guesses *readable*.",
|
||||
hidden=True,
|
||||
)
|
||||
return
|
||||
elif invites.search(guess):
|
||||
await ctx.send(
|
||||
"Listen here, dipshit. "
|
||||
+ "No using this to bypass sending invite links.",
|
||||
hidden=True,
|
||||
)
|
||||
return
|
||||
guessed = self.db.ctc2.guesses.find_one({"guess": guess})
|
||||
if guessed:
|
||||
await ctx.send("Already guessed, dipshit.", hidden=True)
|
||||
return
|
||||
result = await self._session.post(self.url, data=guess)
|
||||
message = ""
|
||||
correct = False
|
||||
if 200 <= result.status < 400:
|
||||
message = f"{ctx.author.mention} got it! Password is {guess}!"
|
||||
await ctx.send(
|
||||
f"{ctx.author.mention} got it! Password is {guess}!"
|
||||
)
|
||||
correct = True
|
||||
else:
|
||||
message = "Nope."
|
||||
await ctx.send("Nope.", hidden=True)
|
||||
self.db.ctc2.guesses.insert_one(
|
||||
{"guess": guess, "user": ctx.author.id, "correct": correct}
|
||||
)
|
||||
await ctx.send(message)
|
||||
|
||||
@cog_ext.cog_subcommand(
|
||||
base="ctc2",
|
||||
name="guesses",
|
||||
description="Show guesses made for https://completethecodetwo.cards",
|
||||
guild_ids=guild_ids,
|
||||
)
|
||||
@commands.cooldown(1, 2, commands.BucketType.user)
|
||||
async def _guesses(self, ctx: SlashContext):
|
||||
exists = self.check_cache(ctx)
|
||||
if exists:
|
||||
await ctx.defer(hidden=True)
|
||||
await ctx.send(
|
||||
"Please use existing interaction: "
|
||||
+ f"{exists['paginator']._message.jump_url}",
|
||||
hidden=True,
|
||||
)
|
||||
return
|
||||
guesses = self.db.ctc2.guesses.find().sort(
|
||||
[("correct", pymongo.DESCENDING), ("_id", pymongo.DESCENDING)]
|
||||
)
|
||||
fields = []
|
||||
for guess in guesses:
|
||||
user = ctx.guild.get_member(guess["user"])
|
||||
if not user:
|
||||
user = self.bot.fetch_user(guess["user"])
|
||||
if not user:
|
||||
user = "[redacted]"
|
||||
if isinstance(user, User) or isinstance(user, Member):
|
||||
user = user.name + "#" + user.discriminator
|
||||
name = "Correctly" if guess["correct"] else "Incorrectly"
|
||||
name += " guessed by: " + user
|
||||
fields.append(
|
||||
Field(
|
||||
name=name,
|
||||
value=guess["guess"] + "\n\u200b",
|
||||
inline=False,
|
||||
)
|
||||
)
|
||||
pages = []
|
||||
for i in range(0, len(fields), 5):
|
||||
embed = build_embed(
|
||||
title="completethecodetwo.cards guesses",
|
||||
description=f"{len(fields)} guesses so far",
|
||||
fields=fields[i : i + 5],
|
||||
url="https://completethecodetwo.cards",
|
||||
)
|
||||
embed.set_thumbnail(url="https://dev.zevaryx.com/db_logo.png")
|
||||
embed.set_footer(
|
||||
text="dbrand.com",
|
||||
icon_url="https://dev.zevaryx.com/db_logo.png",
|
||||
)
|
||||
pages.append(embed)
|
||||
|
||||
paginator = Paginator(
|
||||
bot=self.bot,
|
||||
ctx=ctx,
|
||||
embeds=pages,
|
||||
timeout=60 * 5, # 5 minute timeout
|
||||
only=ctx.author,
|
||||
disable_after_timeout=True,
|
||||
use_extend=len(pages) > 2,
|
||||
left_button_style=ButtonStyle.grey,
|
||||
right_button_style=ButtonStyle.grey,
|
||||
basic_buttons=["◀", "▶"],
|
||||
)
|
||||
|
||||
self.cache[hash(paginator)] = {
|
||||
"guild": ctx.guild.id,
|
||||
"user": ctx.author.id,
|
||||
"timeout": datetime.utcnow() + timedelta(minutes=5),
|
||||
"command": ctx.subcommand_name,
|
||||
"paginator": paginator,
|
||||
}
|
||||
|
||||
await paginator.start()
|
||||
|
||||
@loop(minutes=1)
|
||||
async def _expire_interaction(self):
|
||||
keys = list(self.cache.keys())
|
||||
for key in keys:
|
||||
if self.cache[key]["timeout"] <= datetime.utcnow() + timedelta(
|
||||
minutes=1
|
||||
):
|
||||
del self.cache[key]
|
||||
|
||||
|
||||
def setup(bot):
|
||||
|
|
|
@ -5,9 +5,8 @@ from ButtonPaginator import Paginator
|
|||
from discord.ext import commands
|
||||
from discord.ext.tasks import loop
|
||||
from discord.utils import find
|
||||
from discord_slash import ComponentContext, SlashContext, cog_ext
|
||||
from discord_slash import SlashContext, cog_ext
|
||||
from discord_slash.model import ButtonStyle
|
||||
from discord_slash.utils import manage_components
|
||||
from discord_slash.utils.manage_commands import create_choice, create_option
|
||||
|
||||
from jarvis.config import get_config
|
||||
|
@ -29,6 +28,16 @@ class GitlabCog(commands.Cog):
|
|||
self.cache = {}
|
||||
self._expire_interaction.start()
|
||||
|
||||
def check_cache(self, ctx: SlashContext, **kwargs):
|
||||
if not kwargs:
|
||||
kwargs = {}
|
||||
return find(
|
||||
lambda x: x["command"] == ctx.subcommand_name
|
||||
and x["user"] == ctx.author.id
|
||||
and all(x[k] == v for k, v in kwargs.items()),
|
||||
self.cache.values(),
|
||||
)
|
||||
|
||||
@cog_ext.cog_subcommand(
|
||||
base="gl",
|
||||
name="issue",
|
||||
|
@ -288,16 +297,6 @@ class GitlabCog(commands.Cog):
|
|||
)
|
||||
return embed
|
||||
|
||||
def check_cache(self, ctx: SlashContext, **kwargs):
|
||||
if not kwargs:
|
||||
kwargs = {}
|
||||
return find(
|
||||
lambda x: x["command"] == ctx.subcommand_name
|
||||
and x["user"] == ctx.author.id
|
||||
and all(x[k] == v for k, v in kwargs.items()),
|
||||
self.cache.values(),
|
||||
)
|
||||
|
||||
@cog_ext.cog_subcommand(
|
||||
base="gl",
|
||||
name="issues",
|
||||
|
|
Loading…
Add table
Reference in a new issue