246 lines
9 KiB
Python
246 lines
9 KiB
Python
"""JARVIS Twitter Cog."""
|
|
import asyncio
|
|
import logging
|
|
|
|
import tweepy
|
|
from interactions import Client, Extension, InteractionContext, Permissions
|
|
from interactions.client.utils.misc_utils import get
|
|
from interactions.models.discord.channel import GuildText
|
|
from interactions.models.discord.components import (
|
|
ActionRow,
|
|
StringSelectMenu,
|
|
StringSelectOption,
|
|
)
|
|
from interactions.models.internal.application_commands import (
|
|
OptionType,
|
|
SlashCommand,
|
|
slash_option,
|
|
)
|
|
from interactions.models.internal.command import check
|
|
from jarvis_core.db.models import TwitterAccount, TwitterFollow
|
|
|
|
from jarvis.config import load_config
|
|
from jarvis.utils.permissions import admin_or_permissions
|
|
|
|
|
|
class TwitterCog(Extension):
    """JARVIS Twitter Cog.

    Exposes the ``/twitter`` slash command group, which lets guild managers
    follow Twitter accounts into a text channel, unfollow them again, and
    toggle whether retweets are mirrored for followed accounts.
    """

    def __init__(self, bot: Client):
        """Store the bot and build an app-only Twitter API client from config."""
        self.bot = bot
        self.logger = logging.getLogger(__name__)
        config = load_config()
        auth = tweepy.AppAuthHandler(config.twitter.consumer_key, config.twitter.consumer_secret)
        self.api = tweepy.API(auth)
        self._guild_cache = {}
        self._channel_cache = {}

    twitter = SlashCommand(
        name="twitter",
        description="Manage Twitter follows",
    )

    @staticmethod
    async def _guild_follows(guild_id: int) -> list[TwitterFollow]:
        """Return every TwitterFollow document stored for ``guild_id``."""
        return [follow async for follow in TwitterFollow.find(TwitterFollow.guild == guild_id)]

    @staticmethod
    async def _account_options(
        follows: list[TwitterFollow],
    ) -> tuple[list[StringSelectOption], dict[str, str]]:
        """Build select-menu options for ``follows``.

        Returns:
            A ``(options, handlemap)`` pair where ``handlemap`` maps the
            stringified Twitter id (the option value) to the label shown.
        """
        options = []
        handlemap = {}
        for follow in follows:
            key = str(follow.twitter_id)
            account = await TwitterAccount.find_one(TwitterAccount.twitter_id == follow.twitter_id)
            # A follow whose account document is missing must still be
            # selectable, so fall back to the numeric id as its label.
            label = account.handle if account else key
            handlemap[key] = label
            options.append(StringSelectOption(label=label, value=key))
        return options, handlemap

    @staticmethod
    def _disable_components(components: list) -> None:
        """Mark every component in every row of ``components`` as disabled."""
        for row in components:
            for component in row.components:
                component.disabled = True

    @twitter.subcommand(
        sub_cmd_name="follow",
        sub_cmd_description="Follow a Twitter account",
    )
    @slash_option(name="handle", description="Twitter account", opt_type=OptionType.STRING, required=True)
    @slash_option(
        name="channel",
        description="Channel to post tweets to",
        opt_type=OptionType.CHANNEL,
        required=True,
    )
    @slash_option(
        name="retweets",
        description="Mirror re-tweets?",
        opt_type=OptionType.BOOLEAN,
        required=False,
    )
    @check(admin_or_permissions(Permissions.MANAGE_GUILD))
    async def _twitter_follow(
        self, ctx: InteractionContext, handle: str, channel: GuildText, retweets: bool = True
    ) -> None:
        """Follow ``handle`` in this guild and post its tweets to ``channel``.

        Args:
            ctx: Interaction the command was invoked from.
            handle: Twitter screen name; a leading ``@`` is tolerated.
            channel: Destination channel; must be a guild text channel.
            retweets: Whether retweets should be mirrored as well.
        """
        # Users commonly paste the handle with its "@" prefix; it is not part
        # of the screen name, so drop it before validating the length.
        handle = handle.strip().lstrip("@").lower()
        if len(handle) > 15 or len(handle) < 4:
            await ctx.send("Invalid Twitter handle", ephemeral=True)
            return

        if not isinstance(channel, GuildText):
            await ctx.send("Channel must be a text channel", ephemeral=True)
            return

        try:
            # tweepy is blocking, so run the lookups off the event loop.
            account = await asyncio.to_thread(self.api.get_user, screen_name=handle)
            timeline = await asyncio.to_thread(self.api.user_timeline, screen_name=handle)
            # An empty timeline raises IndexError here and is reported the
            # same way as an API failure.
            latest_tweet = timeline[0]
        except Exception:
            # Deliberately broad: any lookup failure is reported to the user
            # as a bad handle rather than surfacing as a command error.
            await ctx.send("Unable to get user timeline. Are you sure the handle is correct?", ephemeral=True)
            return

        exists = await TwitterFollow.find_one(
            TwitterFollow.twitter_id == account.id, TwitterFollow.guild == ctx.guild.id
        )
        if exists:
            await ctx.send("Twitter account already being followed in this guild", ephemeral=True)
            return

        count = await TwitterFollow.find(TwitterFollow.guild == ctx.guild.id).count()
        if count >= 12:
            await ctx.send("Cannot follow more than 12 Twitter accounts", ephemeral=True)
            return

        # The account document is shared between guilds; create it only once.
        ta = await TwitterAccount.find_one(TwitterAccount.twitter_id == account.id)
        if not ta:
            ta = TwitterAccount(
                handle=account.screen_name,
                twitter_id=account.id,
                last_tweet=latest_tweet.id,
            )
            await ta.save()

        tf = TwitterFollow(
            twitter_id=account.id,
            guild=ctx.guild.id,
            channel=channel.id,
            admin=ctx.author.id,
            retweets=retweets,
        )

        await tf.save()

        await ctx.send(f"Now following `@{handle}` in {channel.mention}")

    @twitter.subcommand(sub_cmd_name="unfollow", sub_cmd_description="Unfollow Twitter accounts")
    @check(admin_or_permissions(Permissions.MANAGE_GUILD))
    async def _twitter_unfollow(self, ctx: InteractionContext) -> None:
        """Let the invoker pick followed accounts from a select menu and unfollow them."""
        follows = await self._guild_follows(ctx.guild.id)
        if not follows:
            await ctx.send("You need to follow a Twitter account first", ephemeral=True)
            return

        options, handlemap = await self._account_options(follows)

        select = StringSelectMenu(options=options, custom_id="to_delete", min_values=1, max_values=len(follows))

        components = [ActionRow(select)]
        block = "\n".join(handlemap.values())
        message = await ctx.send(
            content=f"You are following the following accounts:\n```\n{block}\n```\n\n"
            "Please choose accounts to unfollow",
            components=components,
        )

        try:
            # Only the invoking user may answer the prompt.
            context = await self.bot.wait_for_component(
                check=lambda x: ctx.author.id == x.ctx.author.id,
                messages=message,
                timeout=60 * 5,
            )
        except asyncio.TimeoutError:
            # Nobody answered in time: lock the menu so it cannot be used later.
            self._disable_components(components)
            await message.edit(components=components)
            return

        for to_delete in context.ctx.values:
            follow = get(follows, guild=ctx.guild.id, twitter_id=int(to_delete))
            try:
                await follow.delete()
            except Exception:
                # Best effort: one failed deletion must not abort the rest.
                self.logger.debug("Ignoring deletion error", exc_info=True)

        self._disable_components(components)

        block = "\n".join(handlemap[x] for x in context.ctx.values)
        await context.ctx.edit_origin(
            content=f"Unfollowed the following:\n```\n{block}\n```", components=components
        )

    @twitter.subcommand(
        sub_cmd_name="retweets",
        sub_cmd_description="Modify followed Twitter accounts",
    )
    @slash_option(
        name="retweets",
        description="Mirror re-tweets?",
        opt_type=OptionType.BOOLEAN,
        required=False,
    )
    @check(admin_or_permissions(Permissions.MANAGE_GUILD))
    async def _twitter_modify(self, ctx: InteractionContext, retweets: bool = True) -> None:
        """Set the retweet-mirroring flag on followed accounts chosen from a select menu.

        Args:
            ctx: Interaction the command was invoked from.
            retweets: New value of the ``retweets`` flag for the chosen follows.
        """
        follows = await self._guild_follows(ctx.guild.id)
        if not follows:
            await ctx.send("You need to follow a Twitter account first", ephemeral=True)
            return

        options, handlemap = await self._account_options(follows)

        select = StringSelectMenu(options=options, custom_id="to_update", min_values=1, max_values=len(follows))

        components = [ActionRow(select)]
        block = "\n".join(handlemap.values())
        message = await ctx.send(
            content=f"You are following the following accounts:\n```\n{block}\n```\n\n"
            f"Please choose which accounts to {'un' if not retweets else ''}follow retweets from",
            components=components,
        )

        try:
            # Only the invoking user may answer the prompt.
            context = await self.bot.wait_for_component(
                check=lambda x: ctx.author.id == x.ctx.author.id,
                messages=message,
                timeout=60 * 5,
            )
        except asyncio.TimeoutError:
            # Nobody answered in time: lock the menu so it cannot be used later.
            self._disable_components(components)
            await message.edit(components=components)
            return

        for to_update in context.ctx.values:
            follow = get(follows, guild=ctx.guild.id, twitter_id=int(to_update))
            # Apply the requested value; the reply below reports the same choice.
            follow.retweets = retweets
            await follow.save()

        self._disable_components(components)

        # handlemap already holds a label for every selectable value.
        block = "\n".join(handlemap[x] for x in context.ctx.values)
        await context.ctx.edit_origin(
            content=(
                f"{'Unfollowed' if not retweets else 'Followed'} "
                "retweets from the following:"
                f"\n```\n{block}\n```"
            ),
            components=components,
        )
|
|
|
|
|
|
def setup(bot: Client) -> None:
    """Attach TwitterCog to the bot, or log and skip when Twitter is not configured."""
    twitter_settings = load_config().twitter
    if not twitter_settings:
        bot.logger.info("No Twitter configuration, not loading")
        return
    TwitterCog(bot)
|