"""JARVIS Reddit sync.""" import asyncio import logging import re from datetime import datetime, timezone from typing import List, Optional from asyncpraw import Reddit from asyncpraw.models.reddit.submission import Submission from asyncpraw.models.reddit.submission import Subreddit as Sub from asyncprawcore.exceptions import Forbidden, NotFound from jarvis_core.db import q from jarvis_core.db.models import Subreddit, SubredditFollow from naff import Client from naff.client.errors import NotFound as DNotFound from naff.models.discord.embed import Embed, EmbedField from jarvis_tasks import const from jarvis_tasks.config import TaskConfig from jarvis_tasks.prometheus.stats import reddit_count, reddit_gauge from jarvis_tasks.util import build_embed DEFAULT_USER_AGENT = f"python:JARVIS-Tasks:{const.__version__} (by u/zevaryx)" config = TaskConfig.from_yaml() config.reddit["user_agent"] = config.reddit.get("user_agent", DEFAULT_USER_AGENT) running = [] logger = logging.getLogger(__name__) image_link = re.compile(r"https?://(?:www)?\.?preview\.redd\.it\/(.*\..*)\?.*") async def post_embeds(sub: Sub, post: Submission, reddit: Reddit) -> Optional[List[Embed]]: """ Build a post embeds. Args: post: Post to build embeds """ url = "https://reddit.com" + post.permalink await post.author.load() author_url = f"https://reddit.com/u/{post.author.name}" author_icon = post.author.icon_img images = [] title = post.title if len(title) > 256: title = title[253] + "..." fields = [] content = "" og_post = None if "crosspost_parent_list" in vars(post): og_post = post # noqa: F841 post = await reddit.submission(post.crosspost_parent_list[0]["id"]) await post.load() fields.append(EmbedField(name="Crossposted From", value=post.subreddit_name_prefixed)) content = f"> **{post.title}**" if "url" in vars(post): if any(post.url.endswith(x) for x in ["jpeg", "jpg", "png", "gif"]): images = [post.url] if "media_metadata" in vars(post): for k, v in post.media_metadata.items(): if v["status"] != "valid" or v["m"] not in ["image/jpg", "image/png", "image/gif"]: continue ext = v["m"].split("/")[-1] i_url = f"https://i.redd.it/{k}.{ext}" images.append(i_url) if len(images) == 4: break if "selftext" in vars(post) and post.selftext: text = post.selftext if post.spoiler: text = "||" + text + "||" content += "\n\n" + post.selftext if len(content) > 900: content = content[:900] + "..." if post.spoiler: content += "||" content += f"\n\n[View this post]({url})" content = "\n".join(image_link.sub(r"https://i.redd.it/\1", x) for x in content.split("\n")) if not images and not content: logger.debug(f"Post {post.id} had neither content nor images?") return None color = "#FF4500" if "primary_color" in vars(sub): color = sub.primary_color base_embed = build_embed( title=title, description=content, fields=fields, timestamp=post.created_utc, url=url, color=color, ) base_embed.set_author(name="u/" + post.author.name, url=author_url, icon_url=author_icon) base_embed.set_footer( text="Reddit", icon_url="https://www.redditinc.com/assets/images/site/reddit-logo.png" ) embeds = [base_embed] if len(images) > 0: embeds[0].set_image(url=images[0]) for image in images[1:4]: embed = Embed(url=url) embed.set_image(url=image) embeds.append(embed) return embeds async def _stream(sub: Sub, bot: Client, reddit: Reddit) -> None: """ Stream a subreddit Args: sub: Subreddit to stream bot: Client instance """ now = datetime.now(tz=timezone.utc) await sub.load() running.append(sub.display_name) logger.debug(f"Streaming subreddit {sub.display_name}") try: async for post in sub.stream.submissions(): if not post: logger.debug(f"Got None for post in {sub.display_name}") continue if post.created_utc < now.timestamp(): continue logger.debug(f"Got new post in {sub.display_name}") follows = SubredditFollow.find(q(display_name=sub.display_name)) num_follows = 0 async for follow in follows: num_follows += 1 guild = await bot.fetch_guild(follow.guild) if not guild: logger.warning(f"Follow {follow.id}'s guild no longer exists, deleting") await follow.delete() num_follows -= 1 continue channel = await bot.fetch_channel(follow.channel) if not channel: logger.warning(f"Follow {follow.id}'s channel no longer exists, deleting") await follow.delete() num_follows -= 1 continue embeds = await post_embeds(sub, post, reddit) timestamp = int(post.created_utc) try: await channel.send( f"`r/{sub.display_name}` was posted to at ", embeds=embeds, ) count = reddit_count.labels( guild_id=guild.id, guild_name=guild.name, subreddit_name=sub.display_name ) count.inc() except DNotFound: logger.warning(f"Follow {follow.id}'s channel no longer exists, deleting") await follow.delete() num_follows -= 1 continue except Exception: logger.error( f"Failed to send message to {channel.id} in {channel.guild.name}", exc_info=True, ) gauge = reddit_gauge.labels(subreddit_name=sub.display_name) gauge.set(num_follows) if num_follows == 0: s = await Subreddit.find_one(q(display_name=sub.display_name)) if s: await s.delete() break except Exception: logger.error(f"Subreddit stream {sub.display_name} failed", exc_info=True) running.remove(sub.display_name) async def reddit(bot: Client) -> None: """ Sync Reddit posts in the background. Args: bot: Client instance """ if not config.reddit: logger.warn("Missing Reddit config, not starting") return logger.debug("Starting Task-reddit") red = Reddit(**config.reddit) logger.debug("Validating follows") async for sub in Subreddit.find(): count = 0 async for follow in SubredditFollow.find(q(display_name=sub.display_name)): count += 1 guild = await bot.fetch_guild(follow.guild) channel = await bot.fetch_channel(follow.channel) if not guild or not channel: logger.debug(f"Follow {follow.id} invalid, deleting") await follow.delete() count -= 1 continue if count == 0: logger.debug(f"Subreddit {sub.display_name} has no followers, removing") await sub.delete() old_count = 0 while True: count = len(running) subs = Subreddit.find(q(display_name__nin=running)) # Go through all actively followed subreddits async for sub in subs: logger.debug(f"Creating stream for {sub.display_name}") if sub.display_name in running: logger.debug(f"Follow {sub.display_name} was found despite filter") continue is_followed = await SubredditFollow.find_one(q(display_name=sub.display_name)) if not is_followed: logger.warn(f"Subreddit {sub.display_name} has no followers, removing") await sub.delete() continue # Get subreddit try: sub = await red.subreddit(sub.display_name) except (NotFound, Forbidden) as e: # Subreddit is either quarantined, deleted, or private logger.warn(f"Subreddit {sub.display_name} raised {e.__class__.__name__}, removing") try: await sub.delete() except Exception: logger.debug("Ignoring deletion error") continue # Create and run stream coro = _stream(sub, bot, red) asyncio.create_task(coro) count += 1 if old_count != count: logger.debug(f"Now streaming {count} subreddits") old_count = count # Check every 60 seconds await asyncio.sleep(60)