jarvis-bot/jarvis/cogs/twitter.py

249 lines
8.9 KiB
Python

"""JARVIS Twitter Cog."""
import asyncio
import logging
import tweepy
from jarvis_core.db import q
from jarvis_core.db.models import TwitterAccount, TwitterFollow
from naff import Client, Cog, InteractionContext, Permissions
from naff.client.utils.misc_utils import get
from naff.models.discord.channel import GuildText
from naff.models.discord.components import ActionRow, Select, SelectOption
from naff.models.naff.application_commands import (
OptionTypes,
SlashCommand,
slash_option,
)
from naff.models.naff.command import check
from jarvis.config import JarvisConfig
from jarvis.utils.permissions import admin_or_permissions
class TwitterCog(Cog):
"""JARVIS Twitter Cog."""
def __init__(self, bot: Client):
self.bot = bot
self.logger = logging.getLogger(__name__)
config = JarvisConfig.from_yaml()
auth = tweepy.AppAuthHandler(
config.twitter["consumer_key"], config.twitter["consumer_secret"]
)
self.api = tweepy.API(auth)
self._guild_cache = {}
self._channel_cache = {}
twitter = SlashCommand(
name="twitter",
description="Manage Twitter follows",
)
@twitter.subcommand(
sub_cmd_name="follow",
sub_cmd_description="Follow a Twitter acount",
)
@slash_option(
name="handle", description="Twitter account", opt_type=OptionTypes.STRING, required=True
)
@slash_option(
name="channel",
description="Channel to post tweets to",
opt_type=OptionTypes.CHANNEL,
required=True,
)
@slash_option(
name="retweets",
description="Mirror re-tweets?",
opt_type=OptionTypes.BOOLEAN,
required=False,
)
@check(admin_or_permissions(Permissions.MANAGE_GUILD))
async def _twitter_follow(
self, ctx: InteractionContext, handle: str, channel: GuildText, retweets: bool = True
) -> None:
handle = handle.lower()
if len(handle) > 15 or len(handle) < 4:
await ctx.send("Invalid Twitter handle", ephemeral=True)
return
if not isinstance(channel, GuildText):
await ctx.send("Channel must be a text channel", ephemeral=True)
return
try:
account = await asyncio.to_thread(self.api.get_user, screen_name=handle)
latest_tweet = (await asyncio.to_thread(self.api.user_timeline, screen_name=handle))[0]
except Exception:
await ctx.send(
"Unable to get user timeline. Are you sure the handle is correct?", ephemeral=True
)
return
exists = await TwitterFollow.find_one(q(twitter_id=account.id, guild=ctx.guild.id))
if exists:
await ctx.send("Twitter account already being followed in this guild", ephemeral=True)
return
count = len([i async for i in TwitterFollow.find(q(guild=ctx.guild.id))])
if count >= 12:
await ctx.send("Cannot follow more than 12 Twitter accounts", ephemeral=True)
return
ta = await TwitterAccount.find_one(q(twitter_id=account.id))
if not ta:
ta = TwitterAccount(
handle=account.screen_name,
twitter_id=account.id,
last_tweet=latest_tweet.id,
)
await ta.commit()
tf = TwitterFollow(
twitter_id=account.id,
guild=ctx.guild.id,
channel=channel.id,
admin=ctx.author.id,
retweets=retweets,
)
await tf.commit()
await ctx.send(f"Now following `@{handle}` in {channel.mention}")
@twitter.subcommand(sub_cmd_name="unfollow", sub_cmd_description="Unfollow Twitter accounts")
@check(admin_or_permissions(Permissions.MANAGE_GUILD))
async def _twitter_unfollow(self, ctx: InteractionContext) -> None:
t = TwitterFollow.find(q(guild=ctx.guild.id))
twitters = []
async for twitter in t:
twitters.append(twitter)
if not twitters:
await ctx.send("You need to follow a Twitter account first", ephemeral=True)
return
options = []
handlemap = {}
for twitter in twitters:
account = await TwitterAccount.find_one(q(twitter_id=twitter.twitter_id))
handlemap[str(twitter.twitter_id)] = account.handle
option = SelectOption(label=account.handle, value=str(twitter.twitter_id))
options.append(option)
select = Select(
options=options, custom_id="to_delete", min_values=1, max_values=len(twitters)
)
components = [ActionRow(select)]
block = "\n".join(x for x in handlemap.values())
message = await ctx.send(
content=f"You are following the following accounts:\n```\n{block}\n```\n\n"
"Please choose accounts to unfollow",
components=components,
)
try:
context = await self.bot.wait_for_component(
check=lambda x: ctx.author.id == x.context.author.id,
messages=message,
timeout=60 * 5,
)
for to_delete in context.context.values:
follow = get(twitters, guild=ctx.guild.id, twitter_id=int(to_delete))
try:
await follow.delete()
except Exception:
self.logger.debug("Ignoring deletion error")
for row in components:
for component in row.components:
component.disabled = True
block = "\n".join(handlemap[x] for x in context.context.values)
await context.context.edit_origin(
content=f"Unfollowed the following:\n```\n{block}\n```", components=components
)
except asyncio.TimeoutError:
for row in components:
for component in row.components:
component.disabled = True
await message.edit(components=components)
@twitter.subcommand(
sub_cmd_name="retweets",
sub_cmd_description="Modify followed Twitter accounts",
)
@slash_option(
name="retweets",
description="Mirror re-tweets?",
opt_type=OptionTypes.BOOLEAN,
required=False,
)
@check(admin_or_permissions(Permissions.MANAGE_GUILD))
async def _twitter_modify(self, ctx: InteractionContext, retweets: bool = True) -> None:
t = TwitterFollow.find(q(guild=ctx.guild.id))
twitters = []
async for twitter in t:
twitters.append(twitter)
if not twitters:
await ctx.send("You need to follow a Twitter account first", ephemeral=True)
return
options = []
handlemap = {}
for twitter in twitters:
account = await TwitterAccount.find_one(q(twitter_id=twitter.id))
handlemap[str(twitter.twitter_id)] = account.handle
option = SelectOption(label=account.handle, value=str(twitter.twitter_id))
options.append(option)
select = Select(
options=options, custom_id="to_update", min_values=1, max_values=len(twitters)
)
components = [ActionRow(select)]
block = "\n".join(x for x in handlemap.values())
message = await ctx.send(
content=f"You are following the following accounts:\n```\n{block}\n```\n\n"
f"Please choose which accounts to {'un' if not retweets else ''}follow retweets from",
components=components,
)
try:
context = await self.bot.wait_for_component(
check=lambda x: ctx.author.id == x.author.id,
messages=message,
timeout=60 * 5,
)
handlemap = {}
for to_update in context.context.values:
account = await TwitterAccount.find_one(q(twitter_id=int(to_update)))
handlemap[str(twitter.twitter_id)] = account.handle
t = get(twitters, guild=ctx.guild.id, twitter_id=int(to_update))
t.update(q(retweets=True))
await t.commit()
for row in components:
for component in row.components:
component.disabled = True
block = "\n".join(handlemap[x] for x in context.context.values)
await context.context.edit_origin(
content=(
f"{'Unfollowed' if not retweets else 'Followed'} "
"retweets from the following:"
f"\n```\n{block}\n```"
),
components=components,
)
except asyncio.TimeoutError:
for row in components:
for component in row.components:
component.disabled = True
await message.edit(components=components)
def setup(bot: Client) -> None:
"""Add TwitterCog to JARVIS"""
if JarvisConfig.from_yaml().twitter:
TwitterCog(bot)