Twitter rewrite

This commit is contained in:
Zeva Rose 2023-03-15 23:01:40 +00:00
parent df6afa5a1b
commit 6373de5ddd
4 changed files with 1156 additions and 1122 deletions

View file

@ -2,7 +2,7 @@
import asyncio
from typing import Optional
import rook
#import rook
from jarvis_core.db import connect
from jarvis_core.log import get_logger
from naff import Client, Intents
@ -36,8 +36,8 @@ async def _start(config: Optional[str] = "config.yaml") -> None:
# Load config
config = TaskConfig.from_yaml(config)
if config.rook_token:
rook.start(token=config.rook_token, labels={"env": "dev"})
# if config.rook_token:
# rook.start(token=config.rook_token, labels={"env": "dev"})
# Connect to database
testing = config.mongo["database"] != "jarvis"

View file

@ -5,14 +5,14 @@ from datetime import datetime, timedelta, timezone
from html import unescape
from typing import List
import tweepy.asynchronous
from jarvis_core.db import q
from jarvis_core.db.models import TwitterAccount, TwitterFollow
from naff import Client
from naff.client.errors import NotFound
from naff.models.discord.embed import Embed
from tweepy.asynchronous import AsyncStream
from tweepy.models import Status
from tweepy.streaming import StreamRule
from tweepy.asynchronous import AsyncClient, AsyncStreamingClient
from tweepy import Media, Tweet, User
from jarvis_tasks.config import TaskConfig
from jarvis_tasks.prometheus.stats import twitter_count, twitter_error, twitter_gauge
@ -20,36 +20,47 @@ from jarvis_tasks.util import build_embed
config = TaskConfig.from_yaml()
logger = logging.getLogger(__name__)
tlogger = logging.getLogger("Tweepy")
tlogger.setLevel(logging.DEBUG)
DEFAULT_EXPANSIONS = "author_id,referenced_tweets.id,in_reply_to_user_id,attachments.media_keys,referenced_tweets.id.author_id,entities.mentions.username"
DEFAULT_MEDIA_FIELDS = "url"
DEFAULT_TWEET_FIELDS = "created_at"
DEFAULT_USER_FIELDS = "url,profile_image_url"
def tweet_embeds(tweet: Status) -> List[Embed]:
async def tweet_embeds(tweet: Tweet, retweet: bool, quoted: bool, api: AsyncClient) -> List[Embed]:
"""
Build a tweet embeds.
Args:
tweet: Tweet to build embeds
"""
url = f"https://twitter.com/{tweet.user.screen_name}/status/{tweet.id}"
entities = tweet.__dict__.get("extended_entities", {})
media = entities.get("media", [])
author: User = tweet.includes["users"][0]
url = f"https://twitter.com/{author.username}/status/{tweet.data.id}"
media: list[Media] = tweet.includes.get("media", [])
photos = []
for item in media:
if item.type in ["photo", "animated_gif"]:
photos.append(item.url)
text = tweet.data.text
if retweet:
text = "> " + tweet.includes["tweets"][0].text
if quoted:
quote = await api.get_tweet(
id=tweet.data.referenced_tweets[0].id,
expansions=DEFAULT_EXPANSIONS,
media_fields=DEFAULT_MEDIA_FIELDS,
user_fields=DEFAULT_USER_FIELDS,
)
quote_author = quote.includes["users"][0]
text += f"\n\n> [@{quote_author.name}]({quote_author.url})"
text += f"\n> {quote.data.text}"
quote_media: list[Media] = tweet.includes.get("media", [])
for item in quote_media:
if item["type"] in ["photo", "animated_gif"]:
photos.append(item["media_url_https"])
if extended := tweet.__dict__.get("extended_tweet"):
text = extended.get("full_text", tweet.text)
else:
text = tweet.text
if subtweet := tweet.__dict__.get("quoted_status", None):
subuser = subtweet.user
text += f"\n\n> [@{subuser.name}](https://twitter.com/{subuser.screen_name})"
text += f"\n> {subtweet.text}"
if entites := subtweet.__dict__.get("extended_entities", {}):
submedia = entites.get("media", [])
for item in submedia:
if item["type"] in ["photo", "animated_gif"]:
photos.append(item["media_url_https"])
photos.append(item.url)
text = unescape(text)
@ -61,9 +72,9 @@ def tweet_embeds(tweet: Status) -> List[Embed]:
url=url,
)
base_embed.set_author(
name="@" + tweet.user.name,
url=url,
icon_url=tweet.author.profile_image_url_https,
name="@" + author.name,
url=author.url,
icon_url=author.profile_image_url,
)
base_embed.set_footer(
text="Twitter",
@ -82,15 +93,16 @@ def tweet_embeds(tweet: Status) -> List[Embed]:
return embeds
class JARVISTwitterStream(AsyncStream):
class JARVISTwitterStream(AsyncStreamingClient):
"""JARVIS Twitter AsyncStream client."""
def __init__(self, bot: Client, *args, **kwargs):
def __init__(self, bot: Client, api: AsyncClient, *args, **kwargs):
if not bot:
raise ValueError("Missing bot")
super().__init__(*args, **kwargs)
self.bot = bot
self.current_filter = None
self.api = api
async def on_keep_alive(self) -> None:
"""Override keep-alive to track new accounts."""
@ -107,24 +119,43 @@ class JARVISTwitterStream(AsyncStream):
Args:
status_code: HTTP Status Code
"""
logger.error(f"Received status code {status_code} while streaming, restarting")
logger.error(f"Received status code {status_code} while streaming, restarting", exc_info=True)
errors = twitter_error.labels(error_code=status_code)
errors.inc()
self.disconnect()
async def on_status(self, status: Status) -> None:
async def on_tweet(self, status: Tweet) -> None:
"""
Process new statuses.
Args:
status: The status to process
"""
if status.author.id not in self.current_filter:
if status.author_id not in self.current_filter:
return
logger.debug(f"{status.author.screen_name} sent new tweet")
follows = TwitterFollow.find(q(twitter_id=status.author.id))
status = await self.api.get_tweet(
id=status.id,
expansions=DEFAULT_EXPANSIONS,
media_fields=DEFAULT_MEDIA_FIELDS,
tweet_fields=DEFAULT_TWEET_FIELDS,
user_fields=DEFAULT_USER_FIELDS,
)
author = status.includes.get("users")[0]
logger.debug(f"{author.username} sent new tweet")
follows = TwitterFollow.find(q(twitter_id=author.id))
num_follows = 0
retweet = False
quote = False
mod = "tweeted"
if status.data.referenced_tweets:
if status.data.referenced_tweets[0].type == "retweeted":
retweet = True
mod = "retweeted"
if status.data.referenced_tweets[0].type == "quoted":
quote = True
mod = "quoted"
async for follow in follows:
num_follows += 1
@ -142,22 +173,18 @@ class JARVISTwitterStream(AsyncStream):
num_follows -= 1
continue
retweet = "retweeted_status" in status.__dict__
if retweet and not follow.retweets:
continue
embeds = tweet_embeds(status)
mod = "re" if retweet else ""
timestamp = int(status.created_at.timestamp())
embeds = await tweet_embeds(status, retweet, quote, self.api)
timestamp = int(status.data.created_at.timestamp())
try:
await channel.send(
f"`@{status.user.screen_name}` {mod}tweeted this at <t:{timestamp}:f>",
f"`@{author.username}` {mod} this at <t:{timestamp}:f>",
embeds=embeds,
)
count = twitter_count.labels(
guild_id=guild.id, guild_name=guild.name, twitter_handle=status.user.screen_name
)
count = twitter_count.labels(guild_id=guild.id, guild_name=guild.name, twitter_handle=author.username)
count.inc()
except NotFound:
logger.warn(f"Follow {follow.id} invalid, deleting")
@ -168,13 +195,13 @@ class JARVISTwitterStream(AsyncStream):
logger.debug(f"Failed to send message to {channel.id} in {channel.guild.name}")
if num_follows == 0:
logger.warning(f"Account {status.author.screen_name} no longer has followers, removing")
account = await TwitterAccount.find_one(q(twitter_id=status.author.id))
logger.warning(f"Account {author.username} no longer has followers, removing")
account = await TwitterAccount.find_one(q(twitter_id=author.id))
if account:
await account.delete()
self.disconnect()
else:
gauge = twitter_gauge.labels(twitter_handle=status.user.screen_name)
gauge = twitter_gauge.labels(twitter_handle=author.name)
gauge.set(num_follows)
@ -188,9 +215,11 @@ async def twitter(bot: Client) -> None:
if not config.twitter:
logger.warn("Missing Twitter config, not starting")
return
stream = JARVISTwitterStream(bot=bot, **config.twitter)
auth = tweepy.AppAuthHandler(config.twitter["consumer_key"], config.twitter["consumer_secret"])
api = tweepy.API(auth)
api = AsyncClient(bearer_token=config.twitter["bearer_token"])
stream = JARVISTwitterStream(bot=bot, bearer_token=config.twitter["bearer_token"], api=api)
rules = await stream.get_rules()
if rules.data:
await stream.delete_rules(rules.data)
logger.debug("Starting Task-twitter")
logger.debug("Validating follows")
@ -199,8 +228,12 @@ async def twitter(bot: Client) -> None:
async for follow in TwitterFollow.find(q(twitter_id=account.twitter_id)):
count += 1
try:
guild = await bot.fetch_guild(follow.guild)
except Exception:
guild = None
channel = None
if guild:
channel = await bot.fetch_channel(follow.channel)
if not guild or not channel:
logger.debug(f"Follow {follow.id} invalid, deleting")
@ -214,8 +247,9 @@ async def twitter(bot: Client) -> None:
else:
gauge = twitter_gauge.labels(twitter_handle=account.handle)
gauge.set(count)
rules = []
while True:
accounts = TwitterAccount.find()
accounts = TwitterAccount.find({})
# Go through all actively followed accounts
ids = []
@ -226,7 +260,7 @@ async def twitter(bot: Client) -> None:
if account.last_sync + timedelta(hours=1) <= datetime.now(tz=timezone.utc):
logger.debug(f"Account {account.handle} out of sync, updating")
try:
user = api.get_user(user_id=account.twitter_id)
user = await api.get_user(id=account.twitter_id)
except Exception:
logger.error("Encountered API error", exc_info=True)
continue
@ -234,7 +268,7 @@ async def twitter(bot: Client) -> None:
logger.warn(f"Failed to get {account.handle}, deleting")
await account.delete()
continue
account.handle = user.screen_name
account.handle = user.data.username
account.last_sync = datetime.now(tz=timezone.utc)
await account.commit()
ids.append(account.twitter_id)
@ -242,9 +276,34 @@ async def twitter(bot: Client) -> None:
# Get new tweets
logger.debug(f"Starting stream with {len(ids)} accounts")
stream.current_filter = ids
if ids:
ids = list(set(ids))
try:
await stream.filter(follow=ids)
rule = ""
for idx, tid in enumerate(ids):
tmp_rule = rule
if idx != 0:
tmp_rule += " OR "
tmp_rule += f"from:{tid}"
if len(tmp_rule) > 512:
rules.append(StreamRule(value=rule, tag=f"e{idx}"))
tmp_rule = tmp_rule.split("OR")[-1]
rule = tmp_rule
if len(rule) > 0:
rules.append(StreamRule(value=rule))
await stream.add_rules(rules)
await stream.filter(
expansions=DEFAULT_EXPANSIONS,
media_fields=DEFAULT_MEDIA_FIELDS,
tweet_fields=DEFAULT_TWEET_FIELDS,
user_fields=DEFAULT_USER_FIELDS,
)
except Exception:
logger.error("Encountered error with stream", stack_info=True)
logger.error("Encountered error with stream", exc_info=True)
logger.debug("Stream disconnected, updating filters and re-starting")
await sleep(5)
await stream.delete_rules(rules)
rules.clear()
else:
logger.warn("No accounts to follow, sleeping...")
await sleep(15)

2087
poetry.lock generated

File diff suppressed because it is too large Load diff

View file

@ -5,13 +5,13 @@ description = ""
authors = ["Your Name <you@example.com>"]
[tool.poetry.dependencies]
python = "^3.10"
python = ">=3.10,<4"
jarvis-core = {git = "https://git.zevaryx.com/stark-industries/jarvis/jarvis-core.git", rev = "main"}
naff = "^1.2.0"
aiohttp = "^3.8.1"
tweepy = {extras = ["async"], version = "^4.8.0"}
naff = ">=2.1.0"
aiohttp = "^3.8.3"
tweepy = {extras = ["async"], version = "^4.13.0"}
asyncpraw = "^7.5.0"
rook = "^0.1.170"
#rook = "^0.1.170"
uvicorn = "^0.17.6"
prometheus-client = "^0.14.1"