jarvis-bot/jarvis/cogs/twitter.py

147 lines
5.6 KiB
Python

"""J.A.R.V.I.S. Twitter Cog."""
import asyncio
import logging
import tweepy
from bson import ObjectId
from discord import TextChannel
from discord.ext import commands
from discord.ext.tasks import loop
from discord.utils import find
from discord_slash import SlashContext, cog_ext
from discord_slash.utils.manage_commands import create_option
from discord_slash.utils.manage_components import (
create_actionrow,
create_select,
create_select_option,
wait_for_component,
)
from jarvis.config import get_config
from jarvis.db.models import Twitter
from jarvis.utils.permissions import admin_or_permissions
# Module-level logger; records are emitted through the logger named "discord".
logger = logging.getLogger("discord")
class TwitterCog(commands.Cog):
    """J.A.R.V.I.S. Twitter Cog.

    Relays new tweets from followed Twitter accounts into the Discord channel
    stored with each follow, and provides the ``twitter follow`` and
    ``twitter unfollow`` slash subcommands for managing those follows.
    """

    def __init__(self, bot: commands.Bot):
        """Create the Twitter API client and start the polling loop.

        Args:
            bot: Bot instance this cog is attached to.
        """
        self.bot = bot
        config = get_config()
        auth = tweepy.AppAuthHandler(config.twitter["consumer_key"], config.twitter["consumer_secret"])
        self.api = tweepy.API(auth)
        self._tweets.start()

    def cog_unload(self) -> None:
        """Cancel the polling loop so it does not keep running after the cog is removed."""
        self._tweets.cancel()

    @staticmethod
    def _disable_components(components: list) -> None:
        """Mark every component inside the given action rows as disabled, in place."""
        for row in components:
            for component in row["components"]:
                component["disabled"] = True

    @loop(seconds=30)
    async def _tweets(self) -> None:
        """Post tweets newer than the stored ``last_tweet`` for every active follow.

        Runs every 30 seconds. Errors are caught and logged per follow so that
        one failing entry does not prevent the remaining follows from being processed.
        """
        # NOTE(review): ``user_timeline`` is a synchronous call made from a coroutine;
        # confirm whether blocking the event loop for its duration is acceptable.
        for twitter in Twitter.objects(active=True):
            try:
                tweets = self.api.user_timeline(screen_name=twitter.handle, since_id=twitter.last_tweet)
                if not tweets:
                    continue
                # Oldest first, so tweets are posted in chronological order.
                tweets = sorted(tweets, key=lambda x: x.id)
                guild = await self.bot.fetch_guild(twitter.guild)
                channels = await guild.fetch_channels()
                channel = find(lambda x: x.id == twitter.channel, channels)
                if channel is None:
                    # The stored channel is no longer present in the guild; report it
                    # clearly instead of failing with an AttributeError on ``None``.
                    logger.warning(
                        "Channel %s for @%s not found in guild %s; skipping",
                        twitter.channel,
                        twitter.handle,
                        twitter.guild,
                    )
                    continue
                for tweet in tweets:
                    timestamp = int(tweet.created_at.timestamp())
                    url = f"https://twitter.com/{twitter.handle}/status/{tweet.id}"
                    await channel.send(f"`@{twitter.handle}` tweeted this at <t:{timestamp}:f>: {url}")
                # The list is sorted ascending by id, so the last entry is the newest.
                twitter.last_tweet = tweets[-1].id
                twitter.save()
            except Exception as e:
                # Deliberately broad: keep the loop alive, but preserve the traceback.
                logger.exception("Error with tweets: %s", e)

    @cog_ext.cog_subcommand(
        base="twitter",
        base_description="Twitter commands",
        name="follow",
        description="Follow a Twitter account",
        options=[
            create_option(name="handle", description="Twitter account", option_type=3, required=True),
            create_option(name="channel", description="Channel to post tweets into", option_type=7, required=True),
        ],
    )
    @admin_or_permissions(manage_guild=True)
    async def _twitter_follow(self, ctx: SlashContext, handle: str, channel: TextChannel) -> None:
        """Start following ``handle`` and relay its tweets into ``channel``.

        The request is rejected (with a hidden reply) when the handle is longer
        than 15 characters, the channel is not a text channel, the timeline
        cannot be fetched, the guild already has 12 follows, or the handle is
        already followed in this guild.

        Args:
            ctx: Slash command invocation context.
            handle: Twitter screen name to follow.
            channel: Channel that new tweets are posted into.
        """
        if len(handle) > 15:
            await ctx.send("Invalid Twitter handle", hidden=True)
            return

        if not isinstance(channel, TextChannel):
            await ctx.send("Channel must be a text channel", hidden=True)
            return

        try:
            # The newest existing tweet seeds ``last_tweet`` so only later tweets are relayed.
            # Indexing [0] also fails (and is reported below) when the timeline is empty.
            latest_tweet = self.api.user_timeline(screen_name=handle, count=1)[0]
        except Exception:
            await ctx.send("Unable to get user timeline. Are you sure the handle is correct?", hidden=True)
            return

        count = Twitter.objects(guild=ctx.guild.id).count()
        if count >= 12:
            await ctx.send("Cannot follow more than 12 Twitter accounts", hidden=True)
            return

        exists = Twitter.objects(handle=handle, guild=ctx.guild.id)
        if exists:
            await ctx.send("Twitter handle already being followed in this guild", hidden=True)
            return

        t = Twitter(
            handle=handle, guild=ctx.guild.id, channel=channel.id, admin=ctx.author.id, last_tweet=latest_tweet.id
        )
        t.save()

        await ctx.send(f"Now following `@{handle}` in {channel.mention}")

    @cog_ext.cog_subcommand(
        base="twitter",
        name="unfollow",
        description="Unfollow Twitter accounts",
    )
    @admin_or_permissions(manage_guild=True)
    async def _twitter_unfollow(self, ctx: SlashContext) -> None:
        """Let the invoking user pick followed accounts to remove via a select menu.

        The menu is shown for up to five minutes; once a selection is made or
        the wait times out, the menu is disabled.

        Args:
            ctx: Slash command invocation context.
        """
        twitters = list(Twitter.objects(guild=ctx.guild.id))
        if not twitters:
            await ctx.send("You need to follow a Twitter account first", hidden=True)
            return

        # Select values are stringified document ids; keep a map back to the handle
        # so the confirmation message can show names rather than ids.
        handles = {str(twitter.id): twitter.handle for twitter in twitters}
        options = [create_select_option(label=name, value=doc_id) for doc_id, name in handles.items()]

        select = create_select(options=options, custom_id="to_delete", min_values=1, max_values=len(twitters))
        components = [create_actionrow(select)]
        block = "\n".join(x.handle for x in twitters)
        message = await ctx.send(
            content=f"You are following the following accounts:\n```\n{block}\n```", components=components
        )

        try:
            context = await wait_for_component(
                self.bot, check=lambda x: ctx.author.id == x.author.id, messages=message, timeout=60 * 5
            )
            for to_delete in context.selected_options:
                _ = Twitter.objects(guild=ctx.guild.id, id=ObjectId(to_delete)).delete()
            self._disable_components(components)
            # ``selected_options`` holds the option values (id strings), not documents.
            block = "\n".join(handles.get(x, x) for x in context.selected_options)
            await context.edit_origin(content=f"Unfollowed the following:\n```\n{block}\n```", components=components)
        except asyncio.TimeoutError:
            self._disable_components(components)
            await message.edit(components=components)
def setup(bot: commands.Bot) -> None:
    """Instantiate TwitterCog and register it on ``bot``."""
    cog = TwitterCog(bot)
    bot.add_cog(cog)