# Source listing metadata: 384 lines, 12 KiB, Python
import asyncio
import re
from datetime import datetime, timedelta
from pathlib import Path

import pymongo
from discord import (
    DMChannel,
    HTTPException,
    Intents,
    Member,
    Message,
    NotFound,
)
from discord.ext import commands
from discord.ext.tasks import loop
from discord.utils import find
from discord_slash import SlashCommand
from psutil import Process

from jarvis import logo, utils
from jarvis.config import get_config
from jarvis.db import DBManager
from jarvis.db.types import Autoreact, Ban, Lock, Mute, Setting, Warning
from jarvis.utils import build_embed
from jarvis.utils.field import Field

# Make sure a usable event loop is installed before the bot is constructed.
# A closed current loop is replaced with a fresh one. Newer Python versions
# raise RuntimeError from get_event_loop() when no loop is set at all, so that
# case is handled the same way instead of aborting the import.
try:
    if asyncio.get_event_loop().is_closed():
        asyncio.set_event_loop(asyncio.new_event_loop())
except RuntimeError:
    asyncio.set_event_loop(asyncio.new_event_loop())

# Gateway intents for the bot: the library defaults plus member events
# (the on_member_join handler in this module is driven by those events).
intents = Intents.default()
intents.members = True

# Context of a pending restart request. run() stores it and on_ready()
# consumes it and resets it to None.
restart_ctx = None


# Matches Discord invite links (discord.gg/io/me/li and
# discord.com / discordapp.com "/invite/" URLs), with an optional scheme and
# "www." prefix. Group 1 captures the invite code.
# The dots are escaped so they only match a literal "."; unescaped they match
# any character (e.g. "discordXgg/abc" was treated as an invite link).
invites = re.compile(
    r"(?:https?://)?(?:www\.)?(?:discord\.(?:gg|io|me|li)|discord(?:app)?\.com/invite)/([^\s/]+?)(?=\b)",
    flags=re.IGNORECASE,
)


# Bot instance: the prefix is resolved by utils.get_prefix, the intents come
# from the module-level ``intents``, and the built-in help command is disabled.
jarvis = commands.Bot(
    command_prefix=utils.get_prefix,
    intents=intents,
    help_command=None,
)
# Slash-command integration bound to the bot.
slash = SlashCommand(jarvis, sync_commands=True, sync_on_cog_reload=True)
# psutil handle used by on_ready() to report the PID.
# NOTE(review): Process() with no argument presumably refers to the current
# process - confirm against the psutil docs.
jarvis_self = Process()
__version__ = "1.2.0"


# Mongo client taken from the configured DBManager, and the ``jarvis``
# database on it (used for the raw bulk writes in the background loops).
db = DBManager(get_config().mongo).mongo
jarvis_db = db.jarvis


@jarvis.event
async def on_ready():
    """Print startup details, touch a PID marker file, and confirm a restart.

    When a restart context is pending, "Core systems restarted and back
    online." is sent either to the channel identified by its ``guild`` and
    ``channel`` ids or to the user identified by its ``user`` id. The context
    is then cleared.
    """
    global restart_ctx
    print(" Logged in as {0.user}".format(jarvis))
    print(" Connected to {} guild(s)".format(len(jarvis.guilds)))
    with jarvis_self.oneshot():
        print(f" Current PID: {jarvis_self.pid}")
        Path(f"jarvis.{jarvis_self.pid}.pid").touch()

    # Nothing to announce unless a restart was requested.
    if not restart_ctx:
        return

    destination = None
    if "guild" in restart_ctx:
        origin = find(lambda g: g.id == restart_ctx["guild"], jarvis.guilds)
        if origin:
            destination = find(
                lambda c: c.id == restart_ctx["channel"], origin.channels
            )
    elif "user" in restart_ctx:
        destination = jarvis.get_user(restart_ctx["user"])

    if destination:
        await destination.send("Core systems restarted and back online.")
    restart_ctx = None


@jarvis.event
async def on_member_join(user: Member):
    """Re-apply the mute role and assign the unverified role to a new member.

    Args:
        user: The member that just joined.
    """
    guild = user.guild

    # Only this member's own active mutes count. Without the ``user`` filter,
    # any active mute in the guild caused every new member to be muted.
    mutes = Mute.get_active(guild=guild.id, user=user.id)
    if mutes:
        mute_role = Setting.get(guild=guild.id, setting="mute")
        # The setting may be missing and the stored role may have been deleted.
        role = guild.get_role(mute_role.value) if mute_role else None
        if role:
            await user.add_roles(
                role, reason="User is muted still muted from prior mute"
            )

    unverified = Setting.get(guild=guild.id, setting="unverified")
    if unverified:
        role = guild.get_role(unverified.value)
        if role:
            await user.add_roles(
                role, reason="User just joined and is unverified"
            )


async def _warn_author(message: Message, reason: str) -> None:
    """Store a 24-hour warning for the message author and post a warning embed.

    Args:
        message: The offending guild message; the embed goes to its channel.
        reason: Text stored on the warning and shown in the embed.
    """
    warning = Warning(
        active=True,
        admin=get_config().client_id,
        duration=24,
        guild=message.guild.id,
        reason=reason,
        user=message.author.id,
    )
    warning.insert()
    embed = build_embed(
        title="Warning",
        description=f"{message.author.mention} has been warned",
        fields=[Field("Reason", reason, False)],
    )
    embed.set_author(
        name=message.author.nick
        if message.author.nick
        else message.author.name,
        icon_url=message.author.avatar_url,
    )
    embed.set_footer(
        text=f"{message.author.name}#{message.author.discriminator} "
        + f"| {message.author.id}"
    )
    await message.channel.send(embed=embed)


@jarvis.event
async def on_message(message: Message):
    """Apply automatic moderation to guild messages, then process commands.

    For messages outside DMs that were not sent by the bot itself, in order:
    add configured auto-reactions, warn on mass mentions, warn on pings of
    blocked roles, schedule auto-purge deletion, and delete and warn on invite
    links whose code is not allowed. Commands are processed for every message.

    Args:
        message: The message that was received.
    """
    if (
        not isinstance(message.channel, DMChannel)
        and message.author.id != jarvis.user.id
    ):
        autoreact = Autoreact.get(
            guild=message.guild.id,
            channel=message.channel.id,
        )
        if autoreact:
            for reaction in autoreact.reactions:
                await message.add_reaction(reaction)

        massmention = Setting.get(
            guild=message.guild.id,
            setting="massmention",
        )
        # The setting may be missing for a guild, so check it before reading
        # ``.value``. A self-mention is not counted.
        if massmention and massmention.value > 0:
            mentioned = len(message.mentions) - (
                1 if message.author in message.mentions else 0
            )
            if mentioned > massmention.value:
                await _warn_author(message, "Mass Mention")

        roleping = Setting.get(guild=message.guild.id, setting="roleping")
        if roleping:
            # Roles pinged directly plus every role held by a pinged member.
            roles = [mention.id for mention in message.role_mentions]
            for mention in message.mentions:
                roles.extend(role.id for role in mention.roles)
            # Authors who hold a blocked role themselves are exempt.
            if any(x in roleping.value for x in roles) and not any(
                x.id in roleping.value for x in message.author.roles
            ):
                await _warn_author(
                    message, "Pinged a blocked role/user with a blocked role"
                )

        autopurge = Setting.get(guild=message.guild.id, setting="autopurge")
        if autopurge:
            # NOTE(review): every other Setting here is read through
            # ``.value``; confirm that ``.delay`` exists on this object.
            await message.delete(delay=autopurge.delay)

        # Strip whitespace so an invite split by spaces is still detected.
        content = re.sub(r"\s+", "", message.content)
        match = invites.search(content)
        if match:
            guild_invites = await message.guild.invites()
            allowed = [x.code for x in guild_invites] + [
                "dbrand",
                "VtgZntXcnZ",
            ]
            if match.group(1) not in allowed:
                await message.delete()
                await _warn_author(message, "Sent an invite link")
    await jarvis.process_commands(message)


@jarvis.event
async def on_guild_join(guild):
    """Introduce the bot in #general (when possible) and seed default settings.

    The default ``massmention`` setting is always stored, even when no
    writable text channel named "general" exists, because on_message reads
    that setting for every guild message.

    Args:
        guild: The guild the bot just joined.
    """
    # Search text channels only: a category or voice channel named "general"
    # cannot be sent to.
    general = find(lambda x: x.name == "general", guild.text_channels)
    can_announce = bool(
        general and general.permissions_for(guild.me).send_messages
    )

    if can_announce:
        await general.send(
            "Allow me to introduce myself. I am J.A.R.V.I.S., a virtual "
            + "artificial intelligence, and I'm here to assist you with a "
            + "variety of tasks as best I can, "
            + "24 hours a day, seven days a week."
        )
        await asyncio.sleep(1)
        await general.send("Importing all preferences from home interface...")

    # Set some default settings
    setting = Setting(guild=guild.id, setting="massmention", value=5)
    setting.insert()

    if can_announce:
        await general.send("Systems are now fully operational")


@loop(minutes=1)
async def unmute():
    """Every minute, lift expired timed mutes and mark them inactive.

    A mute has expired once ``created_at + duration`` (minutes) is in the
    past. Each mute is handled on its own, so a member who left or a failed
    Discord call no longer aborts the run or loses updates already queued.
    """
    mutes = Mute.get_active(duration={"$gt": 0})
    # NOTE(review): Setting.get is used as a single object elsewhere in this
    # module but is iterated here - confirm it returns a collection when
    # called without a guild.
    mute_roles = Setting.get(setting="mute")
    updates = []
    for mute in mutes:
        if (
            mute.created_at + timedelta(minutes=mute.duration)
            >= datetime.utcnow()
        ):
            continue

        # First configured mute role for this guild, or None when the guild
        # has none (previously an IndexError).
        mute_role = next(
            (x.value for x in mute_roles if x.guild == mute.guild), None
        )
        try:
            guild = await jarvis.fetch_guild(mute.guild)
            user = await guild.fetch_member(mute.user)
            role = guild.get_role(mute_role) if mute_role is not None else None
            if role is not None and role in user.roles:
                await user.remove_roles(role, reason="Mute expired")
        except NotFound:
            # Guild or member is gone: no role to remove, but the mute record
            # is still retired below.
            pass
        except HTTPException:
            # Other Discord failure: leave the mute active so the next run
            # retries it.
            continue

        # Objects can't handle bulk_write, so handle it via raw methods
        updates.append(
            pymongo.UpdateOne(
                {
                    "user": mute.user,
                    "guild": mute.guild,
                    "created_at": mute.created_at,
                },
                {"$set": {"active": False}},
            )
        )
    if updates:
        jarvis_db.mutes.bulk_write(updates)


@loop(minutes=10)
async def unban():
    """Every ten minutes, lift temporary bans that have expired or are about to.

    A ban qualifies when ``created_at + duration`` (hours) falls before
    "now + 10 minutes", i.e. it would expire before the next run.
    """
    bans = Ban.get_active(type="temp")
    updates = []
    for ban in bans:
        if ban.created_at + timedelta(
            hours=ban.duration
        ) >= datetime.utcnow() + timedelta(minutes=10):
            continue

        try:
            guild = await jarvis.fetch_guild(ban.guild)
            user = await jarvis.fetch_user(ban.user)
            # Guild.unban is a coroutine. It was called without ``await``, so
            # the user stayed banned while the record was marked inactive.
            await guild.unban(user)
        except NotFound:
            # User, guild, or ban no longer exists: nothing to lift, but the
            # record is still retired below.
            pass
        except HTTPException:
            # Other Discord failure: keep the ban active so the next run
            # retries it, and carry on with the remaining bans.
            continue

        updates.append(
            pymongo.UpdateOne(
                {
                    "user": ban.user,
                    "guild": ban.guild,
                    "created_at": ban.created_at,
                    "type": "temp",
                },
                {"$set": {"active": False}},
            )
        )
    if updates:
        jarvis_db.bans.bulk_write(updates)


@loop(minutes=1)
async def unlock():
    """Every minute, reopen channels whose lock has expired.

    A lock has expired once ``created_at + duration`` (minutes) is in the
    past. The ``send_messages`` override is reset to None for every role that
    has one, then the lock record is marked inactive.
    """
    locks = Lock.get_active()
    updates = []
    for lock in locks:
        if (
            lock.created_at + timedelta(minutes=lock.duration)
            >= datetime.utcnow()
        ):
            continue

        try:
            guild = await jarvis.fetch_guild(lock.guild)
            channel = await jarvis.fetch_channel(lock.channel)
            for role in await guild.fetch_roles():
                overrides = channel.overwrites_for(role)
                # Skip roles with nothing to reset; this avoids one API call
                # per guild role.
                if overrides.send_messages is None:
                    continue
                overrides.send_messages = None
                await channel.set_permissions(
                    role, overwrite=overrides, reason="Lock expired"
                )
        except NotFound:
            # Guild or channel is gone: nothing to unlock, but the lock
            # record is still retired below.
            pass
        except HTTPException:
            # Other Discord failure: keep the lock active so the next run
            # retries it, and carry on with the remaining locks.
            continue

        updates.append(
            pymongo.UpdateOne(
                {
                    "channel": lock.channel,
                    "guild": lock.guild,
                    "created_at": lock.created_at,
                },
                {"$set": {"active": False}},
            )
        )
    if updates:
        jarvis_db.locks.bulk_write(updates)


@loop(hours=1)
async def unwarn():
    """Every hour, mark warnings inactive once their duration (hours) is over."""
    expired_ops = [
        pymongo.UpdateOne({"_id": entry._id}, {"$set": {"active": False}})
        for entry in Warning.get_active()
        if entry.created_at + timedelta(hours=entry.duration)
        < datetime.utcnow()
    ]
    # Skip the bulk write when nothing has expired.
    if expired_ops:
        jarvis_db.warns.bulk_write(expired_ops)


def run(ctx=None):
    """Load extensions, start the background loops, and run the bot.

    Args:
        ctx: Optional restart context. When truthy it is stored in the
            module-level ``restart_ctx``, which on_ready() uses to confirm
            that the restart finished.

    Returns:
        The module-level ``restart_ctx`` if it is truthy after the bot has
        stopped, otherwise None.
    """
    global restart_ctx
    if ctx:
        restart_ctx = ctx
    for extension in utils.get_extensions():
        jarvis.load_extension(extension)
    config = get_config()
    print(
        " https://discord.com/api/oauth2/authorize?client_id="
        + "{}&permissions=8&scope=bot%20applications.commands".format(
            config.client_id
        )
    )
    unmute.start()
    unban.start()
    unlock.start()
    unwarn.start()
    jarvis.max_messages = config.max_messages
    jarvis.run(config.token, bot=True, reconnect=True)

    # ``jarvis.cogs`` maps cog names to cog instances. Iterating the mapping
    # itself yields the names, so ``_session`` was never found and no session
    # was closed. Iterate the instances instead.
    for cog in jarvis.cogs.values():
        session = getattr(cog, "_session", None)
        if session:
            # NOTE(review): if ``close()`` is a coroutine function it needs
            # to be awaited to take effect - confirm the session type.
            session.close()
    if restart_ctx:
        return restart_ctx