"""JARVIS Twitter sync.""" import logging from datetime import datetime, timedelta, timezone from typing import List import tweepy.asynchronous from jarvis_core.db import q from jarvis_core.db.models import TwitterAccount, TwitterFollow from naff import Client from naff.client.errors import NotFound from naff.models.discord.embed import Embed from tweepy.asynchronous import AsyncStream from tweepy.models import Status from jarvis_tasks.config import TaskConfig from jarvis_tasks.util import build_embed config = TaskConfig.from_yaml() logger = logging.getLogger(__name__) def tweet_embeds(tweet: Status) -> List[Embed]: """ Build a tweet embeds. Args: tweet: Tweet to build embeds """ url = f"https://twitter.com/{tweet.user.screen_name}/status/{tweet.id}" entities = tweet.__dict__.get("extended_entities", {}) media = entities.get("media", []) photos = [] for item in media: if item["type"] in ["photo", "animated_gif"]: photos.append(item["media_url_https"]) text = tweet.text if subtweet := tweet.__dict__.get("quoted_status", None): subuser = subtweet.user text += f"\n\n> [@{subuser.name}](https://twitter.com/{subuser.screen_name})" text += f"\n> {subtweet.text}" if entites := subtweet.__dict__.get("extended_entities", {}): submedia = entites.get("media", []) for item in submedia: if item["type"] in ["photo", "animated_gif"]: photos.append(item["media_url_https"]) base_embed = build_embed( title="", description=(text + f"\n\n[View this tweet]({url})"), fields=[], color="#1DA1F2", url=url, ) base_embed.set_author( name="@" + tweet.user.name, url=url, icon_url=tweet.author.profile_image_url_https, ) base_embed.set_footer( text="Twitter", icon_url="https://abs.twimg.com/icons/apple-touch-icon-192x192.png", ) embeds = [base_embed] if len(photos) > 0: embeds[0].set_image(url=photos[0]) for photo in photos[1:4]: embed = Embed(url=url) embed.set_image(url=photo) embeds.append(embed) return embeds class JARVISTwitterStream(AsyncStream): """JARVIS Twitter AsyncStream client.""" def __init__(self, bot: Client, *args, **kwargs): if not bot: raise ValueError("Missing bot") super().__init__(*args, **kwargs) self.bot = bot self.current_filter = None async def on_keep_alive(self) -> None: """Override keep-alive to track new accounts.""" await super().on_keep_alive() ids = [x.twitter_id async for x in TwitterAccount.find()] if ids != self.current_filter: logger.debug("Follows have changed, disconnected") self.disconnect() async def on_status(self, status: Status) -> None: """ Process new statuses. Args: status: The status to process """ if status.author.id not in self.current_filter: return logger.debug(f"{status.author.screen_name} sent new tweet") follows = TwitterFollow.find(q(twitter_id=status.author.id)) num_follows = 0 async for follow in follows: num_follows += 1 guild = await self.bot.fetch_guild(follow.guild) if not guild: logger.warn(f"Follow {follow.id} invalid, deleting") await follow.delete() num_follows -= 1 continue channel = await guild.fetch_channel(follow.channel) if not channel: logger.warn(f"Follow {follow.id} invalid, deleting") await follow.delete() num_follows -= 1 continue retweet = "retweeted_status" in status.__dict__ if retweet and not follow.retweets: continue embeds = tweet_embeds(status) mod = "re" if retweet else "" timestamp = int(status.created_at.timestamp()) try: await channel.send( f"`@{status.user.screen_name}` {mod}tweeted this at ", embeds=embeds, ) except NotFound: logger.warn(f"Follow {follow.id} invalid, deleting") await follow.delete() num_follows -= 1 continue except Exception: logger.debug(f"Failed to send message to {channel.id} in {channel.guild.name}") if num_follows == 0: logger.warning(f"Account {status.author.screen_name} no longer has followers, removing") account = await TwitterAccount.find_one(q(twitter_id=status.author.id)) if account: await account.delete() self.disconnect() async def twitter(bot: Client) -> None: """ Sync tweets in the background. Args: bot: Client instance """ if not config.twitter: logger.warn("Missing Twitter config, not starting") return stream = JARVISTwitterStream(bot=bot, **config.twitter) auth = tweepy.AppAuthHandler(config.twitter["consumer_key"], config.twitter["consumer_secret"]) api = tweepy.API(auth) logger.debug("Starting Task-twitter") logger.debug("Validating follows") async for account in TwitterAccount.find(): count = 0 async for follow in TwitterFollow.find(q(twitter_id=account.twitter_id)): count += 1 guild = await bot.fetch_guild(follow.guild) channel = await bot.fetch_channel(follow.channel) if not guild or not channel: logger.debug(f"Follow {follow.id} invalid, deleting") await follow.delete() count -= 1 continue if count == 0: logger.debug(f"Account {account.handle} has no followers, removing") await account.delete() while True: accounts = TwitterAccount.find() # Go through all actively followed accounts ids = [] async for account in accounts: logger.debug(f"Checking account {account.handle}") # Check if account needs updated (handle changes) if account.last_sync + timedelta(hours=1) <= datetime.now(tz=timezone.utc): logger.debug(f"Account {account.handle} out of sync, updating") try: user = api.get_user(user_id=account.twitter_id) except Exception: logger.warn(f"Failed to get {account.handle}, deleting") await account.delete() continue if not user: logger.warn(f"Failed to get {account.handle}, deleting") await account.delete() continue account.handle = user.screen_name account.last_sync = datetime.now(tz=timezone.utc) await account.commit() ids.append(account.twitter_id) # Get new tweets logger.debug(f"Starting stream with {len(ids)} accounts") stream.current_filter = ids try: await stream.filter(follow=ids) except Exception: logger.error("Encountered error with stream", stack_info=True) logger.debug("Stream disconnected, updating filters and re-starting")