"""J.A.R.V.I.S. Twitter Cog.""" import asyncio import logging import tweepy from bson import ObjectId from discord import TextChannel from discord.ext import commands from discord.ext.tasks import loop from discord.utils import find from discord_slash import SlashContext, cog_ext from discord_slash.model import SlashCommandOptionType as COptionType from discord_slash.utils.manage_commands import create_choice, create_option from discord_slash.utils.manage_components import ( create_actionrow, create_select, create_select_option, wait_for_component, ) from jarvis.config import get_config from jarvis.db.models import Twitter from jarvis.utils.permissions import admin_or_permissions logger = logging.getLogger("discord") class TwitterCog(commands.Cog): """J.A.R.V.I.S. Twitter Cog.""" def __init__(self, bot: commands.Bot): self.bot = bot config = get_config() auth = tweepy.AppAuthHandler(config.twitter["consumer_key"], config.twitter["consumer_secret"]) self.api = tweepy.API(auth) self._tweets.start() self._guild_cache = {} self._channel_cache = {} @loop(seconds=30) async def _tweets(self) -> None: twitters = Twitter.objects(active=True) handles = Twitter.objects.distinct("handle") twitter_data = {} for handle in handles: twitter_data[handle] = self.api.user_timeline(screen_name=handle) for twitter in twitters: try: tweets = list(filter(lambda x: x.id > twitter.last_tweet, twitter_data[twitter.handle])) if tweets: tweets = sorted(tweets, key=lambda x: x.id) if twitter.guild not in self._guild_cache: self._guild_cache[twitter.guild] = await self.bot.fetch_guild(twitter.guild) guild = self._guild_cache[twitter.guild] if twitter.channel not in self._channel_cache: channels = await guild.fetch_channels() self._channel_cache[twitter.channel] = find(lambda x: x.id == twitter.channel, channels) channel = self._channel_cache[twitter.channel] for tweet in tweets: retweet = "retweeted_status" in tweet.__dict__ if retweet and not twitter.retweets: continue timestamp = int(tweet.created_at.timestamp()) url = f"https://twitter.com/{twitter.handle}/status/{tweet.id}" verb = "re" if retweet else "" await channel.send(f"`@{twitter.handle}` {verb}tweeted this at : {url}") newest = max(tweets, key=lambda x: x.id) twitter.last_tweet = newest.id twitter.save() except Exception as e: logger.error(f"Error with tweets: {e}") @cog_ext.cog_subcommand( base="twitter", base_description="Twitter commands", name="follow", description="Follow a Twitter account", options=[ create_option(name="handle", description="Twitter account", option_type=COptionType.STRING, required=True), create_option( name="channel", description="Channel to post tweets into", option_type=COptionType.CHANNEL, required=True, ), create_option( name="retweets", description="Mirror re-tweets?", option_type=COptionType.STRING, required=False, choices=[create_choice(name="Yes", value="Yes"), create_choice(name="No", value="No")], ), ], ) @admin_or_permissions(manage_guild=True) async def _twitter_follow( self, ctx: SlashContext, handle: str, channel: TextChannel, retweets: str = "Yes" ) -> None: retweets = retweets == "Yes" if len(handle) > 15: await ctx.send("Invalid Twitter handle", hidden=True) return if not isinstance(channel, TextChannel): await ctx.send("Channel must be a text channel", hidden=True) return try: latest_tweet = self.api.user_timeline(screen_name=handle, count=1)[0] except Exception: await ctx.send("Unable to get user timeline. Are you sure the handle is correct?", hidden=True) return count = Twitter.objects(guild=ctx.guild.id).count() if count >= 12: await ctx.send("Cannot follow more than 12 Twitter accounts", hidden=True) return exists = Twitter.objects(handle=handle, guild=ctx.guild.id) if exists: await ctx.send("Twitter handle already being followed in this guild", hidden=True) return t = Twitter( handle=handle, guild=ctx.guild.id, channel=channel.id, admin=ctx.author.id, last_tweet=latest_tweet.id, retweets=retweets, ) t.save() await ctx.send(f"Now following `@{handle}` in {channel.mention}") @cog_ext.cog_subcommand( base="twitter", name="unfollow", description="Unfollow Twitter accounts", ) @admin_or_permissions(manage_guild=True) async def _twitter_unfollow(self, ctx: SlashContext) -> None: twitters = Twitter.objects(guild=ctx.guild.id) if not twitters: await ctx.send("You need to follow a Twitter account first", hidden=True) return options = [] handlemap = {x.id: x.handle for x in twitters} for twitter in twitters: option = create_select_option(label=twitter.handle, value=str(twitter.id)) options.append(option) select = create_select(options=options, custom_id="to_delete", min_values=1, max_values=len(twitters)) components = [create_actionrow(select)] block = "\n".join(x.handle for x in twitters) message = await ctx.send( content=f"You are following the following accounts:\n```\n{block}\n```\n\n" "Please choose accounts to unfollow", components=components, ) try: context = await wait_for_component( self.bot, check=lambda x: ctx.author.id == x.author.id, messages=message, timeout=60 * 5 ) for to_delete in context.selected_options: _ = Twitter.objects(guild=ctx.guild.id, id=ObjectId(to_delete)).delete() for row in components: for component in row["components"]: component["disabled"] = True block = "\n".join(handlemap[x] for x in context.selected_options) await context.edit_origin(content=f"Unfollowed the following:\n```\n{block}\n```", components=components) except asyncio.TimeoutError: for row in components: for component in row["components"]: component["disabled"] = True await message.edit(components=components) @cog_ext.cog_subcommand( base="twitter", name="retweets", description="Modify followed Twitter accounts", options=[ create_option( name="retweets", description="Mirror re-tweets?", option_type=COptionType.STRING, required=True, choices=[create_choice(name="Yes", value="Yes"), create_choice(name="No", value="No")], ), ], ) @admin_or_permissions(manage_guild=True) async def _twitter_modify(self, ctx: SlashContext, retweets: str) -> None: retweets = retweets == "Yes" twitters = Twitter.objects(guild=ctx.guild.id) if not twitters: await ctx.send("You need to follow a Twitter account first", hidden=True) return options = [] for twitter in twitters: option = create_select_option(label=twitter.handle, value=str(twitter.id)) options.append(option) select = create_select(options=options, custom_id="to_update", min_values=1, max_values=len(twitters)) components = [create_actionrow(select)] block = "\n".join(x.handle for x in twitters) message = await ctx.send( content=f"You are following the following accounts:\n```\n{block}\n```\n\n" f"Please choose which accounts to {'un' if not retweets else ''}follow retweets from", components=components, ) try: context = await wait_for_component( self.bot, check=lambda x: ctx.author.id == x.author.id, messages=message, timeout=60 * 5 ) handlemap = {x.id: x.handle for x in twitters} for to_update in context.selected_options: t = Twitter.objects(guild=ctx.guild.id, id=ObjectId()) t.retweets = retweets t.save() for row in components: for component in row["components"]: component["disabled"] = True block = "\n".join(handlemap[x] for x in context.selected_options) await context.edit_origin( content=f"{'Unfollowed' if not retweets else 'Followed'} retweets from the following:" f"\n```\n{block}\n```", components=components, ) except asyncio.TimeoutError: for row in components: for component in row["components"]: component["disabled"] = True await message.edit(components=components) def setup(bot: commands.Bot) -> None: """Add TwitterCog to J.A.R.V.I.S.""" bot.add_cog(TwitterCog(bot))