Add regex filtering

This commit is contained in:
Zeva Rose 2022-10-02 01:20:02 -06:00
parent 61a3cdbcd1
commit d343ea6086
4 changed files with 208 additions and 11 deletions

View file

@ -4,7 +4,15 @@ from datetime import datetime, timedelta, timezone
from aiohttp import ClientSession
from jarvis_core.db import q
from jarvis_core.db.models import Autopurge, Autoreact, Mute, Roleping, Setting, Warning
from jarvis_core.db.models import (
Autopurge,
Autoreact,
Filter,
Mute,
Roleping,
Setting,
Warning,
)
from jarvis_core.filters import invites, url
from naff import listen
from naff.api.events.discord import MessageCreate, MessageDelete, MessageUpdate
@ -79,10 +87,6 @@ class MessageEventMixin:
]
if (m := match.group(1)) not in allowed and setting.value:
self.logger.debug(f"Removing non-allowed invite `{m}` from {message.guild.id}")
try:
await message.delete()
except Exception:
self.logger.debug("Message deleted before action taken")
expires_at = datetime.now(tz=timezone.utc) + timedelta(hours=24)
await Warning(
@ -100,10 +104,47 @@ class MessageEventMixin:
tracker.inc()
embed = warning_embed(message.author, "Sent an invite link")
try:
await message.channel.send(embeds=embed)
await message.reply(embeds=embed)
except Exception:
self.logger.warn("Failed to send warning embed")
try:
await message.delete()
except Exception:
self.logger.debug("Message deleted before action taken")
async def filters(self, message: Message) -> None:
"""Handle filter evennts."""
filters = await Filter.find(q(guild=message.guild.id)).to_list(None)
for item in filters:
for f in item.filters:
if re.search(f, message.content, re.IGNORECASE):
expires_at = datetime.now(tz=timezone.utc) + timedelta(hours=24)
await Warning(
active=True,
admin=self.user.id,
duration=24,
expires_at=expires_at,
guild=message.guild.id,
reason="Sent a message with a filtered word",
user=message.author.id,
).commit()
tracker = warnings_tracker.labels(
guild_id=message.guild.id, guild_name=message.guild.name
)
tracker.inc()
embed = warning_embed(message.author, "Sent a message with a filtered word")
try:
await message.reply(embeds=embed)
except Exception:
self.logger.warn("Failed to send warning embed")
try:
await message.delete()
except Exception:
self.logger.debug("Message deleted before action taken")
return
async def massmention(self, message: Message) -> None:
"""Handle massmention events."""
massmention = await Setting.find_one(
@ -139,7 +180,7 @@ class MessageEventMixin:
tracker.inc()
embed = warning_embed(message.author, "Mass Mention")
try:
await message.channel.send(embeds=embed)
await message.reply(embeds=embed)
except Exception:
self.logger.warn("Failed to send warning embed")
@ -207,7 +248,7 @@ class MessageEventMixin:
tracker.inc()
embed = warning_embed(message.author, "Pinged a blocked role/user with a blocked role")
try:
await message.channel.send(embeds=embed)
await message.reply(embeds=embed)
except Exception:
self.logger.warn("Failed to send warning embed")
@ -234,7 +275,7 @@ class MessageEventMixin:
tracker.inc()
embed = warning_embed(message.author, "Phishing URL")
try:
await message.channel.send(embeds=embed)
await message.reply(embeds=embed)
except Exception:
self.logger.warn("Failed to send warning embed")
try:
@ -284,7 +325,7 @@ class MessageEventMixin:
reasons = ", ".join(f"{m['source']}: {m['type']}" for m in data["matches"])
embed = warning_embed(message.author, reasons)
try:
await message.channel.send(embeds=embed)
await message.reply(embeds=embed)
except Exception:
self.logger.warn("Failed to send warning embed")
try:
@ -343,6 +384,7 @@ class MessageEventMixin:
malicious = await self.malicious_url(message)
if phish or malicious:
await self.timeout_user(message.author, message.channel)
await self.filters(message)
@listen()
async def on_message_edit(self, event: MessageUpdate) -> None:
@ -400,6 +442,7 @@ class MessageEventMixin:
malicious = await self.malicious_url(after)
if phish or malicious:
await self.timeout_user(after.author, after.channel)
await self.filters(after)
@listen()
async def on_message_delete(self, event: MessageDelete) -> None:

View file

@ -5,6 +5,7 @@ from naff import Client
from jarvis.cogs.admin import (
ban,
filters,
kick,
lock,
lockdown,
@ -38,3 +39,5 @@ def setup(bot: Client) -> None:
logger.debug(msg.format("roleping"))
warning.WarningCog(bot)
logger.debug(msg.format("warning"))
filters.FilterCog(bot)
logger.debug(msg.format("filters"))

View file

@ -0,0 +1,151 @@
"""Filters cog."""
import asyncio
import difflib
from typing import Dict, List
from jarvis_core.db import q
from jarvis_core.db.models import Filter
from naff import AutocompleteContext, Client, Extension, InteractionContext, Permissions
from naff.models.discord.modal import InputText, Modal, TextStyles
from naff.models.naff.application_commands import (
OptionTypes,
SlashCommand,
slash_option,
)
from naff.models.naff.command import check
from thefuzz import process
from jarvis.utils.permissions import admin_or_permissions
class FilterCog(Extension):
"""JARVIS Filter cog."""
def __init__(self, bot: Client):
self.bot = bot
self.cache: Dict[int, List[str]] = {}
async def _edit_filter(self, ctx: InteractionContext, name: str, search: bool = False) -> None:
content = ""
f: Filter = None
if search:
if f := await Filter.find_one(q(name=name, guild=ctx.guild.id)):
content = "\n".join(f.filters)
modal = Modal(
title=f"Creating filter `{name}`",
components=[
InputText(
label="Filter (one statement per line)",
placeholder="" if content else "i.e. $bad_word^",
custom_id="filters",
max_length=3000,
value=content,
style=TextStyles.PARAGRAPH,
)
],
)
await ctx.send_modal(modal)
try:
data = await self.bot.wait_for_modal(modal, author=ctx.author.id, timeout=60 * 5)
filters = data.responses.get("filters").split("\n")
except asyncio.TimeoutError:
return
try:
if not f:
f = Filter(name=name, guild=ctx.guild.id, filters=filters)
else:
f.filters = filters
await f.commit()
except Exception as e:
await data.send(f"{e}", ephemeral=True)
return
content = content.splitlines()
diff = "\n".join(difflib.ndiff(content, filters))
await data.send(f"Filter `{name}` has been updated:\n\n```diff\n{diff}\n```")
if ctx.guild.id not in self.cache:
self.cache[ctx.guild.id] = []
if name not in self.cache[ctx.guild.id]:
self.cache[ctx.guild.id].append(name)
filter_ = SlashCommand(name="filter", description="Manage keyword filters")
@filter_.subcommand(sub_cmd_name="create", sub_cmd_description="Create a new filter")
@slash_option(
name="name", description="Name of new filter", required=True, opt_type=OptionTypes.STRING
)
@check(admin_or_permissions(Permissions.MANAGE_MESSAGES))
async def _filter_create(self, ctx: InteractionContext, name: str) -> None:
return await self._edit_filter(ctx, name)
@filter_.subcommand(sub_cmd_name="edit", sub_cmd_description="Edit a filter")
@slash_option(
name="name",
description="Filter to edit",
autocomplete=True,
opt_type=OptionTypes.STRING,
required=True,
)
@check(admin_or_permissions(Permissions.MANAGE_MESSAGES))
async def _filter_edit(self, ctx: InteractionContext, name: str) -> None:
return await self._edit_filter(ctx, name, True)
@filter_.subcommand(sub_cmd_name="view", sub_cmd_description="View a filter")
@slash_option(
name="name",
description="Filter to view",
autocomplete=True,
opt_type=OptionTypes.STRING,
required=True,
)
async def _filter_view(self, ctx: InteractionContext, name: str) -> None:
f = await Filter.find_one(q(name=name, guild=ctx.guild.id))
if not f:
await ctx.send("That filter doesn't exist", ephemeral=True)
return
filters = "\n".join(f.filters)
await ctx.send(f"Filter `{name}`:\n\n```\n{filters}\n```")
@filter_.subcommand(sub_cmd_name="delete", sub_cmd_description="Delete a filter")
@slash_option(
name="name",
description="Filter to delete",
autocomplete=True,
opt_type=OptionTypes.STRING,
required=True,
)
@check(admin_or_permissions(Permissions.MANAGE_MESSAGES))
async def _filter_delete(self, ctx: InteractionContext, name: str) -> None:
f = await Filter.find_one(q(name=name, guild=ctx.guild.id))
if not f:
await ctx.send("That filter doesn't exist", ephemeral=True)
return
try:
await f.delete()
except Exception:
self.bot.logger.debug(f"Failed to delete filter {name} in {ctx.guild.id}")
await ctx.send(f"Filter `{name}` deleted")
self.cache[ctx.guild.id].remove(f.name)
@_filter_edit.autocomplete("name")
@_filter_view.autocomplete("name")
@_filter_delete.autocomplete("name")
async def _autocomplete(self, ctx: AutocompleteContext, name: str) -> None:
if not self.cache.get(ctx.guild.id):
filters = await Filter.find(q(guild=ctx.guild.id)).to_list(None)
self.cache[ctx.guild.id] = [f.name for f in filters]
results = process.extract(name, self.cache.get(ctx.guild.id), limit=25)
choices = [{"name": r[0], "value": r[0]} for r in results]
await ctx.send(choices=choices)
def setup(bot: Client) -> None:
"""Add FilterCog to JARVIS"""
FilterCog(bot)

View file

@ -30,7 +30,7 @@ tag_name = re.compile(r"^[\w\ \-]{1,40}$")
class TagCog(Extension):
def __init__(self, bot: Client):
self.bot = bot
self.cache: Dict[int, List[int]] = {}
self.cache: Dict[int, List[str]] = {}
tag = SlashCommand(name="tag", description="Create and manage custom tags")