250 lines
8.8 KiB
Python
250 lines
8.8 KiB
Python
"""JARVIS Twitter sync."""
|
|
import logging
|
|
from asyncio import sleep
|
|
from datetime import datetime, timedelta, timezone
|
|
from html import unescape
|
|
from typing import List
|
|
|
|
import tweepy.asynchronous
|
|
from jarvis_core.db import q
|
|
from jarvis_core.db.models import TwitterAccount, TwitterFollow
|
|
from naff import Client
|
|
from naff.client.errors import NotFound
|
|
from naff.models.discord.embed import Embed
|
|
from tweepy.asynchronous import AsyncStream
|
|
from tweepy.models import Status
|
|
|
|
from jarvis_tasks.config import TaskConfig
|
|
from jarvis_tasks.prometheus.stats import twitter_count, twitter_error, twitter_gauge
|
|
from jarvis_tasks.util import build_embed
|
|
|
|
config = TaskConfig.from_yaml()
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def tweet_embeds(tweet: Status) -> List[Embed]:
|
|
"""
|
|
Build a tweet embeds.
|
|
|
|
Args:
|
|
tweet: Tweet to build embeds
|
|
"""
|
|
url = f"https://twitter.com/{tweet.user.screen_name}/status/{tweet.id}"
|
|
entities = tweet.__dict__.get("extended_entities", {})
|
|
media = entities.get("media", [])
|
|
|
|
photos = []
|
|
for item in media:
|
|
if item["type"] in ["photo", "animated_gif"]:
|
|
photos.append(item["media_url_https"])
|
|
if extended := tweet.__dict__.get("extended_tweet"):
|
|
text = extended.get("full_text", tweet.text)
|
|
else:
|
|
text = tweet.text
|
|
if subtweet := tweet.__dict__.get("quoted_status", None):
|
|
subuser = subtweet.user
|
|
text += f"\n\n> [@{subuser.name}](https://twitter.com/{subuser.screen_name})"
|
|
text += f"\n> {subtweet.text}"
|
|
if entites := subtweet.__dict__.get("extended_entities", {}):
|
|
submedia = entites.get("media", [])
|
|
for item in submedia:
|
|
if item["type"] in ["photo", "animated_gif"]:
|
|
photos.append(item["media_url_https"])
|
|
|
|
text = unescape(text)
|
|
|
|
base_embed = build_embed(
|
|
title="",
|
|
description=(text + f"\n\n[View this tweet]({url})"),
|
|
fields=[],
|
|
color="#1DA1F2",
|
|
url=url,
|
|
)
|
|
base_embed.set_author(
|
|
name="@" + tweet.user.name,
|
|
url=url,
|
|
icon_url=tweet.author.profile_image_url_https,
|
|
)
|
|
base_embed.set_footer(
|
|
text="Twitter",
|
|
icon_url="https://abs.twimg.com/icons/apple-touch-icon-192x192.png",
|
|
)
|
|
|
|
embeds = [base_embed]
|
|
|
|
if len(photos) > 0:
|
|
embeds[0].set_image(url=photos[0])
|
|
for photo in photos[1:4]:
|
|
embed = Embed(url=url)
|
|
embed.set_image(url=photo)
|
|
embeds.append(embed)
|
|
|
|
return embeds
|
|
|
|
|
|
class JARVISTwitterStream(AsyncStream):
|
|
"""JARVIS Twitter AsyncStream client."""
|
|
|
|
def __init__(self, bot: Client, *args, **kwargs):
|
|
if not bot:
|
|
raise ValueError("Missing bot")
|
|
super().__init__(*args, **kwargs)
|
|
self.bot = bot
|
|
self.current_filter = None
|
|
|
|
async def on_keep_alive(self) -> None:
|
|
"""Override keep-alive to track new accounts."""
|
|
await super().on_keep_alive()
|
|
ids = [x.twitter_id async for x in TwitterAccount.find()]
|
|
if ids != self.current_filter:
|
|
logger.debug("Follows have changed, disconnected")
|
|
self.disconnect()
|
|
|
|
async def on_request_error(self, status_code: int) -> None:
|
|
"""
|
|
Wrapper for on_request_error. Mainly used to catch 401 errors.
|
|
|
|
Args:
|
|
status_code: HTTP Status Code
|
|
"""
|
|
logger.error(f"Received status code {status_code} while streaming, restarting")
|
|
errors = twitter_error.labels(error_code=status_code)
|
|
errors.inc()
|
|
self.disconnect()
|
|
|
|
async def on_status(self, status: Status) -> None:
|
|
"""
|
|
Process new statuses.
|
|
|
|
Args:
|
|
status: The status to process
|
|
"""
|
|
if status.author.id not in self.current_filter:
|
|
return
|
|
logger.debug(f"{status.author.screen_name} sent new tweet")
|
|
follows = TwitterFollow.find(q(twitter_id=status.author.id))
|
|
num_follows = 0
|
|
|
|
async for follow in follows:
|
|
num_follows += 1
|
|
|
|
guild = await self.bot.fetch_guild(follow.guild)
|
|
if not guild:
|
|
logger.warn(f"Follow {follow.id} invalid, deleting")
|
|
await follow.delete()
|
|
num_follows -= 1
|
|
continue
|
|
|
|
channel = await guild.fetch_channel(follow.channel)
|
|
if not channel:
|
|
logger.warn(f"Follow {follow.id} invalid, deleting")
|
|
await follow.delete()
|
|
num_follows -= 1
|
|
continue
|
|
|
|
retweet = "retweeted_status" in status.__dict__
|
|
if retweet and not follow.retweets:
|
|
continue
|
|
|
|
embeds = tweet_embeds(status)
|
|
mod = "re" if retweet else ""
|
|
timestamp = int(status.created_at.timestamp())
|
|
|
|
try:
|
|
await channel.send(
|
|
f"`@{status.user.screen_name}` {mod}tweeted this at <t:{timestamp}:f>",
|
|
embeds=embeds,
|
|
)
|
|
count = twitter_count.labels(
|
|
guild_id=guild.id, guild_name=guild.name, twitter_handle=status.user.screen_name
|
|
)
|
|
count.inc()
|
|
except NotFound:
|
|
logger.warn(f"Follow {follow.id} invalid, deleting")
|
|
await follow.delete()
|
|
num_follows -= 1
|
|
continue
|
|
except Exception:
|
|
logger.debug(f"Failed to send message to {channel.id} in {channel.guild.name}")
|
|
|
|
if num_follows == 0:
|
|
logger.warning(f"Account {status.author.screen_name} no longer has followers, removing")
|
|
account = await TwitterAccount.find_one(q(twitter_id=status.author.id))
|
|
if account:
|
|
await account.delete()
|
|
self.disconnect()
|
|
else:
|
|
gauge = twitter_gauge.labels(twitter_handle=status.user.screen_name)
|
|
gauge.set(num_follows)
|
|
|
|
|
|
async def twitter(bot: Client) -> None:
|
|
"""
|
|
Sync tweets in the background.
|
|
|
|
Args:
|
|
bot: Client instance
|
|
"""
|
|
if not config.twitter:
|
|
logger.warn("Missing Twitter config, not starting")
|
|
return
|
|
stream = JARVISTwitterStream(bot=bot, **config.twitter)
|
|
auth = tweepy.AppAuthHandler(config.twitter["consumer_key"], config.twitter["consumer_secret"])
|
|
api = tweepy.API(auth)
|
|
logger.debug("Starting Task-twitter")
|
|
|
|
logger.debug("Validating follows")
|
|
async for account in TwitterAccount.find():
|
|
count = 0
|
|
|
|
async for follow in TwitterFollow.find(q(twitter_id=account.twitter_id)):
|
|
count += 1
|
|
|
|
guild = await bot.fetch_guild(follow.guild)
|
|
channel = await bot.fetch_channel(follow.channel)
|
|
if not guild or not channel:
|
|
logger.debug(f"Follow {follow.id} invalid, deleting")
|
|
await follow.delete()
|
|
count -= 1
|
|
continue
|
|
|
|
if count == 0:
|
|
logger.debug(f"Account {account.handle} has no followers, removing")
|
|
await account.delete()
|
|
else:
|
|
gauge = twitter_gauge.labels(twitter_handle=account.handle)
|
|
gauge.set(count)
|
|
while True:
|
|
accounts = TwitterAccount.find()
|
|
|
|
# Go through all actively followed accounts
|
|
ids = []
|
|
|
|
async for account in accounts:
|
|
logger.debug(f"Checking account {account.handle}")
|
|
# Check if account needs updated (handle changes)
|
|
if account.last_sync + timedelta(hours=1) <= datetime.now(tz=timezone.utc):
|
|
logger.debug(f"Account {account.handle} out of sync, updating")
|
|
try:
|
|
user = api.get_user(user_id=account.twitter_id)
|
|
except Exception:
|
|
logger.error("Encountered API error", exc_info=True)
|
|
continue
|
|
if not user:
|
|
logger.warn(f"Failed to get {account.handle}, deleting")
|
|
await account.delete()
|
|
continue
|
|
account.handle = user.screen_name
|
|
account.last_sync = datetime.now(tz=timezone.utc)
|
|
await account.commit()
|
|
ids.append(account.twitter_id)
|
|
|
|
# Get new tweets
|
|
logger.debug(f"Starting stream with {len(ids)} accounts")
|
|
stream.current_filter = ids
|
|
try:
|
|
await stream.filter(follow=ids)
|
|
except Exception:
|
|
logger.error("Encountered error with stream", stack_info=True)
|
|
logger.debug("Stream disconnected, updating filters and re-starting")
|
|
await sleep(5)
|