Add redditor following, fix bugs
This commit is contained in:
parent
f189abbd50
commit
113b92c177
5 changed files with 185 additions and 35 deletions
|
@ -6,12 +6,12 @@ tasks_info = Info("jarvis_tasks", "JARVIS Task info")
|
||||||
reddit_gauge = Gauge(
|
reddit_gauge = Gauge(
|
||||||
"jarvis_tasks_reddit_follows",
|
"jarvis_tasks_reddit_follows",
|
||||||
"JARVIS Reddit follows",
|
"JARVIS Reddit follows",
|
||||||
labelnames=["subreddit_name"],
|
labelnames=["subreddit_name", "redditor_name"],
|
||||||
)
|
)
|
||||||
reddit_count = Counter(
|
reddit_count = Counter(
|
||||||
"jarvis_tasks_reddit_count",
|
"jarvis_tasks_reddit_count",
|
||||||
"JARVIS Reddit sync count",
|
"JARVIS Reddit sync count",
|
||||||
labelnames=["guild_id", "guild_name", "subreddit_name"],
|
labelnames=["guild_id", "guild_name", "subreddit_name", "redditor_name"],
|
||||||
)
|
)
|
||||||
|
|
||||||
twitter_gauge = Gauge(
|
twitter_gauge = Gauge(
|
||||||
|
|
|
@ -6,11 +6,12 @@ from datetime import datetime, timezone
|
||||||
from typing import List, Optional
|
from typing import List, Optional
|
||||||
|
|
||||||
from asyncpraw import Reddit
|
from asyncpraw import Reddit
|
||||||
|
from asyncpraw.models.reddit.redditor import Redditor as Ruser
|
||||||
from asyncpraw.models.reddit.submission import Submission
|
from asyncpraw.models.reddit.submission import Submission
|
||||||
from asyncpraw.models.reddit.submission import Subreddit as Sub
|
from asyncpraw.models.reddit.submission import Subreddit as Sub
|
||||||
from asyncprawcore.exceptions import Forbidden, NotFound
|
from asyncprawcore.exceptions import Forbidden, NotFound
|
||||||
from jarvis_core.db import q
|
from jarvis_core.db import q
|
||||||
from jarvis_core.db.models import Subreddit, SubredditFollow
|
from jarvis_core.db.models import Redditor, RedditorFollow, Subreddit, SubredditFollow
|
||||||
from naff import Client
|
from naff import Client
|
||||||
from naff.client.errors import NotFound as DNotFound
|
from naff.client.errors import NotFound as DNotFound
|
||||||
from naff.models.discord.embed import Embed, EmbedField
|
from naff.models.discord.embed import Embed, EmbedField
|
||||||
|
@ -110,7 +111,87 @@ async def post_embeds(sub: Sub, post: Submission, reddit: Reddit) -> Optional[Li
|
||||||
return embeds
|
return embeds
|
||||||
|
|
||||||
|
|
||||||
async def _stream(sub: Sub, bot: Client, reddit: Reddit) -> None:
|
async def _stream_user(sub: Ruser, bot: Client, reddit: Reddit) -> None:
|
||||||
|
"""
|
||||||
|
Stream a redditor
|
||||||
|
|
||||||
|
Args:
|
||||||
|
sub: Redditor to stream
|
||||||
|
bot: Client instance
|
||||||
|
"""
|
||||||
|
now = datetime.now(tz=timezone.utc)
|
||||||
|
await sub.load()
|
||||||
|
running.append(sub.name)
|
||||||
|
logger.debug(f"Streaming user {sub.name}")
|
||||||
|
try:
|
||||||
|
async for post in sub.stream.submissions():
|
||||||
|
if not post:
|
||||||
|
logger.debug(f"Got None for post from {sub.name}")
|
||||||
|
continue
|
||||||
|
await post.subreddit.load()
|
||||||
|
if post.created_utc < now.timestamp():
|
||||||
|
continue
|
||||||
|
logger.debug(f"Got new post from {sub.name} in r/{post.subreddit.display_name}")
|
||||||
|
follows = RedditorFollow.find(q(name=sub.name))
|
||||||
|
num_follows = 0
|
||||||
|
|
||||||
|
async for follow in follows:
|
||||||
|
num_follows += 1
|
||||||
|
|
||||||
|
guild = await bot.fetch_guild(follow.guild)
|
||||||
|
if not guild:
|
||||||
|
logger.warning(f"Follow {follow.id}'s guild no longer exists, deleting")
|
||||||
|
await follow.delete()
|
||||||
|
num_follows -= 1
|
||||||
|
continue
|
||||||
|
|
||||||
|
channel = await bot.fetch_channel(follow.channel)
|
||||||
|
if not channel:
|
||||||
|
logger.warning(f"Follow {follow.id}'s channel no longer exists, deleting")
|
||||||
|
await follow.delete()
|
||||||
|
num_follows -= 1
|
||||||
|
continue
|
||||||
|
|
||||||
|
embeds = await post_embeds(post.subreddit, post, reddit)
|
||||||
|
timestamp = int(post.created_utc)
|
||||||
|
|
||||||
|
try:
|
||||||
|
await channel.send(
|
||||||
|
f"`u/{sub.name}` posted to r/{post.subreddit.display_name} at <t:{timestamp}:f>",
|
||||||
|
embeds=embeds,
|
||||||
|
)
|
||||||
|
count = reddit_count.labels(
|
||||||
|
guild_id=guild.id,
|
||||||
|
guild_name=guild.name,
|
||||||
|
subreddit_name=post.subreddit.display_name,
|
||||||
|
redditor_name=sub.name,
|
||||||
|
)
|
||||||
|
count.inc()
|
||||||
|
except DNotFound:
|
||||||
|
logger.warning(f"Follow {follow.id}'s channel no longer exists, deleting")
|
||||||
|
await follow.delete()
|
||||||
|
num_follows -= 1
|
||||||
|
continue
|
||||||
|
except Exception:
|
||||||
|
logger.error(
|
||||||
|
f"Failed to send message to {channel.id} in {channel.guild.name}",
|
||||||
|
exc_info=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
gauge = reddit_gauge.labels(redditor_name=sub.name)
|
||||||
|
gauge.set(num_follows)
|
||||||
|
|
||||||
|
if num_follows == 0:
|
||||||
|
s = await Redditor.find_one(q(name=sub.name))
|
||||||
|
if s:
|
||||||
|
await s.delete()
|
||||||
|
break
|
||||||
|
except Exception:
|
||||||
|
logger.error(f"Redditor stream {sub.name} failed", exc_info=True)
|
||||||
|
running.remove(sub.name)
|
||||||
|
|
||||||
|
|
||||||
|
async def _stream_subreddit(sub: Sub, bot: Client, reddit: Reddit) -> None:
|
||||||
"""
|
"""
|
||||||
Stream a subreddit
|
Stream a subreddit
|
||||||
|
|
||||||
|
@ -159,7 +240,10 @@ async def _stream(sub: Sub, bot: Client, reddit: Reddit) -> None:
|
||||||
embeds=embeds,
|
embeds=embeds,
|
||||||
)
|
)
|
||||||
count = reddit_count.labels(
|
count = reddit_count.labels(
|
||||||
guild_id=guild.id, guild_name=guild.name, subreddit_name=sub.display_name
|
guild_id=guild.id,
|
||||||
|
guild_name=guild.name,
|
||||||
|
subreddit_name=sub.display_name,
|
||||||
|
redditor_name=post.author.name,
|
||||||
)
|
)
|
||||||
count.inc()
|
count.inc()
|
||||||
except DNotFound:
|
except DNotFound:
|
||||||
|
@ -173,7 +257,9 @@ async def _stream(sub: Sub, bot: Client, reddit: Reddit) -> None:
|
||||||
exc_info=True,
|
exc_info=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
gauge = reddit_gauge.labels(subreddit_name=sub.display_name)
|
gauge = reddit_gauge.labels(
|
||||||
|
subreddit_name=sub.display_name, redditor_name=post.author.name
|
||||||
|
)
|
||||||
gauge.set(num_follows)
|
gauge.set(num_follows)
|
||||||
|
|
||||||
if num_follows == 0:
|
if num_follows == 0:
|
||||||
|
@ -186,6 +272,18 @@ async def _stream(sub: Sub, bot: Client, reddit: Reddit) -> None:
|
||||||
running.remove(sub.display_name)
|
running.remove(sub.display_name)
|
||||||
|
|
||||||
|
|
||||||
|
async def _stream(sub: Sub | Ruser, bot: Client, reddit: Reddit) -> None:
|
||||||
|
"""
|
||||||
|
Stream handler.
|
||||||
|
|
||||||
|
Decides what type of stream to launch based on `type(sub)`
|
||||||
|
"""
|
||||||
|
if isinstance(sub, Sub):
|
||||||
|
await _stream_subreddit(sub, bot, reddit)
|
||||||
|
else:
|
||||||
|
await _stream_user(sub, bot, reddit)
|
||||||
|
|
||||||
|
|
||||||
async def reddit(bot: Client) -> None:
|
async def reddit(bot: Client) -> None:
|
||||||
"""
|
"""
|
||||||
Sync Reddit posts in the background.
|
Sync Reddit posts in the background.
|
||||||
|
@ -199,7 +297,7 @@ async def reddit(bot: Client) -> None:
|
||||||
logger.debug("Starting Task-reddit")
|
logger.debug("Starting Task-reddit")
|
||||||
red = Reddit(**config.reddit)
|
red = Reddit(**config.reddit)
|
||||||
|
|
||||||
logger.debug("Validating follows")
|
logger.debug("Validating subreddit follows")
|
||||||
async for sub in Subreddit.find():
|
async for sub in Subreddit.find():
|
||||||
count = 0
|
count = 0
|
||||||
|
|
||||||
|
@ -218,10 +316,30 @@ async def reddit(bot: Client) -> None:
|
||||||
logger.debug(f"Subreddit {sub.display_name} has no followers, removing")
|
logger.debug(f"Subreddit {sub.display_name} has no followers, removing")
|
||||||
await sub.delete()
|
await sub.delete()
|
||||||
|
|
||||||
|
logger.debug("Validating redditor follows")
|
||||||
|
async for sub in Redditor.find():
|
||||||
|
count = 0
|
||||||
|
|
||||||
|
async for follow in RedditorFollow.find(q(name=sub.name)):
|
||||||
|
count += 1
|
||||||
|
|
||||||
|
guild = await bot.fetch_guild(follow.guild)
|
||||||
|
channel = await bot.fetch_channel(follow.channel)
|
||||||
|
if not guild or not channel:
|
||||||
|
logger.debug(f"Follow {follow.id} invalid, deleting")
|
||||||
|
await follow.delete()
|
||||||
|
count -= 1
|
||||||
|
continue
|
||||||
|
|
||||||
|
if count == 0:
|
||||||
|
logger.debug(f"Redditor {sub.name} has no followers, removing")
|
||||||
|
await sub.delete()
|
||||||
|
|
||||||
old_count = 0
|
old_count = 0
|
||||||
while True:
|
while True:
|
||||||
count = len(running)
|
count = len(running)
|
||||||
subs = Subreddit.find(q(display_name__nin=running))
|
subs = Subreddit.find(q(display_name__nin=running))
|
||||||
|
users = Redditor.find(q(name__nin=running))
|
||||||
|
|
||||||
# Go through all actively followed subreddits
|
# Go through all actively followed subreddits
|
||||||
async for sub in subs:
|
async for sub in subs:
|
||||||
|
@ -253,6 +371,36 @@ async def reddit(bot: Client) -> None:
|
||||||
asyncio.create_task(coro)
|
asyncio.create_task(coro)
|
||||||
count += 1
|
count += 1
|
||||||
|
|
||||||
|
# Go through all actively followed redditors
|
||||||
|
async for sub in users:
|
||||||
|
logger.debug(f"Creating stream for {sub.name}")
|
||||||
|
if sub.name in running:
|
||||||
|
logger.debug(f"Follow {sub.name} was found despite filter")
|
||||||
|
continue
|
||||||
|
|
||||||
|
is_followed = await SubredditFollow.find_one(q(name=sub.name))
|
||||||
|
if not is_followed:
|
||||||
|
logger.warn(f"Redditor {sub.name} has no followers, removing")
|
||||||
|
await sub.delete()
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Get subreddit
|
||||||
|
try:
|
||||||
|
sub = await red.user(sub.name)
|
||||||
|
except (NotFound, Forbidden) as e:
|
||||||
|
# Subreddit is either quarantined, deleted, or private
|
||||||
|
logger.warn(f"Redditor {sub.display_name} raised {e.__class__.__name__}, removing")
|
||||||
|
try:
|
||||||
|
await sub.delete()
|
||||||
|
except Exception:
|
||||||
|
logger.debug("Ignoring deletion error")
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Create and run stream
|
||||||
|
coro = _stream(sub, bot, red)
|
||||||
|
asyncio.create_task(coro)
|
||||||
|
count += 1
|
||||||
|
|
||||||
if old_count != count:
|
if old_count != count:
|
||||||
logger.debug(f"Now streaming {count} subreddits")
|
logger.debug(f"Now streaming {count} subreddits")
|
||||||
old_count = count
|
old_count = count
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
"""JARVIS Twitter sync."""
|
"""JARVIS Twitter sync."""
|
||||||
import logging
|
import logging
|
||||||
|
from asyncio import sleep
|
||||||
from datetime import datetime, timedelta, timezone
|
from datetime import datetime, timedelta, timezone
|
||||||
from typing import List
|
from typing import List
|
||||||
|
|
||||||
|
@ -224,8 +225,7 @@ async def twitter(bot: Client) -> None:
|
||||||
try:
|
try:
|
||||||
user = api.get_user(user_id=account.twitter_id)
|
user = api.get_user(user_id=account.twitter_id)
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.warn(f"Failed to get {account.handle}, deleting")
|
logger.error("Encountered API error", exc_info=True)
|
||||||
await account.delete()
|
|
||||||
continue
|
continue
|
||||||
if not user:
|
if not user:
|
||||||
logger.warn(f"Failed to get {account.handle}, deleting")
|
logger.warn(f"Failed to get {account.handle}, deleting")
|
||||||
|
@ -244,3 +244,4 @@ async def twitter(bot: Client) -> None:
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.error("Encountered error with stream", stack_info=True)
|
logger.error("Encountered error with stream", stack_info=True)
|
||||||
logger.debug("Stream disconnected, updating filters and re-starting")
|
logger.debug("Stream disconnected, updating filters and re-starting")
|
||||||
|
await sleep(5)
|
||||||
|
|
51
poetry.lock
generated
51
poetry.lock
generated
|
@ -222,24 +222,6 @@ python-versions = "*"
|
||||||
[package.extras]
|
[package.extras]
|
||||||
test = ["flake8 (==3.7.8)", "hypothesis (==3.55.3)"]
|
test = ["flake8 (==3.7.8)", "hypothesis (==3.55.3)"]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "dis-snek"
|
|
||||||
version = "8.0.0"
|
|
||||||
description = "An API wrapper for Discord filled with snakes"
|
|
||||||
category = "main"
|
|
||||||
optional = false
|
|
||||||
python-versions = ">=3.10"
|
|
||||||
|
|
||||||
[package.dependencies]
|
|
||||||
aiohttp = "*"
|
|
||||||
attrs = "*"
|
|
||||||
discord-typings = "*"
|
|
||||||
tomli = "*"
|
|
||||||
|
|
||||||
[package.extras]
|
|
||||||
all = ["PyNaCl (>=1.5.0,<1.6)", "yt-dlp"]
|
|
||||||
voice = ["PyNaCl (>=1.5.0,<1.6)", "yt-dlp"]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "discord-typings"
|
name = "discord-typings"
|
||||||
version = "0.4.0"
|
version = "0.4.0"
|
||||||
|
@ -301,7 +283,7 @@ python-versions = "*"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "jarvis-core"
|
name = "jarvis-core"
|
||||||
version = "0.10.2"
|
version = "0.11.0"
|
||||||
description = "JARVIS core"
|
description = "JARVIS core"
|
||||||
category = "main"
|
category = "main"
|
||||||
optional = false
|
optional = false
|
||||||
|
@ -322,7 +304,7 @@ umongo = "^3.1.0"
|
||||||
type = "git"
|
type = "git"
|
||||||
url = "https://git.zevaryx.com/stark-industries/jarvis/jarvis-core.git"
|
url = "https://git.zevaryx.com/stark-industries/jarvis/jarvis-core.git"
|
||||||
reference = "main"
|
reference = "main"
|
||||||
resolved_reference = "7bb9b25f636fbcbea97e0924f2192a1e497258dd"
|
resolved_reference = "fce3b829a30583abd48b3221825c3ed303610de8"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "marshmallow"
|
name = "marshmallow"
|
||||||
|
@ -371,6 +353,25 @@ category = "dev"
|
||||||
optional = false
|
optional = false
|
||||||
python-versions = "*"
|
python-versions = "*"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "naff"
|
||||||
|
version = "1.4.0"
|
||||||
|
description = "Not another freaking fork"
|
||||||
|
category = "main"
|
||||||
|
optional = false
|
||||||
|
python-versions = ">=3.10"
|
||||||
|
|
||||||
|
[package.dependencies]
|
||||||
|
aiohttp = "*"
|
||||||
|
attrs = "*"
|
||||||
|
discord-typings = "*"
|
||||||
|
tomli = "*"
|
||||||
|
|
||||||
|
[package.extras]
|
||||||
|
all = ["PyNaCl (>=1.5.0,<1.6)", "cchardet", "aiodns", "orjson", "brotli"]
|
||||||
|
speedup = ["cchardet", "aiodns", "orjson", "brotli"]
|
||||||
|
voice = ["PyNaCl (>=1.5.0,<1.6)"]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "nanoid"
|
name = "nanoid"
|
||||||
version = "2.0.0"
|
version = "2.0.0"
|
||||||
|
@ -757,7 +758,7 @@ multidict = ">=4.0"
|
||||||
[metadata]
|
[metadata]
|
||||||
lock-version = "1.1"
|
lock-version = "1.1"
|
||||||
python-versions = "^3.10"
|
python-versions = "^3.10"
|
||||||
content-hash = "5e1dd02bf3166bc33c20313f2184351a0ff6a7c0025f5dc69f780cafd55839d9"
|
content-hash = "4a8b066e49bcdb4af24298b610e46c3f6a3fc30e683b79ee84d3ea897e081257"
|
||||||
|
|
||||||
[metadata.files]
|
[metadata.files]
|
||||||
aiofiles = [
|
aiofiles = [
|
||||||
|
@ -923,10 +924,6 @@ commonmark = [
|
||||||
{file = "commonmark-0.9.1-py2.py3-none-any.whl", hash = "sha256:da2f38c92590f83de410ba1a3cbceafbc74fee9def35f9251ba9a971d6d66fd9"},
|
{file = "commonmark-0.9.1-py2.py3-none-any.whl", hash = "sha256:da2f38c92590f83de410ba1a3cbceafbc74fee9def35f9251ba9a971d6d66fd9"},
|
||||||
{file = "commonmark-0.9.1.tar.gz", hash = "sha256:452f9dc859be7f06631ddcb328b6919c67984aca654e5fefb3914d54691aed60"},
|
{file = "commonmark-0.9.1.tar.gz", hash = "sha256:452f9dc859be7f06631ddcb328b6919c67984aca654e5fefb3914d54691aed60"},
|
||||||
]
|
]
|
||||||
dis-snek = [
|
|
||||||
{file = "dis-snek-8.0.0.tar.gz", hash = "sha256:c035a4f664f9a638b80089f2a9a3330a4254fc227ef2c83c96582df06f392281"},
|
|
||||||
{file = "dis_snek-8.0.0-py3-none-any.whl", hash = "sha256:3a89c8f78c27407fb67d42dfaa51be6a507306306779e45cd47687bd846b3b23"},
|
|
||||||
]
|
|
||||||
discord-typings = [
|
discord-typings = [
|
||||||
{file = "discord-typings-0.4.0.tar.gz", hash = "sha256:66bce666194e8f006914f788f940265c009cce9b63f9a8ce2bc7931d3d3ef11c"},
|
{file = "discord-typings-0.4.0.tar.gz", hash = "sha256:66bce666194e8f006914f788f940265c009cce9b63f9a8ce2bc7931d3d3ef11c"},
|
||||||
{file = "discord_typings-0.4.0-py3-none-any.whl", hash = "sha256:a390b614cbcbd82af083660c12e46536b4b790ac026f8d43bdd11c8953837ca0"},
|
{file = "discord_typings-0.4.0-py3-none-any.whl", hash = "sha256:a390b614cbcbd82af083660c12e46536b4b790ac026f8d43bdd11c8953837ca0"},
|
||||||
|
@ -1086,6 +1083,10 @@ mypy-extensions = [
|
||||||
{file = "mypy_extensions-0.4.3-py2.py3-none-any.whl", hash = "sha256:090fedd75945a69ae91ce1303b5824f428daf5a028d2f6ab8a299250a846f15d"},
|
{file = "mypy_extensions-0.4.3-py2.py3-none-any.whl", hash = "sha256:090fedd75945a69ae91ce1303b5824f428daf5a028d2f6ab8a299250a846f15d"},
|
||||||
{file = "mypy_extensions-0.4.3.tar.gz", hash = "sha256:2d82818f5bb3e369420cb3c4060a7970edba416647068eb4c5343488a6c604a8"},
|
{file = "mypy_extensions-0.4.3.tar.gz", hash = "sha256:2d82818f5bb3e369420cb3c4060a7970edba416647068eb4c5343488a6c604a8"},
|
||||||
]
|
]
|
||||||
|
naff = [
|
||||||
|
{file = "naff-1.4.0-py3-none-any.whl", hash = "sha256:81d1e42dbc761b5ec3820b3bbf64f45c23ffdd185aed6c5512c9a8b24e0277de"},
|
||||||
|
{file = "naff-1.4.0.tar.gz", hash = "sha256:2f8bc2216c54a0b58db05aa8f787d33e2ad3db3d1e512751dc3efb16e5891653"},
|
||||||
|
]
|
||||||
nanoid = [
|
nanoid = [
|
||||||
{file = "nanoid-2.0.0-py3-none-any.whl", hash = "sha256:90aefa650e328cffb0893bbd4c236cfd44c48bc1f2d0b525ecc53c3187b653bb"},
|
{file = "nanoid-2.0.0-py3-none-any.whl", hash = "sha256:90aefa650e328cffb0893bbd4c236cfd44c48bc1f2d0b525ecc53c3187b653bb"},
|
||||||
{file = "nanoid-2.0.0.tar.gz", hash = "sha256:5a80cad5e9c6e9ae3a41fa2fb34ae189f7cb420b2a5d8f82bd9d23466e4efa68"},
|
{file = "nanoid-2.0.0.tar.gz", hash = "sha256:5a80cad5e9c6e9ae3a41fa2fb34ae189f7cb420b2a5d8f82bd9d23466e4efa68"},
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
[tool.poetry]
|
[tool.poetry]
|
||||||
name = "jarvis-tasks"
|
name = "jarvis-tasks"
|
||||||
version = "0.8.0"
|
version = "0.9.0"
|
||||||
description = ""
|
description = ""
|
||||||
authors = ["Your Name <you@example.com>"]
|
authors = ["Your Name <you@example.com>"]
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue