jarvis-tasks/jarvis_tasks/tasks/reddit.py
2023-05-10 18:06:44 -06:00

449 lines
15 KiB
Python

"""JARVIS Reddit sync."""
import asyncio
import logging
import re
from datetime import datetime, timezone
from typing import List, Optional
from asyncpraw import Reddit
from asyncpraw.models.reddit.redditor import Redditor as Ruser
from asyncpraw.models.reddit.submission import Submission
from asyncpraw.models.reddit.submission import Subreddit as Sub
from asyncprawcore.exceptions import Forbidden, NotFound
from beanie.operators import NotIn
from jarvis_core.db.models import Subreddit, SubredditFollow
# from jarvis_core.db.models import Redditor, RedditorFollow, Subreddit, SubredditFollow
from interactions import Client
from interactions.client.errors import NotFound as DNotFound
from interactions.models.discord.embed import Embed, EmbedField
from jarvis_tasks import const
from jarvis_tasks.config import load_config
from jarvis_tasks.prometheus.stats import reddit_count, reddit_gauge
from jarvis_tasks.util import build_embed
# User agent used for Reddit when the loaded config does not supply one.
DEFAULT_USER_AGENT = f"python:JARVIS-Tasks:{const.__version__} (by u/zevaryx)"
config = load_config()
# NOTE(review): this dereferences `config.reddit` at import time, before `reddit()`
# gets to run its `if not config.reddit` guard -- if the Reddit section can be
# missing, this line fails first. Confirm against `load_config`.
# NOTE(review): `.get()` only falls back to the default when the "user_agent" key
# is absent from `.dict()`; a key that is present but None would be kept. Confirm.
config.reddit.user_agent = config.reddit.dict().get("user_agent", DEFAULT_USER_AGENT)
# Display names of the subreddits that currently have a stream running; entries
# are added and removed by `_stream_subreddit` and read by `reddit()`.
running = []
logger = logging.getLogger(__name__)
# Matches preview.redd.it links including their query string; group 1 is the
# file name, which `post_embeds` substitutes into an i.redd.it URL.
image_link = re.compile(r"https?://(?:www)?\.?preview\.redd\.it\/(.*\..*)\?.*")
async def post_embeds(
    sub: Sub, post: Submission, reddit: Reddit
) -> Optional[List[Embed]]:
    """
    Build the embeds for a post.

    The first embed carries the title, text, author, and footer. Up to three
    more embeds carry one image each and share the first embed's URL.

    Args:
        sub: Subreddit the post came from; its `primary_color` is used as the
            embed color when set
        post: Post to build embeds for
        reddit: Reddit client, used to load the original post of a crosspost

    Returns:
        A list of one to four embeds, or None if the post has neither content
        nor images
    """
    url = "https://reddit.com" + post.permalink

    # Resolve every author detail up front, from the submission that was
    # actually streamed, so name, link, and icon always describe the same
    # account even when `post` is swapped for a crosspost parent below.
    author = post.author
    if author is None:
        # asyncpraw reports the author of a deleted account as None.
        author_name = "[deleted]"
        author_url = None
        author_icon = None
    else:
        try:
            await author.load()
        except (NotFound, Forbidden):
            logger.debug(f"Could not load author of post {post.id}")
        author_name = "u/" + author.name
        author_url = f"https://reddit.com/u/{author.name}"
        # The icon is only present on a successfully loaded profile.
        author_icon = getattr(author, "icon_img", None)

    title = post.title
    if len(title) > 256:
        # Keep the first 253 characters so the ellipsis fits within 256.
        title = title[:253] + "..."

    fields = []
    content = ""
    crosspost_parents = vars(post).get("crosspost_parent_list")
    if crosspost_parents:
        # From here on, text and images are taken from the original post.
        post = await reddit.submission(crosspost_parents[0]["id"])
        await post.load()
        fields.append(
            EmbedField(name="Crossposted From", value=post.subreddit_name_prefixed)
        )
        content = f"> **{post.title}**"

    images = []
    if "url" in vars(post) and post.url.endswith(("jpeg", "jpg", "png", "gif")):
        images = [post.url]
    if "media_metadata" in vars(post):
        for media_id, media in post.media_metadata.items():
            if media["status"] != "valid" or media["m"] not in (
                "image/jpg",
                "image/png",
                "image/gif",
            ):
                continue
            ext = media["m"].split("/")[-1]
            images.append(f"https://i.redd.it/{media_id}.{ext}")
            if len(images) == 4:
                break

    if "selftext" in vars(post) and post.selftext:
        prefix = content + "\n\n"
        text = post.selftext
        # Truncate so that prefix + text stays within 900 characters (plus the
        # ellipsis), then wrap: the spoiler markers must never be cut off.
        room = max(900 - len(prefix), 0)
        if len(text) > room:
            text = text[:room] + "..."
        if post.spoiler:
            text = "||" + text + "||"
        content = prefix + text

    content += f"\n\n[View this post]({url})"
    # Discord cannot render preview.redd.it links; point them at i.redd.it.
    content = "\n".join(
        image_link.sub(r"https://i.redd.it/\1", line) for line in content.split("\n")
    )

    # Defensive only: the link appended above means `content` is never empty.
    if not images and not content:
        logger.debug(f"Post {post.id} had neither content nor images?")
        return None

    # Fall back to the default orange when the subreddit has no usable color.
    color = vars(sub).get("primary_color") or "#FF4500"

    base_embed = build_embed(
        title=title,
        description=content,
        fields=fields,
        timestamp=post.created_utc,
        url=url,
        color=color,
    )
    base_embed.set_author(name=author_name, url=author_url, icon_url=author_icon)
    base_embed.set_footer(
        text="Reddit",
        icon_url="https://www.redditinc.com/assets/images/site/reddit-logo.png",
    )

    embeds = [base_embed]
    if images:
        base_embed.set_image(url=images[0])
        # Extra images go in image-only embeds that share the post URL.
        for image in images[1:4]:
            embed = Embed(url=url)
            embed.set_image(url=image)
            embeds.append(embed)
    return embeds
# async def _stream_user(sub: Ruser, bot: Client, reddit: Reddit) -> None:
# """
# Stream a redditor
# Args:
# sub: Redditor to stream
# bot: Client instance
# """
# now = datetime.now(tz=timezone.utc)
# await sub.load()
# running.append(sub.name)
# logger.debug(f"Streaming user {sub.name}")
# try:
# async for post in sub.stream.submissions():
# if not post:
# logger.debug(f"Got None for post from {sub.name}")
# continue
# await post.subreddit.load()
# if post.created_utc < now.timestamp():
# continue
# logger.debug(
# f"Got new post from {sub.name} in r/{post.subreddit.display_name}"
# )
# follows = RedditorFollow.find(RedditorFollow.name == sub.name)
# num_follows = 0
# async for follow in follows:
# num_follows += 1
# guild = await bot.fetch_guild(follow.guild)
# if not guild:
# logger.warning(
# f"Follow {follow.id}'s guild no longer exists, deleting"
# )
# await follow.delete()
# num_follows -= 1
# continue
# channel = await bot.fetch_channel(follow.channel)
# if not channel:
# logger.warning(
# f"Follow {follow.id}'s channel no longer exists, deleting"
# )
# await follow.delete()
# num_follows -= 1
# continue
# embeds = await post_embeds(post.subreddit, post, reddit)
# timestamp = int(post.created_utc)
# try:
# await channel.send(
# f"`u/{sub.name}` posted to r/{post.subreddit.display_name} at <t:{timestamp}:f>",
# embeds=embeds,
# )
# count = reddit_count.labels(
# guild_id=guild.id,
# guild_name=guild.name,
# subreddit_name=post.subreddit.display_name,
# redditor_name=sub.name,
# )
# count.inc()
# except DNotFound:
# logger.warning(
# f"Follow {follow.id}'s channel no longer exists, deleting"
# )
# await follow.delete()
# num_follows -= 1
# continue
# except Exception:
# logger.error(
# f"Failed to send message to {channel.id} in {channel.guild.name}",
# exc_info=True,
# )
# gauge = reddit_gauge.labels(redditor_name=sub.name)
# gauge.set(num_follows)
# if num_follows == 0:
# s = await Redditor.find_one(Redditor.name == sub.name)
# if s:
# await s.delete()
# break
# except Exception:
# logger.error(f"Redditor stream {sub.name} failed", exc_info=True)
# running.remove(sub.name)
async def _stream_subreddit(sub: Sub, bot: Client, reddit: Reddit) -> None:
    """
    Stream a subreddit and relay new posts to every channel following it.

    Posts created before the stream started are skipped. A follow whose guild
    or channel can no longer be fetched is deleted. When a post finds no
    remaining follows, the subreddit's database entry is deleted and the
    stream ends. The subreddit is listed in `running` for as long as the
    stream is alive.

    Args:
        sub: Subreddit to stream
        bot: Client instance
        reddit: Reddit client, passed through to `post_embeds`
    """
    started_at = datetime.now(tz=timezone.utc).timestamp()
    await sub.load()
    name = sub.display_name
    running.append(name)
    logger.debug(f"Streaming subreddit {name}")
    try:
        async for post in sub.stream.submissions():
            if not post:
                logger.debug(f"Got None for post in {name}")
                continue
            if post.created_utc < started_at:
                continue
            logger.debug(f"Got new post in {name}")

            # asyncpraw reports the author of a deleted account as None.
            author_name = post.author.name if post.author else "[deleted]"
            timestamp = int(post.created_utc)

            # Embeds are identical for every follow, so build them at most
            # once per post, and only once a valid follow actually needs them.
            embeds = None
            embeds_built = False

            follows = SubredditFollow.find(SubredditFollow.display_name == name)
            num_follows = 0
            async for follow in follows:
                num_follows += 1
                guild = await bot.fetch_guild(follow.guild)
                if not guild:
                    logger.warning(
                        f"Follow {follow.id}'s guild no longer exists, deleting"
                    )
                    await follow.delete()
                    num_follows -= 1
                    continue
                channel = await bot.fetch_channel(follow.channel)
                if not channel:
                    logger.warning(
                        f"Follow {follow.id}'s channel no longer exists, deleting"
                    )
                    await follow.delete()
                    num_follows -= 1
                    continue

                if not embeds_built:
                    embeds = await post_embeds(sub, post, reddit)
                    embeds_built = True

                try:
                    await channel.send(
                        f"`r/{name}` was posted to at <t:{timestamp}:f>",
                        embeds=embeds,
                    )
                    count = reddit_count.labels(
                        guild_id=guild.id,
                        guild_name=guild.name,
                        subreddit_name=name,
                        redditor_name=author_name,
                    )
                    count.inc()
                except DNotFound:
                    logger.warning(
                        f"Follow {follow.id}'s channel no longer exists, deleting"
                    )
                    await follow.delete()
                    num_follows -= 1
                    continue
                except Exception:
                    # A failed send must not stop delivery to other follows.
                    logger.error(
                        f"Failed to send message to {channel.id} in {channel.guild.name}",
                        exc_info=True,
                    )

            gauge = reddit_gauge.labels(subreddit_name=name, redditor_name=author_name)
            gauge.set(num_follows)

            if num_follows == 0:
                # Nobody is listening any more: drop the subreddit and stop.
                s = await Subreddit.find_one(Subreddit.display_name == name)
                if s:
                    await s.delete()
                break
    except Exception:
        logger.error(f"Subreddit stream {name} failed", exc_info=True)
    finally:
        # Always release the slot, including on task cancellation, so that
        # `reddit()` is able to start a fresh stream for this subreddit.
        running.remove(name)
async def _stream(sub: Sub | Ruser, bot: Client, reddit: Reddit) -> None:
    """
    Dispatch a follow target to the matching stream implementation.

    Only subreddits (`isinstance(sub, Sub)`) are streamed; any other target
    is ignored.
    """
    if not isinstance(sub, Sub):
        # Redditor streaming is currently disabled:
        # await _stream_user(sub, bot, reddit)
        return
    await _stream_subreddit(sub, bot, reddit)
async def reddit(bot: Client) -> None:
    """
    Sync Reddit posts in the background.

    First removes follows whose guild or channel can no longer be fetched,
    and subreddits left without any follow. Then loops forever: every 60
    seconds it starts a stream task for each stored subreddit that is not
    already listed in `running`.

    Args:
        bot: Client instance
    """
    if not config.reddit:
        logger.warning("Missing Reddit config, not starting")
        return

    logger.debug("Starting Task-reddit")
    red = Reddit(**config.reddit)

    logger.debug("Validating subreddit follows")
    async for sub in Subreddit.find():
        count = 0
        async for follow in SubredditFollow.find(
            SubredditFollow.display_name == sub.display_name
        ):
            count += 1
            guild = await bot.fetch_guild(follow.guild)
            channel = await bot.fetch_channel(follow.channel)
            if not guild or not channel:
                logger.debug(f"Follow {follow.id} invalid, deleting")
                await follow.delete()
                count -= 1
                continue
        if count == 0:
            logger.debug(f"Subreddit {sub.display_name} has no followers, removing")
            await sub.delete()

    # logger.debug("Validating redditor follows")
    # async for sub in Redditor.find():
    #     count = 0
    #     async for follow in RedditorFollow.find(RedditorFollow.name == sub.name):
    #         count += 1
    #         guild = await bot.fetch_guild(follow.guild)
    #         channel = await bot.fetch_channel(follow.channel)
    #         if not guild or not channel:
    #             logger.debug(f"Follow {follow.id} invalid, deleting")
    #             await follow.delete()
    #             count -= 1
    #             continue
    #     if count == 0:
    #         logger.debug(f"Redditor {sub.name} has no followers, removing")
    #         await sub.delete()

    # The event loop only keeps weak references to tasks, so hold strong ones
    # here; otherwise a stream could be garbage-collected mid-execution.
    stream_tasks = set()

    def _on_stream_done(task: asyncio.Task) -> None:
        """Release a finished stream task and surface any crash."""
        stream_tasks.discard(task)
        if not task.cancelled() and task.exception() is not None:
            logger.error("Reddit stream task crashed", exc_info=task.exception())

    old_count = 0
    while True:
        count = len(running)
        subs = Subreddit.find(NotIn(Subreddit.display_name, running))
        # users = Redditor.find(NotIn(Redditor.name, running))

        # Go through all actively followed subreddits
        async for sub_doc in subs:
            logger.debug(f"Creating stream for {sub_doc.display_name}")
            if sub_doc.display_name in running:
                logger.debug(f"Follow {sub_doc.display_name} was found despite filter")
                continue

            is_followed = await SubredditFollow.find_one(
                SubredditFollow.display_name == sub_doc.display_name
            )
            if not is_followed:
                logger.warning(
                    f"Subreddit {sub_doc.display_name} has no followers, removing"
                )
                await sub_doc.delete()
                continue

            # Get subreddit
            # NOTE(review): asyncpraw's `Reddit.subreddit()` does not fetch
            # unless `fetch=True`, so NotFound/Forbidden probably surface
            # later, in the stream task's `sub.load()`, rather than here.
            # Confirm before relying on this branch to prune subreddits.
            try:
                subreddit = await red.subreddit(sub_doc.display_name)
            except (NotFound, Forbidden) as e:
                # Subreddit is either quarantined, deleted, or private
                logger.warning(
                    f"Subreddit {sub_doc.display_name} raised {e.__class__.__name__}, removing"
                )
                try:
                    await sub_doc.delete()
                except Exception:
                    logger.debug("Ignoring deletion error")
                continue

            # Create and run stream
            task = asyncio.create_task(_stream(subreddit, bot, red))
            stream_tasks.add(task)
            task.add_done_callback(_on_stream_done)
            count += 1

        # Go through all actively followed redditors
        # async for sub in users:
        #     logger.debug(f"Creating stream for {sub.name}")
        #     if sub.name in running:
        #         logger.debug(f"Follow {sub.name} was found despite filter")
        #         continue
        #     is_followed = await SubredditFollow.find_one(
        #         SubredditFollow.name == sub.name
        #     )
        #     if not is_followed:
        #         logger.warn(f"Redditor {sub.name} has no followers, removing")
        #         await sub.delete()
        #         continue
        #     # Get subreddit
        #     try:
        #         sub = await red.user(sub.name)
        #     except (NotFound, Forbidden) as e:
        #         # Subreddit is either quarantined, deleted, or private
        #         logger.warn(
        #             f"Redditor {sub.display_name} raised {e.__class__.__name__}, removing"
        #         )
        #         try:
        #             await sub.delete()
        #         except Exception:
        #             logger.debug("Ignoring deletion error")
        #         continue
        #     # Create and run stream
        #     coro = _stream(sub, bot, red)
        #     asyncio.create_task(coro)
        #     count += 1

        if old_count != count:
            logger.debug(f"Now streaming {count} subreddits")
            old_count = count

        # Check every 60 seconds
        await asyncio.sleep(60)