jarvis-bot/jarvis/cogs/twitter.py
2021-11-01 18:50:42 -06:00

246 lines
9.7 KiB
Python

"""J.A.R.V.I.S. Twitter Cog."""
import asyncio
import logging
import tweepy
from bson import ObjectId
from discord import TextChannel
from discord.ext import commands
from discord.ext.tasks import loop
from discord.utils import find
from discord_slash import SlashContext, cog_ext
from discord_slash.model import SlashCommandOptionType as COptionType
from discord_slash.utils.manage_commands import create_choice, create_option
from discord_slash.utils.manage_components import (
create_actionrow,
create_select,
create_select_option,
wait_for_component,
)
from jarvis.config import get_config
from jarvis.db.models import Twitter
from jarvis.utils.permissions import admin_or_permissions
logger = logging.getLogger("discord")
class TwitterCog(commands.Cog):
"""J.A.R.V.I.S. Twitter Cog."""
def __init__(self, bot: commands.Bot):
self.bot = bot
config = get_config()
auth = tweepy.AppAuthHandler(config.twitter["consumer_key"], config.twitter["consumer_secret"])
self.api = tweepy.API(auth)
self._tweets.start()
self._guild_cache = {}
self._channel_cache = {}
@loop(seconds=30)
async def _tweets(self) -> None:
twitters = Twitter.objects(active=True)
handles = Twitter.objects.distinct("handle")
twitter_data = {}
for handle in handles:
twitter_data[handle] = self.api.user_timeline(screen_name=handle)
for twitter in twitters:
try:
tweets = list(filter(lambda x: x.id > twitter.last_tweet, twitter_data[twitter.handle]))
if tweets:
tweets = sorted(tweets, key=lambda x: x.id)
if twitter.guild not in self._guild_cache:
self._guild_cache[twitter.guild] = await self.bot.fetch_guild(twitter.guild)
guild = self._guild_cache[twitter.guild]
if twitter.channel not in self._channel_cache:
channels = await guild.fetch_channels()
self._channel_cache[twitter.channel] = find(lambda x: x.id == twitter.channel, channels)
channel = self._channel_cache[twitter.channel]
for tweet in tweets:
retweet = "retweeted_status" in tweet.__dict__
if retweet and not twitter.retweets:
continue
timestamp = int(tweet.created_at.timestamp())
url = f"https://twitter.com/{twitter.handle}/status/{tweet.id}"
verb = "re" if retweet else ""
await channel.send(f"`@{twitter.handle}` {verb}tweeted this at <t:{timestamp}:f>: {url}")
newest = max(tweets, key=lambda x: x.id)
twitter.last_tweet = newest.id
twitter.save()
except Exception as e:
logger.error(f"Error with tweets: {e}")
@cog_ext.cog_subcommand(
base="twitter",
base_description="Twitter commands",
name="follow",
description="Follow a Twitter account",
options=[
create_option(name="handle", description="Twitter account", option_type=COptionType.STRING, required=True),
create_option(
name="channel",
description="Channel to post tweets into",
option_type=COptionType.CHANNEL,
required=True,
),
create_option(
name="retweets",
description="Mirror re-tweets?",
option_type=COptionType.STRING,
required=False,
choices=[create_choice(name="Yes", value="Yes"), create_choice(name="No", value="No")],
),
],
)
@admin_or_permissions(manage_guild=True)
async def _twitter_follow(
self, ctx: SlashContext, handle: str, channel: TextChannel, retweets: str = "Yes"
) -> None:
retweets = retweets == "Yes"
if len(handle) > 15:
await ctx.send("Invalid Twitter handle", hidden=True)
return
if not isinstance(channel, TextChannel):
await ctx.send("Channel must be a text channel", hidden=True)
return
try:
latest_tweet = self.api.user_timeline(screen_name=handle, count=1)[0]
except Exception:
await ctx.send("Unable to get user timeline. Are you sure the handle is correct?", hidden=True)
return
count = Twitter.objects(guild=ctx.guild.id).count()
if count >= 12:
await ctx.send("Cannot follow more than 12 Twitter accounts", hidden=True)
return
exists = Twitter.objects(handle=handle, guild=ctx.guild.id)
if exists:
await ctx.send("Twitter handle already being followed in this guild", hidden=True)
return
t = Twitter(
handle=handle,
guild=ctx.guild.id,
channel=channel.id,
admin=ctx.author.id,
last_tweet=latest_tweet.id,
retweets=retweets,
)
t.save()
await ctx.send(f"Now following `@{handle}` in {channel.mention}")
@cog_ext.cog_subcommand(
base="twitter",
name="unfollow",
description="Unfollow Twitter accounts",
)
@admin_or_permissions(manage_guild=True)
async def _twitter_unfollow(self, ctx: SlashContext) -> None:
twitters = Twitter.objects(guild=ctx.guild.id)
if not twitters:
await ctx.send("You need to follow a Twitter account first", hidden=True)
return
options = []
handlemap = {x.id: x.handle for x in twitters}
for twitter in twitters:
option = create_select_option(label=twitter.handle, value=str(twitter.id))
options.append(option)
select = create_select(options=options, custom_id="to_delete", min_values=1, max_values=len(twitters))
components = [create_actionrow(select)]
block = "\n".join(x.handle for x in twitters)
message = await ctx.send(
content=f"You are following the following accounts:\n```\n{block}\n```\n\n"
"Please choose accounts to unfollow",
components=components,
)
try:
context = await wait_for_component(
self.bot, check=lambda x: ctx.author.id == x.author.id, messages=message, timeout=60 * 5
)
for to_delete in context.selected_options:
_ = Twitter.objects(guild=ctx.guild.id, id=ObjectId(to_delete)).delete()
for row in components:
for component in row["components"]:
component["disabled"] = True
block = "\n".join(handlemap[x] for x in context.selected_options)
await context.edit_origin(content=f"Unfollowed the following:\n```\n{block}\n```", components=components)
except asyncio.TimeoutError:
for row in components:
for component in row["components"]:
component["disabled"] = True
await message.edit(components=components)
@cog_ext.cog_subcommand(
base="twitter",
name="retweets",
description="Modify followed Twitter accounts",
options=[
create_option(
name="retweets",
description="Mirror re-tweets?",
option_type=COptionType.STRING,
required=True,
choices=[create_choice(name="Yes", value="Yes"), create_choice(name="No", value="No")],
),
],
)
@admin_or_permissions(manage_guild=True)
async def _twitter_modify(self, ctx: SlashContext, retweets: str) -> None:
retweets = retweets == "Yes"
twitters = Twitter.objects(guild=ctx.guild.id)
if not twitters:
await ctx.send("You need to follow a Twitter account first", hidden=True)
return
options = []
for twitter in twitters:
option = create_select_option(label=twitter.handle, value=str(twitter.id))
options.append(option)
select = create_select(options=options, custom_id="to_update", min_values=1, max_values=len(twitters))
components = [create_actionrow(select)]
block = "\n".join(x.handle for x in twitters)
message = await ctx.send(
content=f"You are following the following accounts:\n```\n{block}\n```\n\n"
f"Please choose which accounts to {'un' if not retweets else ''}follow retweets from",
components=components,
)
try:
context = await wait_for_component(
self.bot, check=lambda x: ctx.author.id == x.author.id, messages=message, timeout=60 * 5
)
handlemap = {x.id: x.handle for x in twitters}
for to_update in context.selected_options:
t = Twitter.objects(guild=ctx.guild.id, id=ObjectId()).first()
t.retweets = retweets
t.save()
for row in components:
for component in row["components"]:
component["disabled"] = True
block = "\n".join(handlemap[x] for x in context.selected_options)
await context.edit_origin(
content=f"{'Unfollowed' if not retweets else 'Followed'} retweets from the following:"
f"\n```\n{block}\n```",
components=components,
)
except asyncio.TimeoutError:
for row in components:
for component in row["components"]:
component["disabled"] = True
await message.edit(components=components)
def setup(bot: commands.Bot) -> None:
"""Add TwitterCog to J.A.R.V.I.S."""
bot.add_cog(TwitterCog(bot))