diff --git a/.flake8 b/.flake8 index 4f1c9e4..4cc3c85 100644 --- a/.flake8 +++ b/.flake8 @@ -5,6 +5,8 @@ exclude = extend-ignore = Q0, E501, C812, E203, W503, # These default to arguing with Black. We might configure some of them eventually ANN001, # Ignore self and cls annotations + ANN002, ANN003, # Ignore *args and **kwargs + ANN101, # Ignore self ANN204, ANN206, # return annotations for special methods and class methods D105, D107, # Missing Docstrings in magic method and __init__ S311, # Standard pseudo-random generators are not suitable for security/cryptographic purposes. diff --git a/jarvis_tasks/tasks/twitter.py b/jarvis_tasks/tasks/twitter.py index fa70e8d..5fd3682 100644 --- a/jarvis_tasks/tasks/twitter.py +++ b/jarvis_tasks/tasks/twitter.py @@ -1,14 +1,15 @@ """JARVIS Twitter sync.""" -import asyncio import logging from datetime import datetime, timedelta, timezone from typing import List -import tweepy +import tweepy.asynchronous from dis_snek import Snake from dis_snek.models.discord.embed import Embed from jarvis_core.db import q from jarvis_core.db.models import TwitterAccount, TwitterFollow +from tweepy.asynchronous import AsyncStream +from tweepy.models import Status from jarvis_tasks.config import TaskConfig from jarvis_tasks.util import build_embed @@ -17,14 +18,13 @@ config = TaskConfig.from_yaml() logger = logging.getLogger(__name__) -def tweet_embeds(tweet: tweepy.models.Status) -> List[Embed]: +def tweet_embeds(tweet: Status) -> List[Embed]: """ Build a tweet embeds. Args: tweet: Tweet to build embeds """ - logger.debug("Starting Task-twitter") url = f"https://twitter.com/{tweet.user.screen_name}/status/{tweet.id}" entities = tweet.__dict__.get("extended_entities", {}) media = entities.get("media", []) @@ -74,6 +74,78 @@ def tweet_embeds(tweet: tweepy.models.Status) -> List[Embed]: return embeds +class JARVISTwitterStream(AsyncStream): + """JARVIS Twitter AsyncStream client.""" + + def __init__(self, bot: Snake, *args, **kwargs): + if not bot: + raise ValueError("Missing bot") + super().__init__(*args, **kwargs) + self.bot = bot + self.current_filter = None + + async def on_keep_alive(self) -> None: + """Override keep-alive to track new accounts.""" + await super().on_keep_alive() + ids = [x.twitter_id async for x in TwitterAccount.find()] + if ids != self.current_filter: + logger.debug("Follows have changed, disconnected") + self.disconnect() + + async def on_status(self, status: Status) -> None: + """ + Process new statuses. + + Args: + status: The status to process + """ + if status.author.id not in self.current_filter: + return + logger.debug(f"{status.author.screen_name} sent new tweet") + follows = TwitterFollow.find(q(twitter_id=status.author.id)) + num_follows = 0 + + async for follow in follows: + num_follows += 1 + + guild = await self.bot.fetch_guild(follow.guild) + if not guild: + logger.warn(f"Follow {follow.id} invalid, deleting") + await follow.delete() + num_follows -= 1 + continue + + channel = await guild.fetch_channel(follow.channel) + if not channel: + logger.warn(f"Follow {follow.id} invalid, deleting") + await follow.delete() + num_follows -= 1 + continue + + retweet = "retweeted_status" in status.__dict__ + if retweet and not follow.retweets: + continue + + embeds = tweet_embeds(status) + mod = "re" if retweet else "" + timestamp = int(status.created_at.timestamp()) + + try: + await channel.send( + f"`@{status.user.screen_name}` {mod}tweeted this at ", + embeds=embeds, + ) + except Exception: + logger.debug(f"Failed to send message to {channel.id} in {channel.guild.name}") + + if num_follows == 0: + logger.warning(f"Account {status.author.screen_name} no longer has followers, removing") + account = await TwitterAccount.find_one(q(twitter_id=status.author.id)) + if account: + await account.delete() + self.disconnect() + + async def twitter(bot: Snake) -> None: """ Sync tweets in the background. @@ -84,79 +156,41 @@ async def twitter(bot: Snake) -> None: if not config.twitter: logger.warn("Missing Twitter config, not starting") return - logger.debug("Starting Task-twitter") + stream = JARVISTwitterStream(bot=bot, **config.twitter) auth = tweepy.AppAuthHandler(config.twitter["consumer_key"], config.twitter["consumer_secret"]) api = tweepy.API(auth) + logger.debug("Starting Task-twitter") while True: accounts = TwitterAccount.find() - accounts_to_delete = [] # Go through all actively followed accounts + ids = [] + async for account in accounts: logger.debug(f"Checking account {account.handle}") # Check if account needs updated (handle changes) if account.last_sync + timedelta(hours=1) <= datetime.now(tz=timezone.utc): logger.debug(f"Account {account.handle} out of sync, updating") - user = api.get_user(user_id=account.twitter_id) + try: + user = api.get_user(user_id=account.twitter_id) + except Exception: + logger.warn(f"Failed to get {account.handle}, deleting") + await account.delete() + continue + if not user: + logger.warn(f"Failed to get {account.handle}, deleting") + await account.delete() + continue account.handle = user.screen_name account.last_sync = datetime.now(tz=timezone.utc) await account.commit() + ids.append(account.twitter_id) - # Get new tweets - if tweets := api.user_timeline(user_id=account.twitter_id, since_id=account.last_tweet): - logger.debug(f"{account.handle} has new tweets") - tweets = sorted(tweets, key=lambda x: x.id) - follows = TwitterFollow.find(q(twitter_id=account.twitter_id)) - follows_to_delete = [] - num_follows = 0 - - # Go through follows and send tweet if necessary - async for follow in follows: - num_follows += 1 - guild = await bot.fetch_guild(follow.guild) - if not guild: - logger.warning(f"Follow {follow.id}'s guild no longer exists, deleting") - follows_to_delete.append(follow) - continue - channel = await bot.fetch_channel(follow.channel) - if not channel: - logger.warning(f"Follow {follow.id}'s channel no longer exists, deleting") - follows_to_delete.append(follow) - continue - for tweet in tweets: - retweet = "retweeted_status" in tweet.__dict__ - if retweet and not follow.retweets: - continue - - embeds = tweet_embeds(tweet) - mod = "re" if retweet else "" - timestamp = int(tweet.created_at.timestamp()) - - try: - await channel.send( - f"`@{account.handle}` {mod}tweeted this at ", - embeds=embeds, - ) - except Exception: - logger.debug( - f"Failed to send message to {channel.id} in {channel.guild.name}" - ) - - # Delete invalid follows - for follow in follows_to_delete: - await follow.delete() - - if num_follows == 0: - accounts_to_delete.append(account) - else: - newest = tweets[-1] - account.update(q(last_tweet=newest.id)) - await account.commit() - - # Delete invalid accounts (no follows) - for account in accounts_to_delete: - logger.info(f"{account.handle} has no followers, removing") - await account.delete() - - # Only check once a minute - await asyncio.sleep(60) + # Get new tweets + logger.debug(f"Starting stream with {len(ids)} accounts") + stream.current_filter = ids + try: + await stream.filter(follow=ids) + except Exception: + logger.error("Encountered error with stream", stack_info=True) + logger.debug("Stream disconnected, updating filters and re-starting") diff --git a/poetry.lock b/poetry.lock index ebd5b75..3771afb 100644 --- a/poetry.lock +++ b/poetry.lock @@ -223,7 +223,7 @@ python-versions = "*" [[package]] name = "jarvis-core" -version = "0.8.0" +version = "0.8.1" description = "JARVIS core" category = "main" optional = false @@ -242,7 +242,7 @@ umongo = "^3.1.0" type = "git" url = "https://git.zevaryx.com/stark-industries/jarvis/jarvis-core.git" reference = "main" -resolved_reference = "02b60a29ebbab91d9ad4cf876eb10c2aea6f2231" +resolved_reference = "257eaaea951d7f8ed5f6e1c4d01abed6d47e9f0d" [[package]] name = "marshmallow" @@ -451,6 +451,7 @@ optional = false python-versions = ">=3.7" [package.dependencies] +aiohttp = {version = ">=3.7.3,<4", optional = true, markers = "extra == \"async\""} oauthlib = ">=3.2.0,<4" requests = ">=2.27.0,<3" requests-oauthlib = ">=1.2.0,<2" @@ -530,7 +531,7 @@ multidict = ">=4.0" [metadata] lock-version = "1.1" python-versions = "^3.10" -content-hash = "a07ca72dcff5520aefe9b577b0fcaf6804e488d91cdf4779b81e37ccc5ba082b" +content-hash = "ca53ee1d35badd4d592b79ddc8280a04e74281bc5a21f2537d741a3b5650fad1" [metadata.files] aiofiles = [ diff --git a/pyproject.toml b/pyproject.toml index 986cd5e..c409862 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "jarvis-tasks" -version = "0.5.0" +version = "0.6.0" description = "" authors = ["Your Name "] @@ -9,7 +9,7 @@ python = "^3.10" jarvis-core = {git = "https://git.zevaryx.com/stark-industries/jarvis/jarvis-core.git", rev = "main"} dis-snek = "*" aiohttp = "^3.8.1" -tweepy = "^4.5.0" +tweepy = {extras = ["async"], version = "^4.8.0"} asyncpraw = "^7.5.0" [tool.poetry.dev-dependencies]