jarvis-bot/jarvis/cogs/unique/dbrand.py

393 lines
16 KiB
Python

"""
JARVIS dbrand cog.
This cog is now maintenance-only due to conflict with the dbrand moderators.
Please do not file feature requests related to this cog; they will be closed.
"""
import asyncio
import logging
import re
from datetime import datetime, timedelta, timezone
from typing import Optional

import aiohttp
from bs4 import BeautifulSoup
from interactions import Client, Extension, InteractionContext
from interactions.client.utils import find
from interactions.models.discord.embed import EmbedField
from interactions.models.internal.application_commands import (
    OptionType,
    SlashCommand,
    slash_option,
)
from interactions.models.internal.command import cooldown
from interactions.models.internal.cooldowns import Buckets
from thefuzz import process

from jarvis.branding import CUSTOM_EMOJIS
from jarvis.config import load_config
from jarvis.data.dbrand import shipping_lookup
from jarvis.utils import build_embed
# Guild IDs handed to the ``db`` slash command group as its ``scopes``.
guild_ids = [578757004059738142, 520021794380447745, 862402786116763668]
def _parse_status_cell(cell) -> str:
    """Convert one ``<td>`` tag of the dbrand status page into display text.

    * ``column--comment`` cells become ``"<label>: <detail>"``; the detail is
      omitted when the label is ``"Unavailable"`` or no detail ``<div>`` exists.
    * ``column--status`` cells become the matching clock emoji (custom emoji
      when configured, plain coloured circle otherwise); a black or
      unrecognised indicator yields an empty string.
    * Any other cell becomes its stripped text.
    """
    # ``.get`` instead of ``[...]``: a <td> without a class attribute must not
    # raise KeyError, it is simply a plain text cell.
    classes = cell.get("class") or []

    if "column--comment" in classes:
        label = cell.find("span")
        if label is None:
            return cell.get_text().strip()
        text = label.get_text()
        detail = cell.find("div")
        # Compare the label *text* (the tag object itself never equals a
        # string) and only append the detail when there is one.
        if text.strip() != "Unavailable" and detail is not None:
            text += ": " + detail.get_text()
        return text.strip()

    if "column--status" in classes:
        indicator = cell.find("span")
        info = (indicator.get("class") or []) if indicator is not None else []
        for colour, fallback in (("green", "🟢"), ("yellow", "🟡"), ("red", "🔴")):
            if any(colour in css_class for css_class in info):
                return CUSTOM_EMOJIS.get(f"ico_clock_{colour}", fallback)
        # "black" (or anything unknown) has no indicator emoji. Always return
        # a string so callers can safely run substring checks on the value.
        return ""

    return cell.get_text().strip()


async def parse_db_status() -> dict:
    """Parse the dbrand status page for a local API

    Downloads ``https://dbrand.com/status`` and converts every HTML table into
    a list of ``{header: cell_text}`` dictionaries.

    Returns:
        A dict with up to two keys: ``"operations"`` for the table whose first
        header is ``"Service"`` and ``"countries"`` for any other table. If
        several tables map to the same key, the last one wins.

    Raises:
        aiohttp.ClientResponseError: if the status page answers with an HTTP
            error status (via ``raise_for_status``).
    """
    async with aiohttp.ClientSession() as session:
        async with session.get("https://dbrand.com/status") as response:
            response.raise_for_status()
            soup = BeautifulSoup(await response.content.read(), features="html.parser")

    data = {}
    for table in soup.find_all("table"):
        rows = table.find_all("tr")
        if not rows:
            # Empty table: nothing to use as a header row.
            continue
        headers = [h.get_text() for h in rows[0].find_all("th")]
        if not headers:
            # Without headers the cells cannot be keyed.
            continue

        data_key = "operations" if headers[0] == "Service" else "countries"
        # ``zip`` stops at the shorter sequence, so a row with more cells than
        # headers can no longer raise IndexError.
        data[data_key] = [
            dict(zip(headers, (_parse_status_cell(cell) for cell in row.find_all("td"))))
            for row in rows[1:]
        ]
    return data
class DbrandCog(Extension):
    """
    dbrand functions for JARVIS

    Mostly support functions. Credit @cpixl for the shipping API
    """

    # How long parsed status-page data and shipping API responses stay cached.
    _STATUS_TTL = timedelta(hours=2)
    _SHIPPING_TTL = timedelta(hours=24)
    # Logo used for embed thumbnails, footers and the author icon.
    _LOGO_URL = "https://dev.zevaryx.com/db_logo.png"

    def __init__(self, bot: Client):
        self.bot = bot
        self.logger = logging.getLogger(__name__)
        self.base_url = "https://dbrand.com/"
        # Shared HTTP session for the shipping API.
        self._session = aiohttp.ClientSession()
        self._session.headers.update({"Content-Type": "application/json"})
        self.api_url = load_config().urls["dbrand_shipping"]
        # Keys: "status" (parsed status page), "<cc>" (country lookup) and
        # "<cc>-<service url>" (per-service lookup). Every value carries a
        # "cache_expiry" timestamp.
        self.cache = {}

    def __del__(self):
        # ``ClientSession.close()`` is a coroutine: calling it without awaiting
        # only creates a coroutine object and never closes the session.
        # Schedule it on the running loop when there is one; without a loop
        # there is nothing that could run it.
        session = getattr(self, "_session", None)
        if session is None or session.closed:
            return
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            return
        loop.create_task(session.close())

    db = SlashCommand(name="db", description="dbrand commands", scopes=guild_ids)

    async def _get_status(self) -> dict:
        """Return the parsed status page, refreshing the cached copy once it expired."""
        status = self.cache.get("status")
        if not status or status["cache_expiry"] <= datetime.now(tz=timezone.utc):
            status = await parse_db_status()
            status["cache_expiry"] = datetime.now(tz=timezone.utc) + self._STATUS_TTL
            self.cache["status"] = status
        return status

    async def _fetch_cached(self, cache_key: str, url: str) -> Optional[dict]:
        """Return the JSON document at *url*, cached under *cache_key*.

        Returns ``None`` when the API answers with a status outside 200-399.
        The response is used as a context manager so the connection is
        released on the error path as well.
        """
        cached = self.cache.get(cache_key)
        if cached and cached["cache_expiry"] >= datetime.now(tz=timezone.utc):
            return cached
        async with self._session.get(url) as response:
            if not 200 <= response.status < 400:
                return None
            payload = await response.json()
        payload["cache_expiry"] = datetime.now(tz=timezone.utc) + self._SHIPPING_TTL
        self.cache[cache_key] = payload
        return payload

    @staticmethod
    def _overall_color(indicators) -> str:
        """Pick the embed colour: green/red when *all* indicators agree, yellow otherwise."""
        if all("green" in x for x in indicators):
            return "#38F657"
        if all("red" in x for x in indicators):
            return "#F12D20"
        return "#FBBD1E"

    def _set_footer(self, embed) -> None:
        """Attach the standard dbrand footer to *embed*."""
        embed.set_footer(text="dbrand.com", icon_url=self._LOGO_URL)

    @db.subcommand(
        sub_cmd_name="status", sub_cmd_description="Get dbrand operational status"
    )
    async def _status(self, ctx: InteractionContext) -> None:
        """Send an embed listing every service of the operational status table."""
        status = await self._get_status()
        # The page may lack the operations table; treat that as "no services".
        operations = status.get("operations") or []
        fields = [
            EmbedField(name=f'{x["Status"]} {x["Service"]}', value=x["Detail"])
            for x in operations
        ]
        embed = build_embed(
            title="Operational Status",
            description="Current dbrand operational status.\n[View online](https://dbrand.com/status)",
            fields=fields,
            url="https://dbrand.com/status",
            color=self._overall_color([x["Status"] for x in operations]),
        )
        embed.set_thumbnail(url=self._LOGO_URL)
        self._set_footer(embed)
        await ctx.send(embeds=embed)

    async def _db_support_cmd(self, ctx: InteractionContext) -> None:
        """Send the support embed shared by the ``contact`` and ``support`` subcommands."""
        status = await self._get_status()
        operations = status.get("operations") or []
        # Only the e-mail support row is shown; the colour still reflects all
        # services, as before.
        fields = [
            EmbedField(name=f'{x["Status"]} {x["Service"]}', value=x["Detail"])
            for x in operations
            if x["Service"] == "Email Support"
        ]
        embed = build_embed(
            title="Contact Support",
            description="",
            fields=fields,
            url="https://dbrand.com/contact",
            color=self._overall_color([x["Status"] for x in operations]),
        )
        embed.set_thumbnail(url=self._LOGO_URL)
        self._set_footer(embed)
        await ctx.send(embeds=embed)

    @db.subcommand(
        sub_cmd_name="contact",
        sub_cmd_description="Contact support",
    )
    @cooldown(bucket=Buckets.USER, rate=1, interval=30)
    async def _contact(self, ctx: InteractionContext) -> None:
        return await self._db_support_cmd(ctx)

    @db.subcommand(
        sub_cmd_name="support",
        sub_cmd_description="Contact support",
    )
    @cooldown(bucket=Buckets.USER, rate=1, interval=30)
    async def _support(self, ctx: InteractionContext) -> None:
        return await self._db_support_cmd(ctx)

    # @db.subcommand(sub_cmd_name="gripcheck", sub_cmd_description="Watch a dbrand grip get thrown")
    async def _gripcheck(self, ctx: InteractionContext) -> None:
        video_url = "https://cdn.discordapp.com/attachments/599068193339736096/890679742263623751/video0.mov"
        image_url = "https://cdn.discordapp.com/attachments/599068193339736096/890680198306095104/image0.jpg"
        await ctx.send(f"Video: {video_url}\nResults: {image_url}")

    @db.subcommand(sub_cmd_name="info", sub_cmd_description="Get useful links")
    @cooldown(bucket=Buckets.USER, rate=1, interval=30)
    async def _info(self, ctx: InteractionContext) -> None:
        """Send an embed with a list of dbrand links."""
        urls = [
            f"[Get Skins]({self.base_url + 'skins'})",
            f"[Robot Camo]({self.base_url + 'robot-camo'})",
            f"[Get a Grip]({self.base_url + 'grip'})",
            f"[Shop All Products]({self.base_url + 'shop'})",
            f"[Order Status]({self.base_url + 'order-status'})",
            f"[dbrand Status]({self.base_url + 'status'})",
            f"[Be (not) extorted]({self.base_url + 'not-extortion'})",
            "[Robot Camo Wallpapers](https://db.io/wallpapers)",
        ]
        embed = build_embed(
            title="Useful Links",
            description="\n\n".join(urls),
            fields=[],
            color="#FFBB00",
        )
        self._set_footer(embed)
        embed.set_thumbnail(url=self._LOGO_URL)
        embed.set_author(
            name="dbrand",
            url=self.base_url,
            icon_url=self._LOGO_URL,
        )
        await ctx.send(embeds=embed)

    async def _build_shipping_embed(self, dest: str, data: dict):
        """Build the embed for a country that is valid and has shipping available.

        *dest* is the lower-cased country code used in API URLs and cache keys,
        *data* the country document returned by the shipping API.
        """
        fields = []
        for service in data["shipping_services_available"]:
            # Key on the service URL rather than on the repr of the whole dict.
            service_data = await self._fetch_cached(
                f'{dest}-{service["url"]}',
                self.api_url + dest + "/" + service["url"],
            )
            if service_data is None:
                # The API refused this service; list the others.
                continue
            title = f'{service_data["carrier"]} {service_data["tier-title"]} | {service_data["costs-min"]}'
            message = service_data["time-title"]
            if service_data["free_threshold_available"]:
                title += " | Free over " + service_data["free-threshold"]
            fields.append(EmbedField(title, message))

        status = await self._get_status()

        # Drop only a leading article. ``str.replace`` would also eat "the"
        # inside the name ("the Netherlands" -> "Nerlands").
        country = data["country"]
        if country.startswith("the "):
            country = country[len("the "):].strip()
        shipping_info = find(
            lambda x: x.get("Country") == country, status.get("countries") or []
        )
        # URL slug for the shipping page: words joined by "-", article removed.
        slug = "-".join(x for x in data["country"].split(" ") if x != "the")

        description = ""
        color = "#FFBB00"
        if shipping_info:
            description = f'{shipping_info["Status"]}\u200b \u200b {shipping_info["Est. Delivery Time"].split(":")[0]}'
            # The expiry is "fetch time + TTL", so subtracting the TTL gives
            # the moment the status page was last read.
            created = status["cache_expiry"] - self._STATUS_TTL
            ts = int(created.timestamp())
            description += f" \u200b | \u200b Last updated: <t:{ts}:R>\n\u200b"
            if "green" in shipping_info["Status"]:
                color = "#38F657"
            elif "yellow" in shipping_info["Status"]:
                color = "#FBBD1E"
            elif "red" in shipping_info["Status"]:
                color = "#F12D20"
            else:
                color = "#FFFFFF"

        embed = build_embed(
            title=f'Shipping to {data["country"]}',
            description=description,
            color=color,
            fields=fields,
            url=self.base_url + "shipping/" + slug,
        )
        embed.set_thumbnail(url=self.base_url + data["country_flag"][1:])
        return embed

    @db.subcommand(
        sub_cmd_name="ship",
        sub_cmd_description="Get shipping information for your country",
    )
    @slash_option(
        name="search",
        description="Country search query (2 character code, country name, flag emoji)",
        opt_type=OptionType.STRING,
        required=True,
    )
    @cooldown(bucket=Buckets.USER, rate=1, interval=2)
    async def _shipping(self, ctx: InteractionContext, search: str) -> None:
        """Look up shipping options for the country described by *search*.

        *search* may be a country name, an alpha-2 or alpha-3 code, or a flag
        emoji. Names and unknown alpha-3 codes are fuzzy-matched against
        ``shipping_lookup``.
        """
        if not re.match(r"^[A-Z- ]+$", search, re.IGNORECASE):
            if re.match(
                r"^[\U0001f1e6-\U0001f1ff]{2}$",
                search,
                re.IGNORECASE,
            ):
                # Magic number, subtract from flag char to get ascii char
                uni2ascii = 127365
                search = chr(ord(search[0]) - uni2ascii) + chr(
                    ord(search[1]) - uni2ascii
                )
            elif search == "🏳️":
                search = "fr"
            else:
                await ctx.send(
                    "Please use text to search for shipping.", ephemeral=True
                )
                return

        if len(search) > 3:
            countries = {x["country"]: x["alpha-2"] for x in shipping_lookup}
            match = process.extractOne(search, countries.keys())
            if not match:
                await ctx.send(f"Unable to find country {search}", ephemeral=True)
                return
            search = countries[match[0]]
        elif len(search) == 3:
            alpha3 = {x["alpha-3"]: x["alpha-2"] for x in shipping_lookup}
            if search in alpha3:
                search = alpha3[search]
            else:
                match = process.extractOne(search, alpha3.keys())
                if not match:
                    # Same guard as the name lookup above.
                    await ctx.send(f"Unable to find country {search}", ephemeral=True)
                    return
                search = alpha3[match[0]]

        await ctx.defer()
        dest = search.lower()
        data = await self._fetch_cached(dest, self.api_url + dest)
        if data is None:
            # The API call failed. Previously this fell through to
            # ``data["is_valid"]`` and raised TypeError.
            await ctx.send(
                "Unable to retrieve shipping information right now. Please try again later."
            )
            return

        if not data["is_valid"]:
            embed = build_embed(
                title="Check Shipping Times",
                description=(
                    "Country not found.\nYou can [view all shipping "
                    "destinations here](https://dbrand.com/shipping)"
                ),
                fields=[],
                url="https://dbrand.com/shipping",
                color="#FFBB00",
            )
            embed.set_thumbnail(url=self._LOGO_URL)
        elif not data["shipping_available"]:
            embed = build_embed(
                title=f'Shipping to {data["country"]}',
                description=(
                    "No shipping available.\nTime to move to a country"
                    " that has shipping available.\nYou can [find a new country "
                    "to live in here](https://dbrand.com/shipping)"
                ),
                fields=[],
                url="https://dbrand.com/shipping",
                color="#FFBB00",
            )
            embed.set_thumbnail(url=self.base_url + data["country_flag"][1:])
        else:
            embed = await self._build_shipping_embed(dest, data)

        self._set_footer(embed)
        await ctx.send(embeds=embed)
def setup(bot: Client) -> None:
    """Add dbrandcog to JARVIS

    The cog is only instantiated when the ``dbrand_shipping`` URL is present
    in the loaded configuration; otherwise an info message is logged.
    """
    shipping_url = load_config().urls.get("dbrand_shipping")
    if not shipping_url:
        bot.logger.info("Missing dbrand shipping URL, not loading dbrand cog")
        return
    DbrandCog(bot)