"""JARVIS Twitter sync."""
import asyncio
from datetime import datetime, timedelta, timezone
from logging import Logger
from typing import List

import tweepy
from dis_snek import Snake
from dis_snek.models.discord.embed import Embed
from jarvis_core.db import q
from jarvis_core.db.models import TwitterAccount, TwitterFollow
from jarvis_core.util import build_embed

from jarvis_tasks.config import TaskConfig

config = TaskConfig.from_yaml()

# Media types whose `media_url_https` is attached to an embed as an image.
_IMAGE_MEDIA_TYPES = ("photo", "animated_gif")


def _photo_urls(status: tweepy.models.Status) -> List[str]:
    """
    Collect image URLs from a status' `extended_entities` media list.

    Args:
        status: Status to inspect; the attribute may be absent

    Returns:
        `media_url_https` of every photo / animated GIF item, in order
    """
    entities = getattr(status, "extended_entities", None) or {}
    return [
        item["media_url_https"]
        for item in entities.get("media", [])
        if item["type"] in _IMAGE_MEDIA_TYPES
    ]


def tweet_embeds(tweet: tweepy.models.Status) -> List[Embed]:
    """
    Build the embeds for a tweet.

    Args:
        tweet: Tweet to build embeds for

    Returns:
        The main embed (text, author, footer, first image) followed by one
        image-only embed for each of the next three images, if any. Images of
        a quoted tweet are included after the tweet's own images.
    """
    # URLs and @-mentions need the handle (`screen_name`), not the free-form
    # display name (`name`), which may contain spaces or differ from the handle.
    handle = tweet.user.screen_name
    url = f"https://twitter.com/{handle}/status/{tweet.id}"

    photos = _photo_urls(tweet)
    text = tweet.text

    quoted = getattr(tweet, "quoted_status", None)
    if quoted:
        quoted_handle = quoted.user.screen_name
        text += f"\n\n> [@{quoted_handle}](https://twitter.com/{quoted_handle})"
        text += f"\n> {quoted.text}"
        photos.extend(_photo_urls(quoted))

    base_embed = build_embed(
        title="",
        description=(text + f"\n\n[View this tweet]({url})"),
        fields=[],
        color="#1DA1F2",
        url=url,
    )
    base_embed.set_author(
        name="@" + handle,
        url=url,
        icon_url=tweet.author.profile_image_url_https,
    )
    base_embed.set_footer(
        text="Twitter",
        icon_url="https://abs.twimg.com/icons/apple-touch-icon-192x192.png",
    )

    embeds = [base_embed]
    if photos:
        base_embed.set_image(url=photos[0])
        # Extra images go in bare embeds that share the tweet URL
        # (presumably so the client groups them with the main embed -- confirm).
        for photo in photos[1:4]:
            extra = Embed(url=url)
            extra.set_image(url=photo)
            embeds.append(extra)

    return embeds


async def _send_tweets(
    channel,
    handle: str,
    tweets: List[tweepy.models.Status],
    include_retweets: bool,
    logger: Logger,
) -> None:
    """
    Post tweets to a single channel, oldest first as given.

    Args:
        channel: Channel object returned by `bot.fetch_channel`
        handle: Handle of the followed account, used in the message text
        tweets: Tweets to post
        include_retweets: Whether retweets should be posted as well
        logger: Global logger
    """
    for tweet in tweets:
        is_retweet = hasattr(tweet, "retweeted_status")
        if is_retweet and not include_retweets:
            continue

        embeds = tweet_embeds(tweet)
        mod = "re" if is_retweet else ""
        timestamp = int(tweet.created_at.timestamp())
        try:
            # `<t:...:f>` is Discord's timestamp markup.
            await channel.send(
                f"`@{handle}` {mod}tweeted this at <t:{timestamp}:f>",
                embeds=embeds,
            )
        except Exception:
            # Best effort: one failing channel must not stop the sync loop,
            # but keep the traceback so the failure can be diagnosed.
            logger.debug(
                f"Failed to send message to {channel.id} in {channel.guild.name}",
                exc_info=True,
            )


async def twitter(bot: Snake, logger: Logger) -> None:
    """
    Sync tweets in the background.

    Runs forever: once a minute, every tracked account is checked for new
    tweets, which are forwarded to each following channel. Follows whose guild
    or channel cannot be fetched are deleted, as are accounts with no follows.

    Args:
        bot: Snake instance
        logger: Global logger
    """
    auth = tweepy.AppAuthHandler(config.twitter["consumer_key"], config.twitter["consumer_secret"])
    api = tweepy.API(auth)
    # NOTE(review): the tweepy calls below look synchronous and would block the
    # event loop while they run -- confirm whether that is acceptable here.
    while True:
        accounts = TwitterAccount.find()
        accounts_to_delete = []

        # Go through all actively followed accounts
        async for account in accounts:
            logger.debug(f"Checking account {account.handle}")

            # Check if account needs updated (handle changes)
            if account.last_sync + timedelta(hours=1) <= datetime.now(tz=timezone.utc):
                logger.debug(f"Account {account.handle} out of sync, updating")
                user = api.get_user(user_id=account.twitter_id)
                account.handle = user.screen_name
                account.last_sync = datetime.now(tz=timezone.utc)
                await account.commit()

            # Get new tweets
            tweets = api.user_timeline(user_id=account.twitter_id, since_id=account.last_tweet)
            if not tweets:
                continue

            logger.debug(f"{account.handle} has new tweets")
            # Oldest first, so channels receive them in order and the last
            # element is the newest tweet.
            tweets = sorted(tweets, key=lambda x: x.id)

            follows = TwitterFollow.find(q(twitter_id=account.twitter_id))
            follows_to_delete = []
            num_follows = 0

            # Go through follows and send tweet if necessary
            async for follow in follows:
                num_follows += 1

                guild = await bot.fetch_guild(follow.guild)
                if not guild:
                    logger.warning(f"Follow {follow.id}'s guild no longer exists, deleting")
                    follows_to_delete.append(follow)
                    continue

                channel = await bot.fetch_channel(follow.channel)
                if not channel:
                    logger.warning(f"Follow {follow.id}'s channel no longer exists, deleting")
                    follows_to_delete.append(follow)
                    continue

                await _send_tweets(channel, account.handle, tweets, follow.retweets, logger)

            # Delete invalid follows
            for follow in follows_to_delete:
                await follow.delete()

            if num_follows == 0:
                accounts_to_delete.append(account)
            else:
                newest = tweets[-1]
                account.update(q(last_tweet=newest.id))
                await account.commit()

        # Delete invalid accounts (no follows)
        for account in accounts_to_delete:
            logger.info(f"Account {account.handle} has no followers, removing")
            await account.delete()

        # Only check once a minute
        await asyncio.sleep(60)