Add roleping expansion, closes #38

This commit is contained in:
Zeva Rose 2021-07-26 19:10:49 -06:00
parent feda3036ab
commit e48c987bcd
3 changed files with 364 additions and 46 deletions

View file

@ -10,6 +10,7 @@ from psutil import Process
from jarvis import logo, tasks, utils
from jarvis.config import get_config
from jarvis.db import DBManager
from jarvis.events import guild, member, message
if asyncio.get_event_loop().is_closed():
asyncio.set_event_loop(asyncio.new_event_loop())
@ -70,6 +71,14 @@ def run(ctx=None):
jarvis.max_messages = config.max_messages
tasks.init()
# Add event listeners
listeners = [
guild.GuildEventHandler(jarvis),
member.MemberEventHandler(jarvis),
message.MessageEventHandler(jarvis),
]
jarvis.run(config.token, bot=True, reconnect=True)
for cog in jarvis.cogs:
session = getattr(cog, "_session", None)

View file

@ -1,71 +1,79 @@
from discord import Role
from datetime import datetime, timedelta
from ButtonPaginator import Paginator
from discord import Member, Role
from discord.ext import commands
from discord_slash import SlashContext, cog_ext
from discord_slash.model import ButtonStyle
from discord_slash.utils.manage_commands import create_option
from jarvis.db.types import Setting
from jarvis.db.types import Roleping
from jarvis.utils import build_embed
from jarvis.utils.cachecog import CacheCog
from jarvis.utils.field import Field
from jarvis.utils.permissions import admin_or_permissions
class RolepingCog(commands.Cog):
class RolepingCog(CacheCog):
def __init__(self, bot):
self.bot = bot
super().__init__(bot)
@cog_ext.cog_subcommand(
base="roleping",
name="block",
description="Add a role to the roleping blocklist",
name="add",
description="Add a role to roleping",
options=[
create_option(
name="role",
description="Role to add to blocklist",
description="Role to add to roleping",
option_type=8,
required=True,
)
],
)
@commands.has_permissions(administrator=True)
async def _roleping_block(self, ctx: SlashContext, role: Role):
roles = Setting.get(guild=ctx.guild.id, setting="roleping")
if not roles:
roles = Setting(guild=ctx.guild.id, setting="roleping", value=[])
@admin_or_permissions(manage_server=True)
async def _roleping_add(self, ctx: SlashContext, role: Role):
roleping = Roleping.get(guild=ctx.guild.id, role=role.id)
if not roleping:
roleping = Roleping(
role=role.id,
guild=ctx.guild.id,
admin=ctx.author.id,
active=True,
bypass={"roles": [], "users": []},
)
if role.id in roles.value:
else:
await ctx.send(
f"Role `{role.name}` already in blocklist.", hidden=True
f"Role `{role.name}` already in roleping.", hidden=True
)
return
roles.value.append(role.id)
roles.update()
await ctx.send(f"Role `{role.name}` added to blocklist.")
roleping.insert()
await ctx.send(f"Role `{role.name}` added to roleping.")
@cog_ext.cog_subcommand(
base="roleping",
name="allow",
description="Remove a role from the roleping blocklist",
name="remove",
description="Remove a role from the roleping",
options=[
create_option(
name="role",
description="Role to remove from blocklist",
description="Role to remove from roleping",
option_type=8,
required=True,
)
],
)
@commands.has_permissions(administrator=True)
async def _roleping_allow(self, ctx: SlashContext, role: Role):
roles = Setting.get(guild=ctx.guild.id, setting="roleping")
if not roles:
await ctx.send("No blocklist configured.", hidden=True)
@admin_or_permissions(manage_server=True)
async def _roleping_remove(self, ctx: SlashContext, role: Role):
roleping = Roleping.get(guild=ctx.guild.id, role=role.id)
if not roleping:
await ctx.send("Roleping does not exist", hidden=True)
return
if role.id not in roles.value:
await ctx.send(
f"Role `{role.name}` not in blocklist.", hidden=True
)
return
roles.value.remove(role.id)
roles.update()
await ctx.send(f"Role `{role.name}` removed blocklist.")
roleping.delete()
await ctx.send(f"Role `{role.name}` removed from roleping.")
@cog_ext.cog_subcommand(
base="roleping",
@ -73,19 +81,298 @@ class RolepingCog(commands.Cog):
description="List all blocklisted roles",
)
async def _roleping_list(self, ctx: SlashContext):
roles = Setting.get(guild=ctx.guild.id, setting="roleping")
if not roles:
await ctx.send("No blocklist configured.", hidden=True)
exists = self.check_cache(ctx)
if exists:
await ctx.defer(hidden=True)
await ctx.send(
"Please use existing interaction: "
+ f"{exists['paginator']._message.jump_url}",
hidden=True,
)
return
message = "Blocklisted Roles:\n```\n"
if not roles.value:
await ctx.send("No roles blocklisted.", hidden=True)
rolepings = Roleping.get_many(guild=ctx.guild.id)
if not rolepings:
await ctx.send("No rolepings configured", hidden=True)
return
for role in roles.value:
role = ctx.guild.get_role(role)
if not role:
continue
message += role.name + "\n"
message += "```"
await ctx.send(message)
embeds = []
for roleping in rolepings:
role = ctx.guild.get_role(roleping.role)
bypass_roles = list(
filter(
lambda x: x.id in roleping.bypass["roles"], ctx.guild.roles
)
)
bypass_roles = [
r.mention or "||`[redacted]`||" for r in bypass_roles
]
bypass_users = [
ctx.guild.get_member(u).mention or "||`[redacted]`||"
for u in roleping.bypass["users"]
]
bypass_roles = bypass_roles or ["None"]
bypass_users = bypass_users or ["None"]
embed = build_embed(
title="Roleping",
description=role.mention,
color=str(role.color),
fields=[
Field(
name="Created At",
value=roleping.created_at.strftime(
"%a, %b %d, %Y %I:%M %p"
),
inline=False,
),
Field(name="Active", value=str(roleping.active)),
Field(
name="Bypass Users",
value="\n".join(bypass_users),
),
Field(
name="Bypass Roles",
value="\n".join(bypass_roles),
),
],
)
admin = ctx.guild.get_member(roleping.admin)
if not admin:
admin = self.bot.user
embed.set_author(
name=admin.nick or admin.name, icon_url=admin.avatar_url
)
embed.set_footer(
text=f"{admin.name}#{admin.discriminator} | {admin.id}"
)
embeds.append(embed)
paginator = Paginator(
bot=self.bot,
ctx=ctx,
embeds=embeds,
only=ctx.author,
timeout=60 * 5, # 5 minute timeout
disable_after_timeout=True,
use_extend=len(embeds) > 2,
left_button_style=ButtonStyle.grey,
right_button_style=ButtonStyle.grey,
basic_buttons=["", ""],
)
self.cache[hash(paginator)] = {
"user": ctx.author.id,
"guild": ctx.guild.id,
"timeout": datetime.utcnow() + timedelta(minutes=5),
"command": ctx.subcommand_name,
"paginator": paginator,
}
await paginator.start()
@cog_ext.cog_subcommand(
base="roleping",
subcommand_group="bypass",
name="user",
description="Add a user as a bypass to a roleping",
base_desc="Block roles from being pinged",
sub_group_desc="Allow specific users/roles to ping rolepings",
options=[
create_option(
name="user",
description="User to add",
option_type=6,
required=True,
),
create_option(
name="rping",
description="Rolepinged role",
option_type=8,
required=True,
),
],
)
@admin_or_permissions(manage_server=True)
async def _roleping_bypass_user(
self, ctx: SlashContext, user: Member, rping: Role
):
roleping = Roleping.get(guild=ctx.guild.id, role=rping.id)
if not roleping:
await ctx.send(
f"Roleping not configured for {rping.mention}", hidden=True
)
return
if user.id in roleping.bypass["users"]:
await ctx.send(f"{user.mention} already in bypass", hidden=True)
return
if len(roleping.bypass["users"]) == 10:
await ctx.send(
"Already have 10 users in bypass. "
"Please consider using roles for roleping bypass",
hidden=True,
)
return
matching_role = list(
filter(lambda x: x.id in roleping.bypass["roles"], user.roles)
)
if matching_role:
await ctx.send(
f"{user.mention} already has bypass "
f"via {matching_role[0].mention}",
hidden=True,
)
return
roleping.bypass["users"].append(user.id)
roleping.update()
await ctx.send(
f"{user.nick or user.name} user bypass added for `{rping.name}`"
)
@cog_ext.cog_subcommand(
base="roleping",
subcommand_group="bypass",
name="role",
description="Add a role as a bypass to a roleping",
base_desc="Block roles from being pinged",
sub_group_desc="Allow specific users/roles to ping rolepings",
options=[
create_option(
name="role",
description="Role to add",
option_type=8,
required=True,
),
create_option(
name="rping",
description="Rolepinged role",
option_type=8,
required=True,
),
],
)
@admin_or_permissions(manage_server=True)
async def _roleping_bypass_role(
self, ctx: SlashContext, role: Role, rping: Role
):
roleping = Roleping.get(guild=ctx.guild.id, role=rping.id)
if not roleping:
await ctx.send(
f"Roleping not configured for {rping.mention}", hidden=True
)
return
if role.id in roleping.bypass["roles"]:
await ctx.send(f"{role.mention} already in bypass", hidden=True)
return
if len(roleping.bypass["roles"]) == 10:
await ctx.send(
"Already have 10 roles in bypass. "
"Please consider consolidating roles for roleping bypass",
hidden=True,
)
return
roleping.bypass["roles"].append(role.id)
roleping.update()
await ctx.send(f"{role.name} role bypass added for `{rping.name}`")
@cog_ext.cog_subcommand(
base="roleping",
subcommand_group="restore",
name="user",
description="Remove a role bypass",
base_desc="Block roles from being pinged",
sub_group_desc="Remove a bypass from a roleping (restoring it)",
options=[
create_option(
name="user",
description="User to add",
option_type=6,
required=True,
),
create_option(
name="rping",
description="Rolepinged role",
option_type=8,
required=True,
),
],
)
@admin_or_permissions(manage_server=True)
async def _roleping_restore_user(
self, ctx: SlashContext, user: Member, rping: Role
):
roleping = Roleping.get(guild=ctx.guild.id, role=rping.id)
if not roleping:
await ctx.send(
f"Roleping not configured for {rping.mention}", hidden=True
)
return
if user.id not in roleping.bypass["users"]:
await ctx.send(f"{user.mention} not in bypass", hidden=True)
return
roleping.bypass["users"].delete(user.id)
roleping.update()
await ctx.send(
f"{user.nick or user.name} user bypass removed for `{rping.name}`"
)
@cog_ext.cog_subcommand(
base="roleping",
subcommand_group="restore",
name="role",
description="Remove a role bypass",
base_desc="Block roles from being pinged",
sub_group_desc="Remove a bypass from a roleping (restoring it)",
options=[
create_option(
name="role",
description="Role to add",
option_type=8,
required=True,
),
create_option(
name="rping",
description="Rolepinged role",
option_type=8,
required=True,
),
],
)
@admin_or_permissions(manage_server=True)
async def _roleping_restore_role(
self, ctx: SlashContext, role: Role, rping: Role
):
roleping = Roleping.get(guild=ctx.guild.id, role=rping.id)
if not roleping:
await ctx.send(
f"Roleping not configured for {rping.mention}", hidden=True
)
return
if role.id in roleping.bypass["roles"]:
await ctx.send(f"{role.mention} already in bypass", hidden=True)
return
if len(roleping.bypass["roles"]) == 10:
await ctx.send(
"Already have 10 roles in bypass. "
"Please consider consolidating roles for roleping bypass",
hidden=True,
)
return
roleping.bypass["roles"].append(role.id)
roleping.update()
await ctx.send(f"{role.name} role bypass added for `{rping.name}`")

View file

@ -28,6 +28,7 @@ coll_lookup = {
"Mute": "mutes",
"Purge": "purges",
"Reminder": "reminders",
"Roleping": "rolepings",
"Setting": "settings",
"Starboard": "starboard",
"Star": "stars",
@ -460,6 +461,27 @@ class Reminder(MongoObject, ActiveObject):
created_at: datetime = field(default_factory=datetime.utcnow)
@dataclass
class Roleping(MongoObject, ActiveObject):
"""
Guild Roleping object
:param _id: MongoDB ID
:param role: Blocked role
:param guild: ID of origin guild
:param admin: Admin who added roleping
:param bypass: Roles/users who may bypass this roleping
:param active: If the roleping is disabled
:param created_at: Time the roleping was created
"""
role: int
guild: int
admin: int
bypass: dict
created_at: datetime = field(default_factory=datetime.utcnow)
@dataclass
class Setting(MongoObject):
"""