jarvis-bot/jarvis/__init__.py
2021-07-20 08:43:51 -06:00

397 lines
13 KiB
Python

import asyncio
import re
from datetime import datetime, timedelta
from pathlib import Path
import pymongo
from discord import DMChannel, Intents, Member, Message
from discord.ext import commands
from discord.ext.tasks import loop
from discord.utils import find, get
from discord_slash import SlashCommand
from psutil import Process
from jarvis import logo, utils
from jarvis.config import get_config
from jarvis.utils import build_embed
from jarvis.utils.db import DBManager
from jarvis.utils.field import Field
if asyncio.get_event_loop().is_closed():
asyncio.set_event_loop(asyncio.new_event_loop())
intents = Intents.default()
intents.members = True
restart_ctx = None
invites = re.compile(
r"(https?://)?(www.)?(discord.(gg|io|me|li)|discord(app)?.com/invite)/([^\s/]+?)(?=\b)",
flags=re.IGNORECASE,
)
jarvis = commands.Bot(
command_prefix=utils.get_prefix, intents=intents, help_command=None
)
slash = SlashCommand(jarvis, sync_commands=True, sync_on_cog_reload=True)
jarvis_self = Process()
__version__ = "1.0.1"
db = DBManager(get_config().mongo).mongo
@jarvis.event
async def on_ready():
global restart_ctx
print(" Logged in as {0.user}".format(jarvis))
print(" Connected to {} guild(s)".format(len(jarvis.guilds)))
with jarvis_self.oneshot():
print(f" Current PID: {jarvis_self.pid}")
Path(f"jarvis.{jarvis_self.pid}.pid").touch()
if restart_ctx:
channel = None
if "guild" in restart_ctx:
guild = find(lambda x: x.id == restart_ctx["guild"], jarvis.guilds)
if guild:
channel = find(
lambda x: x.id == restart_ctx["channel"], guild.channels
)
elif "user" in restart_ctx:
channel = jarvis.get_user(restart_ctx["user"])
if channel:
await channel.send("Core systems restarted and back online.")
restart_ctx = None
@jarvis.event
async def on_member_join(user: Member):
guild = user.guild
db = DBManager(get_config().mongo).mongo
mutes = list(
db.jarvis.mutes.find(
{"active": True, "user": user.id, "guild": guild.id}
)
)
if mutes and len(mutes) >= 1:
mute_role = db.jarvis.settings.find_one(
{"guild": guild.id, "setting": "mute"}
)
role = guild.get_role(mute_role["value"])
await user.add_roles(
role, reason="User is muted still muted from prior mute"
)
unverified = db.jarvis.settings.find_one(
{"guild": guild.id, "setting": "unverified"}
)
if unverified:
role = guild.get_role(unverified["value"])
await user.add_roles(role, reason="User just joined and is unverified")
@jarvis.event
async def on_message(message: Message):
if (
not isinstance(message.channel, DMChannel)
and message.author.id != jarvis.user.id
):
autoreact = db.jarvis.autoreact.find_one(
{"guild": message.guild.id, "channel": message.channel.id}
)
if autoreact:
for reaction in autoreact["reactions"]:
await message.add_reaction(reaction)
massmention = db.jarvis.settings.find_one(
{"guild": message.guild.id, "setting": "massmention"}
)
if (
massmention["value"] > 0
and len(message.mentions)
- (1 if message.author in message.mentions else 0)
> massmention["value"]
):
db.jarvis.warns.insert_one(
{
"user": message.author.id,
"reason": "Mass Mention",
"admin": get_config().client_id,
"time": datetime.now(),
"guild": message.guild.id,
"duration": 24,
"active": True,
}
)
fields = [Field("Reason", "Mass Mention", False)]
embed = build_embed(
title="Warning",
description=f"{message.author.mention} has been warned",
fields=fields,
)
embed.set_author(
name=message.author.nick
if message.author.nick
else message.author.name,
icon_url=message.author.avatar_url,
)
embed.set_footer(
text=f"{message.author.name}#{message.author.discriminator} "
+ f"| {message.author.id}"
)
await message.channel.send(embed=embed)
roleping = db.jarvis.settings.find_one(
{"guild": message.guild.id, "setting": "roleping"}
)
roles = []
for mention in message.role_mentions:
roles.append(mention.id)
for mention in message.mentions:
for role in mention.roles:
roles.append(role.id)
if (
roleping
and any(x in roleping["value"] for x in roles)
and not any(
x.id in roleping["value"] for x in message.author.roles
)
):
db.jarvis.warns.insert_one(
{
"user": message.author.id,
"reason": "Pinged a blocked role/user with a blocked role",
"admin": get_config().client_id,
"time": datetime.now(),
"guild": message.guild.id,
"duration": 24,
"active": True,
}
)
fields = [
Field(
"Reason",
"Pinged a blocked role/user with a blocked role",
False,
)
]
embed = build_embed(
title="Warning",
description=f"{message.author.mention} has been warned",
fields=fields,
)
embed.set_author(
name=message.author.nick
if message.author.nick
else message.author.name,
icon_url=message.author.avatar_url,
)
embed.set_footer(
text=f"{message.author.name}#{message.author.discriminator} "
+ f"| {message.author.id}"
)
await message.channel.send(embed=embed)
autopurge = db.jarvis.autopurge.find_one(
{"guild": message.guild.id, "channel": message.channel.id}
)
if autopurge:
await message.delete(delay=autopurge["delay"])
content = re.sub(r"\s+", "", message.content)
content = re.sub(r"[^\w\d./]+", "", content)
match = invites.search(content)
if match:
guild_invites = await message.guild.invites()
allowed = [x.code for x in guild_invites] + [
"dbrand",
"VtgZntXcnZ",
]
if match.group(6) not in allowed:
await message.delete()
db.jarvis.warns.insert_one(
{
"user": message.author.id,
"reason": "Sent an invite link",
"admin": get_config().client_id,
"time": datetime.now(),
"guild": message.guild.id,
"duration": 24,
"active": True,
}
)
fields = [
Field(
"Reason",
"Sent an invite link",
False,
)
]
embed = build_embed(
title="Warning",
description=f"{message.author.mention} has been warned",
fields=fields,
)
embed.set_author(
name=message.author.nick
if message.author.nick
else message.author.name,
icon_url=message.author.avatar_url,
)
embed.set_footer(
text=f"{message.author.name}#"
+ f"{message.author.discriminator} "
+ f"| {message.author.id}"
)
await message.channel.send(embed=embed)
await jarvis.process_commands(message)
@jarvis.event
async def on_guild_join(guild):
general = find(lambda x: x.name == "general", guild.channels)
if general and general.permissions_for(guild.me).send_messages:
await general.send(
"Allow me to introduce myself. I am J.A.R.V.I.S., a virtual "
+ "artificial intelligence, and I'm here to assist you with a "
+ "variety of tasks as best I can, "
+ "24 hours a day, seven days a week."
)
await asyncio.sleep(1)
await general.send("Importing all preferences from home interface...")
# Set some default settings
db = DBManager(get_config().mongo).mongo
db.jarvis.settings.insert_one(
{"guild": guild.id, "setting": "massmention", "value": 5}
)
await general.send("Systems are now fully operational")
@loop(minutes=1)
async def unmute():
db = DBManager(get_config().mongo).mongo
mutes = list(db.jarvis.mutes.find({"active": True, "length": {"$gt": 0}}))
mute_roles = list(
db.jarvis.settings.find({"setting": "mute"}).sort(
[("guild", pymongo.ASCENDING)]
)
)
updates = []
for mute in mutes:
if mute["time"] + timedelta(minutes=mute["length"]) < datetime.now():
mute_role = [
x["value"] for x in mute_roles if x["guild"] == mute["guild"]
][0]
guild = await jarvis.fetch_guild(mute["guild"])
role = guild.get_role(mute_role)
user = await guild.fetch_member(mute["user"])
if user:
if role in user.roles:
await user.remove_roles(role, reason="Mute expired")
updates.append(
pymongo.UpdateOne(
{"user": user.id, "guild": guild.id, "time": mute["time"]},
{"$set": {"active": False}},
)
)
if updates:
db.jarvis.mutes.bulk_write(updates)
@loop(minutes=10)
async def unban():
db = DBManager(get_config().mongo).mongo
bans = list(db.jarvis.bans.find({"active": True, "type": "temp"}))
updates = []
for ban in bans:
if ban["time"] + timedelta(
hours=ban["length"]
) < datetime.now() + timedelta(minutes=10):
guild = await jarvis.fetch_guild(ban["guild"])
user = await jarvis.fetch_user(ban["user"])
if user:
guild.unban(user)
updates.append(
pymongo.UpdateOne(
{
"user": user.id,
"guild": guild.id,
"time": ban["time"],
"type": "temp",
},
{"$set": {"active": False}},
)
)
if updates:
db.jarvis.bans.bulk_write(updates)
@loop(minutes=1)
async def unlock():
db = DBManager(get_config().mongo).mongo
locks = list(db.jarvis.locks.find({"active": True}))
updates = []
for lock in locks:
if lock["time"] + timedelta(minutes=lock["duration"]) < datetime.now():
guild = await jarvis.fetch_guild(lock["guild"])
channel = await jarvis.fetch_channel(lock["channel"])
if channel:
roles = await guild.fetch_roles()
for role in roles:
overrides = channel.overwrites_for(role)
overrides.send_messages = None
await channel.set_permissions(
role, overwrite=overrides, reason="Lock expired"
)
updates.append(
pymongo.UpdateOne(
{
"channel": channel.id,
"guild": guild.id,
"time": lock["time"],
},
{"$set": {"active": False}},
)
)
if updates:
db.jarvis.locks.bulk_write(updates)
@loop(hours=1)
async def unwarn():
db = DBManager(get_config().mongo).mongo
warns = list(db.jarvis.warns.find({"active": True}))
updates = []
for warn in warns:
if warn["time"] + timedelta(hours=warn["duration"]) < datetime.now():
updates.append(
pymongo.UpdateOne(
{"_id": warn["_id"]}, {"$set": {"active": False}}
)
)
if updates:
db.jarvis.warns.bulk_write(updates)
def run(ctx=None):
global restart_ctx
if ctx:
restart_ctx = ctx
for extension in utils.get_extensions():
jarvis.load_extension(extension)
config = get_config()
print(
" https://discord.com/api/oauth2/authorize?client_id="
+ "{}&permissions=8&scope=bot%20applications.commands".format(
config.client_id
)
)
unmute.start()
unban.start()
unlock.start()
unwarn.start()
jarvis.max_messages = config.max_messages
jarvis.run(config.token, bot=True, reconnect=True)
for cog in jarvis.cogs:
session = getattr(cog, "_session", None)
if session:
session.close()
if restart_ctx:
return restart_ctx