# jarvis-bot/jarvis/cogs/twitter.py
# 2022-02-03 21:06:28 -07:00

# 268 lines
# 10 KiB
# Python

"""J.A.R.V.I.S. Twitter Cog."""
import asyncio
import logging
import tweepy
from bson import ObjectId
from dis_snek import InteractionContext, Permissions, Scale, Snake
from dis_snek.ext.tasks.task import Task
from dis_snek.ext.tasks.triggers import IntervalTrigger
from dis_snek.models.discord.channel import GuildText
from dis_snek.models.discord.components import ActionRow, Select, SelectOption
from dis_snek.models.snek.application_commands import (
OptionTypes,
SlashCommandChoice,
slash_command,
slash_option,
)
from jarvis.config import get_config
from jarvis.db.models import Twitter
from jarvis.utils.permissions import admin_or_permissions
# Module-level logger; records are emitted through the logger named "discord".
logger = logging.getLogger("discord")
class TwitterCog(Scale):
    """J.A.R.V.I.S. Twitter Cog.

    Polls followed Twitter accounts on a one-minute interval, posts links to
    new tweets in the configured channels, and provides the ``/twitter``
    subcommands (``follow``, ``unfollow``, ``retweets``) for managing follows.
    """

    def __init__(self, bot: Snake):
        """Create the Twitter API client and start the polling task.

        Args:
            bot: The bot instance this cog is attached to.
        """
        self.bot = bot
        config = get_config()
        auth = tweepy.AppAuthHandler(
            config.twitter["consumer_key"], config.twitter["consumer_secret"]
        )
        self.api = tweepy.API(auth)
        # Build the caches before starting the task so `_tweets` can never run
        # against a partially initialised cog.
        self._guild_cache = {}
        self._channel_cache = {}
        self._tweets.start()

    @staticmethod
    def _disable_components(components: list) -> None:
        """Mark every component of every action row in ``components`` as disabled."""
        for row in components:
            # ActionRow / Select are objects, not dicts: use attribute access.
            for component in row.components:
                component.disabled = True

    @Task.create(trigger=IntervalTrigger(minutes=1))
    async def _tweets(self) -> None:
        """Fetch timelines of followed accounts and post any new tweets."""
        twitters = Twitter.objects(active=True)
        # Only fetch timelines for handles that have at least one active follow.
        handles = Twitter.objects(active=True).distinct("handle")
        twitter_data = {}
        for handle in handles:
            try:
                # tweepy is blocking; run it in a worker thread.
                twitter_data[handle] = await asyncio.to_thread(
                    self.api.user_timeline, screen_name=handle
                )
            except Exception as e:
                logger.error(f"Error with fetching: {e}")

        for twitter in twitters:
            timeline = twitter_data.get(twitter.handle)
            if timeline is None:
                # The fetch failed (already logged above); try again next cycle.
                continue
            try:
                # Oldest first, so tweets are posted in chronological order.
                tweets = sorted(
                    (tweet for tweet in timeline if tweet.id > twitter.last_tweet),
                    key=lambda tweet: tweet.id,
                )
                if not tweets:
                    continue

                guild_id = twitter.guild
                channel_id = twitter.channel
                if guild_id not in self._guild_cache:
                    self._guild_cache[guild_id] = await self.bot.get_guild(guild_id)
                guild = self._guild_cache[guild_id]
                if channel_id not in self._channel_cache:
                    self._channel_cache[channel_id] = await guild.fetch_channel(channel_id)
                channel = self._channel_cache[channel_id]

                for tweet in tweets:
                    retweet = hasattr(tweet, "retweeted_status")
                    if retweet and not twitter.retweets:
                        continue
                    timestamp = int(tweet.created_at.timestamp())
                    url = f"https://twitter.com/{twitter.handle}/status/{tweet.id}"
                    verb = "re" if retweet else ""
                    await channel.send(
                        f"`@{twitter.handle}` {verb}tweeted this at <t:{timestamp}:f>: {url}"
                    )

                # `tweets` is sorted ascending, so the last entry is the newest.
                twitter.last_tweet = tweets[-1].id
                twitter.save()
            except Exception as e:
                logger.error(f"Error with tweets: {e}")

    @slash_command(name="twitter", sub_cmd_name="follow", description="Follow a Twitter acount")
    @slash_option(
        name="handle", description="Twitter account", option_type=OptionTypes.STRING, required=True
    )
    @slash_option(
        name="channel",
        description="Channel to post tweets to",
        option_type=OptionTypes.CHANNEL,
        required=True,
    )
    @slash_option(
        name="retweets",
        description="Mirror re-tweets?",
        option_type=OptionTypes.STRING,
        required=False,
        choices=[
            SlashCommandChoice(name="Yes", value="Yes"),
            SlashCommandChoice(name="No", value="No"),
        ],
    )
    @admin_or_permissions(Permissions.MANAGE_GUILD)
    async def _twitter_follow(
        self, ctx: InteractionContext, handle: str, channel: GuildText, retweets: str = "Yes"
    ) -> None:
        """Start mirroring a Twitter account into a text channel.

        Args:
            ctx: Interaction context of the invoking command.
            handle: Twitter handle to follow (compared case-insensitively).
            channel: Channel that new tweets are posted to.
            retweets: "Yes" to mirror retweets as well, "No" to skip them.
        """
        handle = handle.lower()
        mirror_retweets = retweets == "Yes"
        if len(handle) > 15:
            await ctx.send("Invalid Twitter handle", hidden=True)
            return

        if not isinstance(channel, GuildText):
            await ctx.send("Channel must be a text channel", hidden=True)
            return

        try:
            # Await first, then index: `await f(...)[0]` would subscript the
            # un-awaited coroutine and always raise TypeError.
            timeline = await asyncio.to_thread(self.api.user_timeline, screen_name=handle)
            # An empty timeline raises IndexError and is reported like a bad handle.
            latest_tweet = timeline[0]
        except Exception:
            await ctx.send(
                "Unable to get user timeline. Are you sure the handle is correct?", hidden=True
            )
            return

        count = Twitter.objects(guild=ctx.guild.id).count()
        if count >= 12:
            await ctx.send("Cannot follow more than 12 Twitter accounts", hidden=True)
            return

        exists = Twitter.objects(handle=handle, guild=ctx.guild.id)
        if exists:
            await ctx.send("Twitter handle already being followed in this guild", hidden=True)
            return

        t = Twitter(
            handle=handle,
            guild=ctx.guild.id,
            channel=channel.id,
            admin=ctx.author.id,
            # Only tweets newer than this one will be mirrored.
            last_tweet=latest_tweet.id,
            retweets=mirror_retweets,
        )
        t.save()

        await ctx.send(f"Now following `@{handle}` in {channel.mention}")

    @slash_command(name="twitter", sub_cmd_name="unfollow", description="Unfollow Twitter accounts")
    @admin_or_permissions(Permissions.MANAGE_GUILD)
    async def _twitter_unfollow(self, ctx: InteractionContext) -> None:
        """Let the invoker pick followed accounts from a select menu and delete them.

        Args:
            ctx: Interaction context of the invoking command.
        """
        twitters = Twitter.objects(guild=ctx.guild.id)
        if not twitters:
            await ctx.send("You need to follow a Twitter account first", hidden=True)
            return

        # Maps the document id (used as the select value) back to its handle.
        handlemap = {str(x.id): x.handle for x in twitters}
        options = [
            SelectOption(label=twitter.handle, value=str(twitter.id)) for twitter in twitters
        ]
        select = Select(
            options=options, custom_id="to_delete", min_values=1, max_values=len(twitters)
        )
        components = [ActionRow(select)]

        block = "\n".join(x.handle for x in twitters)
        message = await ctx.send(
            content=f"You are following the following accounts:\n```\n{block}\n```\n\n"
            "Please choose accounts to unfollow",
            components=components,
        )

        try:
            context = await self.bot.wait_for_component(
                # The check receives the component event; the author lives on
                # its `.context`, just like `.values` below.
                check=lambda x: ctx.author.id == x.context.author.id,
                messages=message,
                timeout=60 * 5,
            )
            for to_delete in context.context.values:
                Twitter.objects(guild=ctx.guild.id, id=ObjectId(to_delete)).delete()
            self._disable_components(components)
            block = "\n".join(handlemap[x] for x in context.context.values)
            await context.context.edit_origin(
                content=f"Unfollowed the following:\n```\n{block}\n```", components=components
            )
        except asyncio.TimeoutError:
            # Nobody answered in time: leave the message but lock the menu.
            self._disable_components(components)
            await message.edit(components=components)

    @slash_command(
        name="twitter", sub_cmd_name="retweets", description="Modify followed Twitter accounts"
    )
    @slash_option(
        name="retweets",
        description="Mirror re-tweets?",
        option_type=OptionTypes.STRING,
        required=False,
        choices=[
            SlashCommandChoice(name="Yes", value="Yes"),
            SlashCommandChoice(name="No", value="No"),
        ],
    )
    @admin_or_permissions(Permissions.MANAGE_GUILD)
    async def _twitter_modify(self, ctx: InteractionContext, retweets: str = "Yes") -> None:
        """Turn retweet mirroring on or off for accounts picked from a select menu.

        Args:
            ctx: Interaction context of the invoking command.
            retweets: "Yes" to mirror retweets, "No" to stop mirroring them.
                Defaults to "Yes" because the option is declared optional.
        """
        mirror_retweets = retweets == "Yes"
        twitters = Twitter.objects(guild=ctx.guild.id)
        if not twitters:
            await ctx.send("You need to follow a Twitter account first", hidden=True)
            return

        # Maps the document id (used as the select value) back to its handle.
        handlemap = {str(x.id): x.handle for x in twitters}
        options = [
            SelectOption(label=twitter.handle, value=str(twitter.id)) for twitter in twitters
        ]
        select = Select(
            options=options, custom_id="to_update", min_values=1, max_values=len(twitters)
        )
        components = [ActionRow(select)]

        block = "\n".join(x.handle for x in twitters)
        message = await ctx.send(
            content=f"You are following the following accounts:\n```\n{block}\n```\n\n"
            f"Please choose which accounts to {'un' if not mirror_retweets else ''}follow retweets from",
            components=components,
        )

        try:
            context = await self.bot.wait_for_component(
                # The check receives the component event; the author lives on
                # its `.context`, just like `.values` below.
                check=lambda x: ctx.author.id == x.context.author.id,
                messages=message,
                timeout=60 * 5,
            )

            for to_update in context.context.values:
                t = Twitter.objects(guild=ctx.guild.id, id=ObjectId(to_update)).first()
                if t is None:
                    # The follow was removed while the menu was open.
                    continue
                t.retweets = mirror_retweets
                t.save()

            self._disable_components(components)
            block = "\n".join(handlemap[x] for x in context.context.values)
            await context.context.edit_origin(
                content=(
                    f"{'Unfollowed' if not mirror_retweets else 'Followed'} "
                    "retweets from the following:"
                    f"\n```\n{block}\n```"
                ),
                components=components,
            )
        except asyncio.TimeoutError:
            # Nobody answered in time: leave the message but lock the menu.
            self._disable_components(components)
            await message.edit(components=components)
def setup(bot: Snake) -> None:
    """Add TwitterCog to J.A.R.V.I.S.

    Args:
        bot: The bot instance the cog is attached to.
    """
    # A dis_snek Scale registers itself with the bot when it is instantiated;
    # Snake has no discord.py-style `add_cog`, so calling it would raise
    # AttributeError and the extension would fail to load.
    TwitterCog(bot)