From 113b92c177abcb3fd994a69b36bb3bfdfe946fab Mon Sep 17 00:00:00 2001 From: zevaryx Date: Fri, 15 Jul 2022 09:20:10 -0600 Subject: [PATCH] Add redditor following, fix bugs --- jarvis_tasks/prometheus/stats.py | 4 +- jarvis_tasks/tasks/reddit.py | 158 ++++++++++++++++++++++++++++++- jarvis_tasks/tasks/twitter.py | 5 +- poetry.lock | 51 +++++----- pyproject.toml | 2 +- 5 files changed, 185 insertions(+), 35 deletions(-) diff --git a/jarvis_tasks/prometheus/stats.py b/jarvis_tasks/prometheus/stats.py index c3c27f4..5ec35f4 100644 --- a/jarvis_tasks/prometheus/stats.py +++ b/jarvis_tasks/prometheus/stats.py @@ -6,12 +6,12 @@ tasks_info = Info("jarvis_tasks", "JARVIS Task info") reddit_gauge = Gauge( "jarvis_tasks_reddit_follows", "JARVIS Reddit follows", - labelnames=["subreddit_name"], + labelnames=["subreddit_name", "redditor_name"], ) reddit_count = Counter( "jarvis_tasks_reddit_count", "JARVIS Reddit sync count", - labelnames=["guild_id", "guild_name", "subreddit_name"], + labelnames=["guild_id", "guild_name", "subreddit_name", "redditor_name"], ) twitter_gauge = Gauge( diff --git a/jarvis_tasks/tasks/reddit.py b/jarvis_tasks/tasks/reddit.py index cb89b0a..cdee044 100644 --- a/jarvis_tasks/tasks/reddit.py +++ b/jarvis_tasks/tasks/reddit.py @@ -6,11 +6,12 @@ from datetime import datetime, timezone from typing import List, Optional from asyncpraw import Reddit +from asyncpraw.models.reddit.redditor import Redditor as Ruser from asyncpraw.models.reddit.submission import Submission from asyncpraw.models.reddit.submission import Subreddit as Sub from asyncprawcore.exceptions import Forbidden, NotFound from jarvis_core.db import q -from jarvis_core.db.models import Subreddit, SubredditFollow +from jarvis_core.db.models import Redditor, RedditorFollow, Subreddit, SubredditFollow from naff import Client from naff.client.errors import NotFound as DNotFound from naff.models.discord.embed import Embed, EmbedField @@ -110,7 +111,87 @@ async def post_embeds(sub: Sub, post: Submission, reddit: Reddit) -> Optional[Li return embeds -async def _stream(sub: Sub, bot: Client, reddit: Reddit) -> None: +async def _stream_user(sub: Ruser, bot: Client, reddit: Reddit) -> None: + """ + Stream a redditor + + Args: + sub: Redditor to stream + bot: Client instance + """ + now = datetime.now(tz=timezone.utc) + await sub.load() + running.append(sub.name) + logger.debug(f"Streaming user {sub.name}") + try: + async for post in sub.stream.submissions(): + if not post: + logger.debug(f"Got None for post from {sub.name}") + continue + await post.subreddit.load() + if post.created_utc < now.timestamp(): + continue + logger.debug(f"Got new post from {sub.name} in r/{post.subreddit.display_name}") + follows = RedditorFollow.find(q(name=sub.name)) + num_follows = 0 + + async for follow in follows: + num_follows += 1 + + guild = await bot.fetch_guild(follow.guild) + if not guild: + logger.warning(f"Follow {follow.id}'s guild no longer exists, deleting") + await follow.delete() + num_follows -= 1 + continue + + channel = await bot.fetch_channel(follow.channel) + if not channel: + logger.warning(f"Follow {follow.id}'s channel no longer exists, deleting") + await follow.delete() + num_follows -= 1 + continue + + embeds = await post_embeds(post.subreddit, post, reddit) + timestamp = int(post.created_utc) + + try: + await channel.send( + f"`u/{sub.name}` posted to r/{post.subreddit.display_name} at ", + embeds=embeds, + ) + count = reddit_count.labels( + guild_id=guild.id, + guild_name=guild.name, + subreddit_name=post.subreddit.display_name, + redditor_name=sub.name, + ) + count.inc() + except DNotFound: + logger.warning(f"Follow {follow.id}'s channel no longer exists, deleting") + await follow.delete() + num_follows -= 1 + continue + except Exception: + logger.error( + f"Failed to send message to {channel.id} in {channel.guild.name}", + exc_info=True, + ) + + gauge = reddit_gauge.labels(redditor_name=sub.name) + gauge.set(num_follows) + + if num_follows == 0: + s = await Redditor.find_one(q(name=sub.name)) + if s: + await s.delete() + break + except Exception: + logger.error(f"Redditor stream {sub.name} failed", exc_info=True) + running.remove(sub.name) + + +async def _stream_subreddit(sub: Sub, bot: Client, reddit: Reddit) -> None: """ Stream a subreddit @@ -159,7 +240,10 @@ async def _stream(sub: Sub, bot: Client, reddit: Reddit) -> None: embeds=embeds, ) count = reddit_count.labels( - guild_id=guild.id, guild_name=guild.name, subreddit_name=sub.display_name + guild_id=guild.id, + guild_name=guild.name, + subreddit_name=sub.display_name, + redditor_name=post.author.name, ) count.inc() except DNotFound: @@ -173,7 +257,9 @@ async def _stream(sub: Sub, bot: Client, reddit: Reddit) -> None: exc_info=True, ) - gauge = reddit_gauge.labels(subreddit_name=sub.display_name) + gauge = reddit_gauge.labels( + subreddit_name=sub.display_name, redditor_name=post.author.name + ) gauge.set(num_follows) if num_follows == 0: @@ -186,6 +272,18 @@ async def _stream(sub: Sub, bot: Client, reddit: Reddit) -> None: running.remove(sub.display_name) +async def _stream(sub: Sub | Ruser, bot: Client, reddit: Reddit) -> None: + """ + Stream handler. + + Decides what type of stream to launch based on `type(sub)` + """ + if isinstance(sub, Sub): + await _stream_subreddit(sub, bot, reddit) + else: + await _stream_user(sub, bot, reddit) + + async def reddit(bot: Client) -> None: """ Sync Reddit posts in the background. @@ -199,7 +297,7 @@ async def reddit(bot: Client) -> None: logger.debug("Starting Task-reddit") red = Reddit(**config.reddit) - logger.debug("Validating follows") + logger.debug("Validating subreddit follows") async for sub in Subreddit.find(): count = 0 @@ -218,10 +316,30 @@ async def reddit(bot: Client) -> None: logger.debug(f"Subreddit {sub.display_name} has no followers, removing") await sub.delete() + logger.debug("Validating redditor follows") + async for sub in Redditor.find(): + count = 0 + + async for follow in RedditorFollow.find(q(name=sub.name)): + count += 1 + + guild = await bot.fetch_guild(follow.guild) + channel = await bot.fetch_channel(follow.channel) + if not guild or not channel: + logger.debug(f"Follow {follow.id} invalid, deleting") + await follow.delete() + count -= 1 + continue + + if count == 0: + logger.debug(f"Redditor {sub.name} has no followers, removing") + await sub.delete() + old_count = 0 while True: count = len(running) subs = Subreddit.find(q(display_name__nin=running)) + users = Redditor.find(q(name__nin=running)) # Go through all actively followed subreddits async for sub in subs: @@ -253,6 +371,36 @@ async def reddit(bot: Client) -> None: asyncio.create_task(coro) count += 1 + # Go through all actively followed redditors + async for sub in users: + logger.debug(f"Creating stream for {sub.name}") + if sub.name in running: + logger.debug(f"Follow {sub.name} was found despite filter") + continue + + is_followed = await SubredditFollow.find_one(q(name=sub.name)) + if not is_followed: + logger.warn(f"Redditor {sub.name} has no followers, removing") + await sub.delete() + continue + + # Get subreddit + try: + sub = await red.user(sub.name) + except (NotFound, Forbidden) as e: + # Subreddit is either quarantined, deleted, or private + logger.warn(f"Redditor {sub.display_name} raised {e.__class__.__name__}, removing") + try: + await sub.delete() + except Exception: + logger.debug("Ignoring deletion error") + continue + + # Create and run stream + coro = _stream(sub, bot, red) + asyncio.create_task(coro) + count += 1 + if old_count != count: logger.debug(f"Now streaming {count} subreddits") old_count = count diff --git a/jarvis_tasks/tasks/twitter.py b/jarvis_tasks/tasks/twitter.py index ea12792..5b72a47 100644 --- a/jarvis_tasks/tasks/twitter.py +++ b/jarvis_tasks/tasks/twitter.py @@ -1,5 +1,6 @@ """JARVIS Twitter sync.""" import logging +from asyncio import sleep from datetime import datetime, timedelta, timezone from typing import List @@ -224,8 +225,7 @@ async def twitter(bot: Client) -> None: try: user = api.get_user(user_id=account.twitter_id) except Exception: - logger.warn(f"Failed to get {account.handle}, deleting") - await account.delete() + logger.error("Encountered API error", exc_info=True) continue if not user: logger.warn(f"Failed to get {account.handle}, deleting") @@ -244,3 +244,4 @@ async def twitter(bot: Client) -> None: except Exception: logger.error("Encountered error with stream", stack_info=True) logger.debug("Stream disconnected, updating filters and re-starting") + await sleep(5) diff --git a/poetry.lock b/poetry.lock index 7283645..67f8639 100644 --- a/poetry.lock +++ b/poetry.lock @@ -222,24 +222,6 @@ python-versions = "*" [package.extras] test = ["flake8 (==3.7.8)", "hypothesis (==3.55.3)"] -[[package]] -name = "dis-snek" -version = "8.0.0" -description = "An API wrapper for Discord filled with snakes" -category = "main" -optional = false -python-versions = ">=3.10" - -[package.dependencies] -aiohttp = "*" -attrs = "*" -discord-typings = "*" -tomli = "*" - -[package.extras] -all = ["PyNaCl (>=1.5.0,<1.6)", "yt-dlp"] -voice = ["PyNaCl (>=1.5.0,<1.6)", "yt-dlp"] - [[package]] name = "discord-typings" version = "0.4.0" @@ -301,7 +283,7 @@ python-versions = "*" [[package]] name = "jarvis-core" -version = "0.10.2" +version = "0.11.0" description = "JARVIS core" category = "main" optional = false @@ -322,7 +304,7 @@ umongo = "^3.1.0" type = "git" url = "https://git.zevaryx.com/stark-industries/jarvis/jarvis-core.git" reference = "main" -resolved_reference = "7bb9b25f636fbcbea97e0924f2192a1e497258dd" +resolved_reference = "fce3b829a30583abd48b3221825c3ed303610de8" [[package]] name = "marshmallow" @@ -371,6 +353,25 @@ category = "dev" optional = false python-versions = "*" +[[package]] +name = "naff" +version = "1.4.0" +description = "Not another freaking fork" +category = "main" +optional = false +python-versions = ">=3.10" + +[package.dependencies] +aiohttp = "*" +attrs = "*" +discord-typings = "*" +tomli = "*" + +[package.extras] +all = ["PyNaCl (>=1.5.0,<1.6)", "cchardet", "aiodns", "orjson", "brotli"] +speedup = ["cchardet", "aiodns", "orjson", "brotli"] +voice = ["PyNaCl (>=1.5.0,<1.6)"] + [[package]] name = "nanoid" version = "2.0.0" @@ -757,7 +758,7 @@ multidict = ">=4.0" [metadata] lock-version = "1.1" python-versions = "^3.10" -content-hash = "5e1dd02bf3166bc33c20313f2184351a0ff6a7c0025f5dc69f780cafd55839d9" +content-hash = "4a8b066e49bcdb4af24298b610e46c3f6a3fc30e683b79ee84d3ea897e081257" [metadata.files] aiofiles = [ @@ -923,10 +924,6 @@ commonmark = [ {file = "commonmark-0.9.1-py2.py3-none-any.whl", hash = "sha256:da2f38c92590f83de410ba1a3cbceafbc74fee9def35f9251ba9a971d6d66fd9"}, {file = "commonmark-0.9.1.tar.gz", hash = "sha256:452f9dc859be7f06631ddcb328b6919c67984aca654e5fefb3914d54691aed60"}, ] -dis-snek = [ - {file = "dis-snek-8.0.0.tar.gz", hash = "sha256:c035a4f664f9a638b80089f2a9a3330a4254fc227ef2c83c96582df06f392281"}, - {file = "dis_snek-8.0.0-py3-none-any.whl", hash = "sha256:3a89c8f78c27407fb67d42dfaa51be6a507306306779e45cd47687bd846b3b23"}, -] discord-typings = [ {file = "discord-typings-0.4.0.tar.gz", hash = "sha256:66bce666194e8f006914f788f940265c009cce9b63f9a8ce2bc7931d3d3ef11c"}, {file = "discord_typings-0.4.0-py3-none-any.whl", hash = "sha256:a390b614cbcbd82af083660c12e46536b4b790ac026f8d43bdd11c8953837ca0"}, @@ -1086,6 +1083,10 @@ mypy-extensions = [ {file = "mypy_extensions-0.4.3-py2.py3-none-any.whl", hash = "sha256:090fedd75945a69ae91ce1303b5824f428daf5a028d2f6ab8a299250a846f15d"}, {file = "mypy_extensions-0.4.3.tar.gz", hash = "sha256:2d82818f5bb3e369420cb3c4060a7970edba416647068eb4c5343488a6c604a8"}, ] +naff = [ + {file = "naff-1.4.0-py3-none-any.whl", hash = "sha256:81d1e42dbc761b5ec3820b3bbf64f45c23ffdd185aed6c5512c9a8b24e0277de"}, + {file = "naff-1.4.0.tar.gz", hash = "sha256:2f8bc2216c54a0b58db05aa8f787d33e2ad3db3d1e512751dc3efb16e5891653"}, +] nanoid = [ {file = "nanoid-2.0.0-py3-none-any.whl", hash = "sha256:90aefa650e328cffb0893bbd4c236cfd44c48bc1f2d0b525ecc53c3187b653bb"}, {file = "nanoid-2.0.0.tar.gz", hash = "sha256:5a80cad5e9c6e9ae3a41fa2fb34ae189f7cb420b2a5d8f82bd9d23466e4efa68"}, diff --git a/pyproject.toml b/pyproject.toml index 6d21c41..8e1a94b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "jarvis-tasks" -version = "0.8.0" +version = "0.9.0" description = "" authors = ["Your Name "]