"""JARVIS Twitter sync.""" import logging from asyncio import sleep from datetime import datetime, timedelta, timezone from html import unescape from typing import List import tweepy.asynchronous from jarvis_core.db import q from jarvis_core.db.models import TwitterAccount, TwitterFollow from naff import Client from naff.client.errors import NotFound from naff.models.discord.embed import Embed from tweepy.asynchronous import AsyncStream from tweepy.models import Status from jarvis_tasks.config import TaskConfig from jarvis_tasks.prometheus.stats import twitter_count, twitter_error, twitter_gauge from jarvis_tasks.util import build_embed config = TaskConfig.from_yaml() logger = logging.getLogger(__name__) def tweet_embeds(tweet: Status) -> List[Embed]: """ Build a tweet embeds. Args: tweet: Tweet to build embeds """ url = f"https://twitter.com/{tweet.user.screen_name}/status/{tweet.id}" entities = tweet.__dict__.get("extended_entities", {}) media = entities.get("media", []) photos = [] for item in media: if item["type"] in ["photo", "animated_gif"]: photos.append(item["media_url_https"]) if extended := tweet.__dict__.get("extended_tweet"): text = extended.get("full_text", tweet.text) else: text = tweet.text if subtweet := tweet.__dict__.get("quoted_status", None): subuser = subtweet.user text += f"\n\n> [@{subuser.name}](https://twitter.com/{subuser.screen_name})" text += f"\n> {subtweet.text}" if entites := subtweet.__dict__.get("extended_entities", {}): submedia = entites.get("media", []) for item in submedia: if item["type"] in ["photo", "animated_gif"]: photos.append(item["media_url_https"]) text = unescape(text) base_embed = build_embed( title="", description=(text + f"\n\n[View this tweet]({url})"), fields=[], color="#1DA1F2", url=url, ) base_embed.set_author( name="@" + tweet.user.name, url=url, icon_url=tweet.author.profile_image_url_https, ) base_embed.set_footer( text="Twitter", icon_url="https://abs.twimg.com/icons/apple-touch-icon-192x192.png", ) embeds = [base_embed] if len(photos) > 0: embeds[0].set_image(url=photos[0]) for photo in photos[1:4]: embed = Embed(url=url) embed.set_image(url=photo) embeds.append(embed) return embeds class JARVISTwitterStream(AsyncStream): """JARVIS Twitter AsyncStream client.""" def __init__(self, bot: Client, *args, **kwargs): if not bot: raise ValueError("Missing bot") super().__init__(*args, **kwargs) self.bot = bot self.current_filter = None async def on_keep_alive(self) -> None: """Override keep-alive to track new accounts.""" await super().on_keep_alive() ids = [x.twitter_id async for x in TwitterAccount.find()] if ids != self.current_filter: logger.debug("Follows have changed, disconnected") self.disconnect() async def on_request_error(self, status_code: int) -> None: """ Wrapper for on_request_error. Mainly used to catch 401 errors. Args: status_code: HTTP Status Code """ logger.error(f"Received status code {status_code} while streaming, restarting") errors = twitter_error.labels(error_code=status_code) errors.inc() self.disconnect() async def on_status(self, status: Status) -> None: """ Process new statuses. Args: status: The status to process """ if status.author.id not in self.current_filter: return logger.debug(f"{status.author.screen_name} sent new tweet") follows = TwitterFollow.find(q(twitter_id=status.author.id)) num_follows = 0 async for follow in follows: num_follows += 1 guild = await self.bot.fetch_guild(follow.guild) if not guild: logger.warn(f"Follow {follow.id} invalid, deleting") await follow.delete() num_follows -= 1 continue channel = await guild.fetch_channel(follow.channel) if not channel: logger.warn(f"Follow {follow.id} invalid, deleting") await follow.delete() num_follows -= 1 continue retweet = "retweeted_status" in status.__dict__ if retweet and not follow.retweets: continue embeds = tweet_embeds(status) mod = "re" if retweet else "" timestamp = int(status.created_at.timestamp()) try: await channel.send( f"`@{status.user.screen_name}` {mod}tweeted this at ", embeds=embeds, ) count = twitter_count.labels( guild_id=guild.id, guild_name=guild.name, twitter_handle=status.user.screen_name ) count.inc() except NotFound: logger.warn(f"Follow {follow.id} invalid, deleting") await follow.delete() num_follows -= 1 continue except Exception: logger.debug(f"Failed to send message to {channel.id} in {channel.guild.name}") if num_follows == 0: logger.warning(f"Account {status.author.screen_name} no longer has followers, removing") account = await TwitterAccount.find_one(q(twitter_id=status.author.id)) if account: await account.delete() self.disconnect() else: gauge = twitter_gauge.labels(twitter_handle=status.user.screen_name) gauge.set(num_follows) async def twitter(bot: Client) -> None: """ Sync tweets in the background. Args: bot: Client instance """ if not config.twitter: logger.warn("Missing Twitter config, not starting") return stream = JARVISTwitterStream(bot=bot, **config.twitter) auth = tweepy.AppAuthHandler(config.twitter["consumer_key"], config.twitter["consumer_secret"]) api = tweepy.API(auth) logger.debug("Starting Task-twitter") logger.debug("Validating follows") async for account in TwitterAccount.find(): count = 0 async for follow in TwitterFollow.find(q(twitter_id=account.twitter_id)): count += 1 guild = await bot.fetch_guild(follow.guild) channel = await bot.fetch_channel(follow.channel) if not guild or not channel: logger.debug(f"Follow {follow.id} invalid, deleting") await follow.delete() count -= 1 continue if count == 0: logger.debug(f"Account {account.handle} has no followers, removing") await account.delete() else: gauge = twitter_gauge.labels(twitter_handle=account.handle) gauge.set(count) while True: accounts = TwitterAccount.find() # Go through all actively followed accounts ids = [] async for account in accounts: logger.debug(f"Checking account {account.handle}") # Check if account needs updated (handle changes) if account.last_sync + timedelta(hours=1) <= datetime.now(tz=timezone.utc): logger.debug(f"Account {account.handle} out of sync, updating") try: user = api.get_user(user_id=account.twitter_id) except Exception: logger.error("Encountered API error", exc_info=True) continue if not user: logger.warn(f"Failed to get {account.handle}, deleting") await account.delete() continue account.handle = user.screen_name account.last_sync = datetime.now(tz=timezone.utc) await account.commit() ids.append(account.twitter_id) # Get new tweets logger.debug(f"Starting stream with {len(ids)} accounts") stream.current_filter = ids try: await stream.filter(follow=ids) except Exception: logger.error("Encountered error with stream", stack_info=True) logger.debug("Stream disconnected, updating filters and re-starting") await sleep(5)