Migrate Twitter to tweepy AsyncStream for faster, more reliable updates

This commit is contained in:
Zeva Rose 2022-04-20 13:35:57 -06:00
parent 6a0c6ca8d5
commit 5ced32ca2b
4 changed files with 107 additions and 70 deletions

View file

@ -5,6 +5,8 @@ exclude =
extend-ignore =
Q0, E501, C812, E203, W503, # These default to arguing with Black. We might configure some of them eventually
ANN001, # Ignore self and cls annotations
ANN002, ANN003, # Ignore *args and **kwargs
ANN101, # Ignore self
ANN204, ANN206, # return annotations for special methods and class methods
D105, D107, # Missing Docstrings in magic method and __init__
S311, # Standard pseudo-random generators are not suitable for security/cryptographic purposes.

View file

@ -1,14 +1,15 @@
"""JARVIS Twitter sync."""
import asyncio
import logging
from datetime import datetime, timedelta, timezone
from typing import List
import tweepy
import tweepy.asynchronous
from dis_snek import Snake
from dis_snek.models.discord.embed import Embed
from jarvis_core.db import q
from jarvis_core.db.models import TwitterAccount, TwitterFollow
from tweepy.asynchronous import AsyncStream
from tweepy.models import Status
from jarvis_tasks.config import TaskConfig
from jarvis_tasks.util import build_embed
@ -17,14 +18,13 @@ config = TaskConfig.from_yaml()
logger = logging.getLogger(__name__)
def tweet_embeds(tweet: tweepy.models.Status) -> List[Embed]:
def tweet_embeds(tweet: Status) -> List[Embed]:
"""
Build a tweet embeds.
Args:
tweet: Tweet to build embeds
"""
logger.debug("Starting Task-twitter")
url = f"https://twitter.com/{tweet.user.screen_name}/status/{tweet.id}"
entities = tweet.__dict__.get("extended_entities", {})
media = entities.get("media", [])
@ -74,6 +74,78 @@ def tweet_embeds(tweet: tweepy.models.Status) -> List[Embed]:
return embeds
class JARVISTwitterStream(AsyncStream):
"""JARVIS Twitter AsyncStream client."""
def __init__(self, bot: Snake, *args, **kwargs):
if not bot:
raise ValueError("Missing bot")
super().__init__(*args, **kwargs)
self.bot = bot
self.current_filter = None
async def on_keep_alive(self) -> None:
"""Override keep-alive to track new accounts."""
await super().on_keep_alive()
ids = [x.twitter_id async for x in TwitterAccount.find()]
if ids != self.current_filter:
logger.debug("Follows have changed, disconnected")
self.disconnect()
async def on_status(self, status: Status) -> None:
"""
Process new statuses.
Args:
status: The status to process
"""
if status.author.id not in self.current_filter:
return
logger.debug(f"{status.author.screen_name} sent new tweet")
follows = TwitterFollow.find(q(twitter_id=status.author.id))
num_follows = 0
async for follow in follows:
num_follows += 1
guild = await self.bot.fetch_guild(follow.guild)
if not guild:
logger.warn(f"Follow {follow.id} invalid, deleting")
await follow.delete()
num_follows -= 1
continue
channel = await guild.fetch_channel(follow.channel)
if not channel:
logger.warn(f"Follow {follow.id} invalid, deleting")
await follow.delete()
num_follows -= 1
continue
retweet = "retweeted_status" in status.__dict__
if retweet and not follow.retweets:
continue
embeds = tweet_embeds(status)
mod = "re" if retweet else ""
timestamp = int(status.created_at.timestamp())
try:
await channel.send(
f"`@{status.user.screen_name}` {mod}tweeted this at <t:{timestamp}:f>",
embeds=embeds,
)
except Exception:
logger.debug(f"Failed to send message to {channel.id} in {channel.guild.name}")
if num_follows == 0:
logger.warning(f"Account {status.author.screen_name} no longer has followers, removing")
account = await TwitterAccount.find_one(q(twitter_id=status.author.id))
if account:
await account.delete()
self.disconnect()
async def twitter(bot: Snake) -> None:
"""
Sync tweets in the background.
@ -84,79 +156,41 @@ async def twitter(bot: Snake) -> None:
if not config.twitter:
logger.warn("Missing Twitter config, not starting")
return
logger.debug("Starting Task-twitter")
stream = JARVISTwitterStream(bot=bot, **config.twitter)
auth = tweepy.AppAuthHandler(config.twitter["consumer_key"], config.twitter["consumer_secret"])
api = tweepy.API(auth)
logger.debug("Starting Task-twitter")
while True:
accounts = TwitterAccount.find()
accounts_to_delete = []
# Go through all actively followed accounts
ids = []
async for account in accounts:
logger.debug(f"Checking account {account.handle}")
# Check if account needs updated (handle changes)
if account.last_sync + timedelta(hours=1) <= datetime.now(tz=timezone.utc):
logger.debug(f"Account {account.handle} out of sync, updating")
try:
user = api.get_user(user_id=account.twitter_id)
except Exception:
logger.warn(f"Failed to get {account.handle}, deleting")
await account.delete()
continue
if not user:
logger.warn(f"Failed to get {account.handle}, deleting")
await account.delete()
continue
account.handle = user.screen_name
account.last_sync = datetime.now(tz=timezone.utc)
await account.commit()
ids.append(account.twitter_id)
# Get new tweets
if tweets := api.user_timeline(user_id=account.twitter_id, since_id=account.last_tweet):
logger.debug(f"{account.handle} has new tweets")
tweets = sorted(tweets, key=lambda x: x.id)
follows = TwitterFollow.find(q(twitter_id=account.twitter_id))
follows_to_delete = []
num_follows = 0
# Go through follows and send tweet if necessary
async for follow in follows:
num_follows += 1
guild = await bot.fetch_guild(follow.guild)
if not guild:
logger.warning(f"Follow {follow.id}'s guild no longer exists, deleting")
follows_to_delete.append(follow)
continue
channel = await bot.fetch_channel(follow.channel)
if not channel:
logger.warning(f"Follow {follow.id}'s channel no longer exists, deleting")
follows_to_delete.append(follow)
continue
for tweet in tweets:
retweet = "retweeted_status" in tweet.__dict__
if retweet and not follow.retweets:
continue
embeds = tweet_embeds(tweet)
mod = "re" if retweet else ""
timestamp = int(tweet.created_at.timestamp())
logger.debug(f"Starting stream with {len(ids)} accounts")
stream.current_filter = ids
try:
await channel.send(
f"`@{account.handle}` {mod}tweeted this at <t:{timestamp}:f>",
embeds=embeds,
)
await stream.filter(follow=ids)
except Exception:
logger.debug(
f"Failed to send message to {channel.id} in {channel.guild.name}"
)
# Delete invalid follows
for follow in follows_to_delete:
await follow.delete()
if num_follows == 0:
accounts_to_delete.append(account)
else:
newest = tweets[-1]
account.update(q(last_tweet=newest.id))
await account.commit()
# Delete invalid accounts (no follows)
for account in accounts_to_delete:
logger.info(f"{account.handle} has no followers, removing")
await account.delete()
# Only check once a minute
await asyncio.sleep(60)
logger.error("Encountered error with stream", stack_info=True)
logger.debug("Stream disconnected, updating filters and re-starting")

7
poetry.lock generated
View file

@ -223,7 +223,7 @@ python-versions = "*"
[[package]]
name = "jarvis-core"
version = "0.8.0"
version = "0.8.1"
description = "JARVIS core"
category = "main"
optional = false
@ -242,7 +242,7 @@ umongo = "^3.1.0"
type = "git"
url = "https://git.zevaryx.com/stark-industries/jarvis/jarvis-core.git"
reference = "main"
resolved_reference = "02b60a29ebbab91d9ad4cf876eb10c2aea6f2231"
resolved_reference = "257eaaea951d7f8ed5f6e1c4d01abed6d47e9f0d"
[[package]]
name = "marshmallow"
@ -451,6 +451,7 @@ optional = false
python-versions = ">=3.7"
[package.dependencies]
aiohttp = {version = ">=3.7.3,<4", optional = true, markers = "extra == \"async\""}
oauthlib = ">=3.2.0,<4"
requests = ">=2.27.0,<3"
requests-oauthlib = ">=1.2.0,<2"
@ -530,7 +531,7 @@ multidict = ">=4.0"
[metadata]
lock-version = "1.1"
python-versions = "^3.10"
content-hash = "a07ca72dcff5520aefe9b577b0fcaf6804e488d91cdf4779b81e37ccc5ba082b"
content-hash = "ca53ee1d35badd4d592b79ddc8280a04e74281bc5a21f2537d741a3b5650fad1"
[metadata.files]
aiofiles = [

View file

@ -1,6 +1,6 @@
[tool.poetry]
name = "jarvis-tasks"
version = "0.5.0"
version = "0.6.0"
description = ""
authors = ["Your Name <you@example.com>"]
@ -9,7 +9,7 @@ python = "^3.10"
jarvis-core = {git = "https://git.zevaryx.com/stark-industries/jarvis/jarvis-core.git", rev = "main"}
dis-snek = "*"
aiohttp = "^3.8.1"
tweepy = "^4.5.0"
tweepy = {extras = ["async"], version = "^4.8.0"}
asyncpraw = "^7.5.0"
[tool.poetry.dev-dependencies]