"""JARVIS Twitter sync."""
import logging
from asyncio import sleep
from datetime import datetime, timedelta, timezone
from html import unescape
from typing import List

from jarvis_core.db.models import TwitterAccount, TwitterFollow
from interactions import Client
from interactions.client.errors import NotFound
from interactions.models.discord.embed import Embed
from tweepy.streaming import StreamRule
from tweepy.asynchronous import AsyncClient, AsyncStreamingClient
from tweepy import Media, Tweet, User

from jarvis_tasks.config import load_config
from jarvis_tasks.prometheus.stats import twitter_count, twitter_error, twitter_gauge
from jarvis_tasks.util import build_embed

config = load_config()
logger = logging.getLogger(__name__)
tlogger = logging.getLogger("Tweepy")
tlogger.setLevel(logging.DEBUG)

DEFAULT_EXPANSIONS = "author_id,referenced_tweets.id,in_reply_to_user_id,attachments.media_keys,referenced_tweets.id.author_id,entities.mentions.username"
DEFAULT_MEDIA_FIELDS = "url"
DEFAULT_TWEET_FIELDS = "created_at"
DEFAULT_USER_FIELDS = "url,profile_image_url"

# Maximum length of one filtered-stream rule value.
# NOTE(review): 512 is the limit the original code used; confirm for the API tier in use.
MAX_RULE_LENGTH = 512

# Media types whose URL is used as an embed image.
IMAGE_MEDIA_TYPES = ("photo", "animated_gif")


def _profile_url(user: User) -> str:
    """Return the twitter.com profile URL for *user*."""
    return f"https://twitter.com/{user.username}"


def _blockquote(text: str) -> str:
    """
    Prefix every line of *text* with a Discord ``> `` quote marker.

    The ``> `` marker only applies to the line it starts, so multi-line
    tweet text needs it repeated on each line.
    """
    return "\n".join(f"> {line}" for line in text.split("\n"))


def _image_urls(media: List[Media]) -> List[str]:
    """Return the URLs of image-like items in *media* that actually have a URL."""
    # Only some media types carry `url`; skip items where it is missing (None).
    return [item.url for item in media if item.type in IMAGE_MEDIA_TYPES and item.url]


async def tweet_embeds(
    tweet: Tweet, retweet: bool, quoted: bool, api: AsyncClient
) -> List[Embed]:
    """
    Build the Discord embeds for a tweet.

    Args:
        tweet: `get_tweet` response for the tweet, including expansions
        retweet: Whether the tweet is a retweet; the text is then taken from
            the referenced tweet and block-quoted
        quoted: Whether the tweet quotes another tweet; the quoted tweet is
            fetched and appended (with its images) when still available
        api: Client used to fetch the quoted tweet

    Returns:
        The main embed followed by up to three image-only embeds
    """
    author: User = tweet.includes["users"][0]
    url = f"https://twitter.com/{author.username}/status/{tweet.data.id}"
    photos = _image_urls(tweet.includes.get("media", []))

    text = tweet.data.text
    if retweet:
        text = _blockquote(tweet.includes["tweets"][0].text)
    if quoted:
        quote = await api.get_tweet(
            id=tweet.data.referenced_tweets[0].id,
            expansions=DEFAULT_EXPANSIONS,
            media_fields=DEFAULT_MEDIA_FIELDS,
            user_fields=DEFAULT_USER_FIELDS,
        )
        # A deleted/protected quoted tweet comes back without data; skip it then.
        quote_users = quote.includes.get("users")
        if quote.data is not None and quote_users:
            quote_author: User = quote_users[0]
            text += f"\n\n> [@{quote_author.username}]({_profile_url(quote_author)})"
            text += "\n" + _blockquote(quote.data.text)
            # The quoted tweet's media lives in the quote response's own includes.
            photos += _image_urls(quote.includes.get("media", []))
    text = unescape(text)

    base_embed = build_embed(
        title="",
        description=(text + f"\n\n[View this tweet]({url})"),
        fields=[],
        color="#1DA1F2",
        url=url,
    )
    base_embed.set_author(
        name="@" + author.username,
        url=_profile_url(author),
        icon_url=author.profile_image_url,
    )
    base_embed.set_footer(
        text="Twitter",
        icon_url="https://abs.twimg.com/icons/apple-touch-icon-192x192.png",
    )

    embeds = [base_embed]
    if photos:
        base_embed.set_image(url=photos[0])
        # NOTE(review): extra embeds share the tweet URL, presumably so Discord
        # groups them with the main embed as an image gallery.
        for photo in photos[1:4]:
            embed = Embed(url=url)
            embed.set_image(url=photo)
            embeds.append(embed)
    return embeds


class JARVISTwitterStream(AsyncStreamingClient):
    """JARVIS Twitter AsyncStream client.

    Relays tweets from followed accounts to the Discord channels following them.
    """

    def __init__(self, bot: Client, api: AsyncClient, *args, **kwargs):
        """
        Args:
            bot: Discord client used to deliver tweets
            api: Twitter client used to fetch full tweet details
            *args: Forwarded to AsyncStreamingClient
            **kwargs: Forwarded to AsyncStreamingClient

        Raises:
            ValueError: If `bot` is missing
        """
        if not bot:
            raise ValueError("Missing bot")
        super().__init__(*args, **kwargs)
        self.bot = bot
        # Twitter ids the active stream filters on; assigned by `twitter()`.
        self.current_filter = None
        self.api = api

    async def on_keep_alive(self) -> None:
        """
        Override keep-alive to track new accounts.

        Disconnects (so `twitter()` rebuilds the rules) when the accounts in the
        database differ from the active filter.
        """
        await super().on_keep_alive()
        ids = {x.twitter_id async for x in TwitterAccount.find()}
        # Compare as sets: ordering/duplicates do not change what is streamed.
        if ids != set(self.current_filter or []):
            logger.debug("Follows have changed, disconnected")
            self.disconnect()

    async def on_request_error(self, status_code: int) -> None:
        """
        Wrapper for on_request_error.

        Mainly used to catch 401 errors.

        Args:
            status_code: HTTP Status Code
        """
        # No exception is active here, so exc_info would only log "NoneType: None".
        logger.error(f"Received status code {status_code} while streaming, restarting")
        twitter_error.labels(error_code=status_code).inc()
        self.disconnect()

    async def on_tweet(self, status: Tweet) -> None:
        """
        Process new statuses.

        Fetches the full tweet, posts it to every valid follow, deletes follows
        whose guild/channel is gone, and removes the account once it has no
        follows left.

        Args:
            status: The status to process
        """
        if not self.current_filter or status.author_id not in self.current_filter:
            return
        response = await self.api.get_tweet(
            id=status.id,
            expansions=DEFAULT_EXPANSIONS,
            media_fields=DEFAULT_MEDIA_FIELDS,
            tweet_fields=DEFAULT_TWEET_FIELDS,
            user_fields=DEFAULT_USER_FIELDS,
        )
        users = response.includes.get("users")
        if response.data is None or not users:
            # Tweet was deleted or made unavailable between streaming and fetching.
            logger.debug(f"Tweet {status.id} is no longer available, skipping")
            return
        author = users[0]
        logger.debug(f"{author.username} sent new tweet")

        retweet = False
        quote = False
        mod = "tweeted"
        referenced = response.data.referenced_tweets
        if referenced:
            if referenced[0].type == "retweeted":
                retweet = True
                mod = "retweeted"
            elif referenced[0].type == "quoted":
                quote = True
                mod = "quoted"

        num_follows = 0
        embeds = None
        message = None
        async for follow in TwitterFollow.find(TwitterFollow.twitter_id == author.id):
            num_follows += 1
            guild = await self.bot.fetch_guild(follow.guild)
            channel = await guild.fetch_channel(follow.channel) if guild else None
            if not channel:
                logger.warning(f"Follow {follow.id} invalid, deleting")
                await follow.delete()
                num_follows -= 1
                continue
            if retweet and not follow.retweets:
                continue

            if embeds is None:
                # Built once per tweet; quoted tweets cost an extra API call.
                embeds = await tweet_embeds(response, retweet, quote, self.api)
                timestamp = int(response.data.created_at.timestamp())
                message = f"`@{author.username}` {mod} this at <t:{timestamp}:f>"

            try:
                await channel.send(message, embeds=embeds)
            except NotFound:
                logger.warning(f"Follow {follow.id} invalid, deleting")
                await follow.delete()
                num_follows -= 1
                continue
            except Exception:
                logger.warning(
                    f"Failed to send message to {channel.id} in {guild.name}",
                    exc_info=True,
                )
            else:
                twitter_count.labels(
                    guild_id=guild.id,
                    guild_name=guild.name,
                    twitter_handle=author.username,
                ).inc()

        if num_follows == 0:
            logger.warning(
                f"Account {author.username} no longer has followers, removing"
            )
            account = await TwitterAccount.find_one(
                TwitterAccount.twitter_id == author.id
            )
            if account:
                await account.delete()
            self.disconnect()
        else:
            # Labelled by handle, matching the gauge set in `twitter()`.
            twitter_gauge.labels(twitter_handle=author.username).set(num_follows)


def _build_rules(ids: List, max_length: int = MAX_RULE_LENGTH) -> List[StreamRule]:
    """
    Pack `from:<id>` clauses into as few OR-joined stream rules as fit.

    Args:
        ids: Twitter account ids to follow
        max_length: Maximum length of a single rule value

    Returns:
        Rules covering every id; empty if `ids` is empty
    """
    rules: List[StreamRule] = []
    clauses: List[str] = []
    for tid in ids:
        clause = f"from:{tid}"
        if clauses and len(" OR ".join([*clauses, clause])) > max_length:
            rules.append(StreamRule(value=" OR ".join(clauses)))
            clauses = []
        clauses.append(clause)
    if clauses:
        rules.append(StreamRule(value=" OR ".join(clauses)))
    return rules


async def _clear_rules(stream: AsyncStreamingClient) -> None:
    """
    Delete every filter rule currently registered for *stream*.

    Rules are fetched from the API because only API-returned rules carry the
    ids that `delete_rules` sends; locally built `StreamRule`s have `id=None`.
    """
    existing = await stream.get_rules()
    if existing.data:
        await stream.delete_rules(existing.data)


async def _validate_follows(bot: Client) -> None:
    """Delete follows whose guild/channel is gone and accounts left with none."""
    async for account in TwitterAccount.find():
        count = 0
        async for follow in TwitterFollow.find(
            TwitterFollow.twitter_id == account.twitter_id
        ):
            count += 1
            try:
                guild = await bot.fetch_guild(follow.guild)
            except Exception:
                guild = None
            channel = None
            if guild:
                channel = await bot.fetch_channel(follow.channel)
            if not guild or not channel:
                logger.debug(f"Follow {follow.id} invalid, deleting")
                await follow.delete()
                count -= 1
        if count == 0:
            logger.debug(f"Account {account.handle} has no followers, removing")
            await account.delete()
        else:
            twitter_gauge.labels(twitter_handle=account.handle).set(count)


async def _sync_account_ids(api: AsyncClient) -> List:
    """
    Refresh stale account handles and return the Twitter ids to stream.

    Accounts the API reports as missing are deleted. On a transient API error
    the account keeps being followed under its stored handle.
    """
    ids = []
    async for account in TwitterAccount.find({}):
        logger.debug(f"Checking account {account.handle}")
        # Handles can change; refresh them at most once an hour.
        if account.last_sync + timedelta(hours=1) <= datetime.now(tz=timezone.utc):
            logger.debug(f"Account {account.handle} out of sync, updating")
            try:
                user = await api.get_user(id=account.twitter_id)
            except Exception:
                # Keep following with the stale handle; retried on the next pass.
                logger.error("Encountered API error", exc_info=True)
            else:
                # A tweepy Response tuple is always truthy; a missing user has data=None.
                if user.data is None:
                    logger.warning(f"Failed to get {account.handle}, deleting")
                    await account.delete()
                    continue
                account.handle = user.data.username
                account.last_sync = datetime.now(tz=timezone.utc)
                await account.save()
        ids.append(account.twitter_id)
    # Deduplicate while keeping database order.
    return list(dict.fromkeys(ids))


async def twitter(bot: Client) -> None:
    """
    Sync tweets in the background.

    Args:
        bot: Client instance
    """
    if not config.twitter:
        logger.warning("Missing Twitter config, not starting")
        return
    api = AsyncClient(bearer_token=config.twitter["bearer_token"])
    stream = JARVISTwitterStream(
        bot=bot, bearer_token=config.twitter["bearer_token"], api=api
    )
    await _clear_rules(stream)

    logger.debug("Starting Task-twitter")
    logger.debug("Validating follows")
    await _validate_follows(bot)

    while True:
        ids = await _sync_account_ids(api)

        logger.debug(f"Starting stream with {len(ids)} accounts")
        stream.current_filter = ids
        if not ids:
            logger.warning("No accounts to follow, sleeping...")
            await sleep(15)
            continue

        try:
            await stream.add_rules(_build_rules(ids))
            # Returns once the stream disconnects (e.g. follows changed).
            await stream.filter(
                expansions=DEFAULT_EXPANSIONS,
                media_fields=DEFAULT_MEDIA_FIELDS,
                tweet_fields=DEFAULT_TWEET_FIELDS,
                user_fields=DEFAULT_USER_FIELDS,
            )
        except Exception:
            logger.error("Encountered error with stream", exc_info=True)
        logger.debug("Stream disconnected, updating filters and re-starting")
        await sleep(5)
        try:
            await _clear_rules(stream)
        except Exception:
            logger.error("Failed to clear stream rules", exc_info=True)