# jarvis-bot/jarvis/__init__.py
#
# 394 lines
# 13 KiB
# Python

import asyncio
import re
from datetime import datetime, timedelta
from pathlib import Path

import pymongo
from discord import DMChannel, Intents, Member, Message, NotFound
from discord.ext import commands
from discord.ext.tasks import loop
from discord.utils import find
from discord_slash import SlashCommand
from psutil import Process

from jarvis import logo, utils
from jarvis.config import get_config
from jarvis.db import DBManager
from jarvis.db.types import (
    Autopurge,
    Autoreact,
    Ban,
    Lock,
    Mute,
    Setting,
    Warning,
)
from jarvis.utils import build_embed
from jarvis.utils.field import Field
if asyncio.get_event_loop().is_closed():
asyncio.set_event_loop(asyncio.new_event_loop())
intents = Intents.default()
intents.members = True
restart_ctx = None
invites = re.compile(
r"(?:https?://)?(?:www.)?(?:discord.(?:gg|io|me|li)|discord(?:app)?.com/invite)/([^\s/]+?)(?=\b)",
flags=re.IGNORECASE,
)
jarvis = commands.Bot(
command_prefix=utils.get_prefix, intents=intents, help_command=None
)
slash = SlashCommand(jarvis, sync_commands=True, sync_on_cog_reload=True)
jarvis_self = Process()
__version__ = "1.5.6"
db = DBManager(get_config().mongo).mongo
jarvis_db = db.jarvis
@jarvis.event
async def on_ready():
global restart_ctx
print(" Logged in as {0.user}".format(jarvis))
print(" Connected to {} guild(s)".format(len(jarvis.guilds)))
with jarvis_self.oneshot():
print(f" Current PID: {jarvis_self.pid}")
Path(f"jarvis.{jarvis_self.pid}.pid").touch()
if restart_ctx:
channel = None
if "guild" in restart_ctx:
guild = find(lambda x: x.id == restart_ctx["guild"], jarvis.guilds)
if guild:
channel = find(
lambda x: x.id == restart_ctx["channel"], guild.channels
)
elif "user" in restart_ctx:
channel = jarvis.get_user(restart_ctx["user"])
if channel:
await channel.send("Core systems restarted and back online.")
restart_ctx = None
@jarvis.event
async def on_member_join(user: Member):
guild = user.guild
mutes = Mute.get_active(guild=guild.id)
if mutes and len(mutes) >= 1:
mute_role = Setting.get(guild=guild.id, setting="mute")
role = guild.get_role(mute_role.value)
await user.add_roles(
role, reason="User is muted still muted from prior mute"
)
unverified = Setting.get(guild=guild.id, setting="unverified")
if unverified:
role = guild.get_role(unverified.value)
await user.add_roles(role, reason="User just joined and is unverified")
@jarvis.event
async def on_message(message: Message):
if (
not isinstance(message.channel, DMChannel)
and message.author.id != jarvis.user.id
):
autoreact = Autoreact.get(
guild=message.guild.id,
channel=message.channel.id,
)
if autoreact:
for reaction in autoreact.reactions:
await message.add_reaction(reaction)
massmention = Setting.get(
guild=message.guild.id,
setting="massmention",
)
if (
massmention.value > 0
and len(message.mentions)
- (1 if message.author in message.mentions else 0)
> massmention.value
):
warning = Warning(
active=True,
admin=get_config().client_id,
duration=24,
guild=message.guild.id,
reason="Mass Mention",
user=message.author.id,
)
warning.insert()
fields = [Field("Reason", "Mass Mention", False)]
embed = build_embed(
title="Warning",
description=f"{message.author.mention} has been warned",
fields=fields,
)
embed.set_author(
name=message.author.nick
if message.author.nick
else message.author.name,
icon_url=message.author.avatar_url,
)
embed.set_footer(
text=f"{message.author.name}#{message.author.discriminator} "
+ f"| {message.author.id}"
)
await message.channel.send(embed=embed)
roleping = Setting.get(guild=message.guild.id, setting="roleping")
roles = []
for mention in message.role_mentions:
roles.append(mention.id)
for mention in message.mentions:
for role in mention.roles:
roles.append(role.id)
if (
roleping
and any(x in roleping.value for x in roles)
and not any(x.id in roleping.value for x in message.author.roles)
):
warning = Warning(
active=True,
admin=get_config().client_id,
duration=24,
guild=message.guild.id,
reason="Pinged a blocked role/user with a blocked role",
user=message.author.id,
)
warning.insert()
fields = [
Field(
"Reason",
"Pinged a blocked role/user with a blocked role",
False,
)
]
embed = build_embed(
title="Warning",
description=f"{message.author.mention} has been warned",
fields=fields,
)
embed.set_author(
name=message.author.nick
if message.author.nick
else message.author.name,
icon_url=message.author.avatar_url,
)
embed.set_footer(
text=f"{message.author.name}#{message.author.discriminator} "
+ f"| {message.author.id}"
)
await message.channel.send(embed=embed)
autopurge = Autopurge.get(
guild=message.guild.id, channel=message.channel.id
)
if autopurge:
await message.delete(delay=autopurge.delay)
content = re.sub(r"\s+", "", message.content)
match = invites.search(content)
if match:
guild_invites = await message.guild.invites()
allowed = [x.code for x in guild_invites] + [
"dbrand",
"VtgZntXcnZ",
]
if match.group(1) not in allowed:
await message.delete()
warning = Warning(
active=True,
admin=get_config().client_id,
duration=24,
guild=message.guild.id,
reason="Sent an invite link",
user=message.author.id,
)
warning.insert()
fields = [
Field(
"Reason",
"Sent an invite link",
False,
)
]
embed = build_embed(
title="Warning",
description=f"{message.author.mention} has been warned",
fields=fields,
)
embed.set_author(
name=message.author.nick
if message.author.nick
else message.author.name,
icon_url=message.author.avatar_url,
)
embed.set_footer(
text=f"{message.author.name}#"
+ f"{message.author.discriminator} "
+ f"| {message.author.id}"
)
await message.channel.send(embed=embed)
await jarvis.process_commands(message)
@jarvis.event
async def on_guild_join(guild):
general = find(lambda x: x.name == "general", guild.channels)
if general and general.permissions_for(guild.me).send_messages:
await general.send(
"Allow me to introduce myself. I am J.A.R.V.I.S., a virtual "
+ "artificial intelligence, and I'm here to assist you with a "
+ "variety of tasks as best I can, "
+ "24 hours a day, seven days a week."
)
await asyncio.sleep(1)
await general.send("Importing all preferences from home interface...")
# Set some default settings
setting = Setting(guild=guild.id, setting="massmention", value=5)
setting.insert()
await general.send("Systems are now fully operational")
@loop(minutes=1)
async def unmute():
mutes = Mute.get_active(duration={"$gt": 0})
mute_roles = Setting.get_many(setting="mute")
updates = []
for mute in mutes:
if (
mute.created_at + timedelta(minutes=mute.duration)
< datetime.utcnow()
):
mute_role = [x.value for x in mute_roles if x.guild == mute.guild][
0
]
guild = await jarvis.fetch_guild(mute.guild)
role = guild.get_role(mute_role)
user = await guild.fetch_member(mute.user)
if user:
if role in user.roles:
await user.remove_roles(role, reason="Mute expired")
# Objects can't handle bulk_write, so handle it via raw methods
updates.append(
pymongo.UpdateOne(
{
"user": user.id,
"guild": guild.id,
"created_at": mute.created_at,
},
{"$set": {"active": False}},
)
)
if updates:
jarvis_db.mutes.bulk_write(updates)
@loop(minutes=10)
async def unban():
bans = Ban.get_active(type="temp")
updates = []
for ban in bans:
if ban.created_at + timedelta(
hours=ban.duration
) < datetime.utcnow() + timedelta(minutes=10):
guild = await jarvis.fetch_guild(ban.guild)
user = await jarvis.fetch_user(ban.user)
if user:
guild.unban(user)
updates.append(
pymongo.UpdateOne(
{
"user": user.id,
"guild": guild.id,
"created_at": ban.created_at,
"type": "temp",
},
{"$set": {"active": False}},
)
)
if updates:
jarvis_db.bans.bulk_write(updates)
@loop(minutes=1)
async def unlock():
locks = Lock.get_active()
updates = []
for lock in locks:
if (
lock.created_at + timedelta(minutes=lock.duration)
< datetime.utcnow()
):
guild = await jarvis.fetch_guild(lock.guild)
channel = await jarvis.fetch_channel(lock.channel)
if channel:
roles = await guild.fetch_roles()
for role in roles:
overrides = channel.overwrites_for(role)
overrides.send_messages = None
await channel.set_permissions(
role, overwrite=overrides, reason="Lock expired"
)
updates.append(
pymongo.UpdateOne(
{
"channel": channel.id,
"guild": guild.id,
"created_at": lock.created_at,
},
{"$set": {"active": False}},
)
)
if updates:
jarvis_db.locks.bulk_write(updates)
@loop(hours=1)
async def unwarn():
warns = Warning.get_active()
updates = []
for warn in warns:
if (
warn.created_at + timedelta(hours=warn.duration)
< datetime.utcnow()
):
updates.append(
pymongo.UpdateOne(
{"_id": warn._id}, {"$set": {"active": False}}
)
)
if updates:
jarvis_db.warns.bulk_write(updates)
def run(ctx=None):
global restart_ctx
if ctx:
restart_ctx = ctx
for extension in utils.get_extensions():
jarvis.load_extension(extension)
config = get_config()
print(
" https://discord.com/api/oauth2/authorize?client_id="
+ "{}&permissions=8&scope=bot%20applications.commands".format(
config.client_id
)
)
unmute.start()
unban.start()
unlock.start()
unwarn.start()
jarvis.max_messages = config.max_messages
jarvis.run(config.token, bot=True, reconnect=True)
for cog in jarvis.cogs:
session = getattr(cog, "_session", None)
if session:
session.close()
if restart_ctx:
return restart_ctx