jarvis-bot/jarvis/cogs/gitlab.py

467 lines
16 KiB
Python

"""J.A.R.V.I.S. GitLab Cog."""
from datetime import datetime, timedelta
import gitlab
from dis_snek import InteractionContext, Snake
from dis_snek.ext.paginators import Paginator
from dis_snek.models.discord.embed import Embed, EmbedField
from dis_snek.models.snek.application_commands import (
OptionTypes,
SlashCommandChoice,
slash_command,
slash_option,
)
from jarvis.config import get_config
from jarvis.utils import build_embed
from jarvis.utils.cachecog import CacheCog
guild_ids = [862402786116763668]
class GitlabCog(CacheCog):
"""J.A.R.V.I.S. GitLab Cog."""
def __init__(self, bot: Snake):
super().__init__(bot)
config = get_config()
self._gitlab = gitlab.Gitlab("https://git.zevaryx.com", private_token=config.gitlab_token)
# J.A.R.V.I.S. GitLab ID is 29
self.project = self._gitlab.projects.get(29)
@slash_command(
name="gl", sub_cmd_name="issue", description="Get an issue from GitLab", scopes=guild_ids
)
@slash_option(name="id", description="Issue ID", option_type=OptionTypes.INTEGER, required=True)
async def _issue(self, ctx: InteractionContext, id: int) -> None:
try:
issue = self.project.issues.get(int(id))
except gitlab.exceptions.GitlabGetError:
await ctx.send("Issue does not exist.", hidden=True)
return
assignee = issue.assignee
if assignee:
assignee = assignee["name"]
else:
assignee = "None"
created_at = datetime.strptime(issue.created_at, "%Y-%m-%dT%H:%M:%S.%fZ").strftime(
"%Y-%m-%d %H:%M:%S UTC"
)
labels = issue.labels
if labels:
labels = "\n".join(issue.labels)
if not labels:
labels = "None"
fields = [
EmbedField(name="State", value=issue.state[0].upper() + issue.state[1:]),
EmbedField(name="Assignee", value=assignee),
EmbedField(name="Labels", value=labels),
]
color = self.project.labels.get(issue.labels[0]).color
fields.append(EmbedField(name="Created At", value=created_at))
if issue.state == "closed":
closed_at = datetime.strptime(issue.closed_at, "%Y-%m-%dT%H:%M:%S.%fZ").strftime(
"%Y-%m-%d %H:%M:%S UTC"
)
fields.append(EmbedField(name="Closed At", value=closed_at))
if issue.milestone:
fields.append(
EmbedField(
name="Milestone",
value=f"[{issue.milestone['title']}]({issue.milestone['web_url']})",
inline=False,
)
)
if len(issue.title) > 200:
issue.title = issue.title[:200] + "..."
embed = build_embed(
title=f"[#{issue.iid}] {issue.title}",
description=issue.description,
fields=fields,
color=color,
url=issue.web_url,
)
embed.set_author(
name=issue.author["name"],
icon_url=issue.author["avatar_url"],
url=issue.author["web_url"],
)
embed.set_thumbnail(
url="https://about.gitlab.com/images/press/logo/png/gitlab-icon-rgb.png"
)
await ctx.send(embed=embed)
@slash_command(
name="gl",
sub_cmd_name="milestone",
description="Get a milestone from GitLab",
scopes=guild_ids,
)
@slash_option(
name="id", description="Milestone ID", option_type=OptionTypes.INTEGER, required=True
)
async def _milestone(self, ctx: InteractionContext, id: int) -> None:
try:
milestone = self.project.milestones.get(int(id))
except gitlab.exceptions.GitlabGetError:
await ctx.send("Milestone does not exist.", hidden=True)
return
created_at = datetime.strptime(milestone.created_at, "%Y-%m-%dT%H:%M:%S.%fZ").strftime(
"%Y-%m-%d %H:%M:%S UTC"
)
fields = [
EmbedField(
name="State",
value=milestone.state[0].upper() + milestone.state[1:],
),
EmbedField(name="Start Date", value=milestone.start_date),
EmbedField(name="Due Date", value=milestone.due_date),
EmbedField(name="Created At", value=created_at),
]
if milestone.updated_at:
updated_at = datetime.strptime(milestone.updated_at, "%Y-%m-%dT%H:%M:%S.%fZ").strftime(
"%Y-%m-%d %H:%M:%S UTC"
)
fields.append(EmbedField(name="Updated At", value=updated_at))
if len(milestone.title) > 200:
milestone.title = milestone.title[:200] + "..."
embed = build_embed(
title=f"[#{milestone.iid}] {milestone.title}",
description=milestone.description,
fields=fields,
color="#00FFEE",
url=milestone.web_url,
)
embed.set_author(
name="J.A.R.V.I.S.",
url="https://git.zevaryx.com/jarvis",
icon_url="https://git.zevaryx.com/uploads/-/system/user/avatar/11/avatar.png",
)
embed.set_thumbnail(
url="https://about.gitlab.com/images/press/logo/png/gitlab-icon-rgb.png"
)
await ctx.send(embed=embed)
@slash_command(
name="gl",
sub_cmd_name="mr",
description="Get a merge request from GitLab",
scopes=guild_ids,
)
@slash_option(
name="id", description="Merge Request ID", option_type=OptionTypes.INTEGER, required=True
)
async def _mergerequest(self, ctx: InteractionContext, id: int) -> None:
try:
mr = self.project.mergerequests.get(int(id))
except gitlab.exceptions.GitlabGetError:
await ctx.send("Merge request does not exist.", hidden=True)
return
assignee = mr.assignee
if assignee:
assignee = assignee["name"]
else:
assignee = "None"
created_at = datetime.strptime(mr.created_at, "%Y-%m-%dT%H:%M:%S.%fZ").strftime(
"%Y-%m-%d %H:%M:%S UTC"
)
labels = mr.labels
if labels:
labels = "\n".join(mr.labels)
if not labels:
labels = "None"
fields = [
EmbedField(name="State", value=mr.state[0].upper() + mr.state[1:]),
EmbedField(name="Assignee", value=assignee),
EmbedField(name="Labels", value=labels),
]
if mr.labels:
color = self.project.labels.get(mr.labels[0]).color
else:
color = "#00FFEE"
fields.append(EmbedField(name="Created At", value=created_at))
if mr.state == "merged":
merged_at = datetime.strptime(mr.merged_at, "%Y-%m-%dT%H:%M:%S.%fZ").strftime(
"%Y-%m-%d %H:%M:%S UTC"
)
fields.append(EmbedField(name="Merged At", value=merged_at))
elif mr.state == "closed":
closed_at = datetime.strptime(mr.closed_at, "%Y-%m-%dT%H:%M:%S.%fZ").strftime(
"%Y-%m-%d %H:%M:%S UTC"
)
fields.append(EmbedField(name="Closed At", value=closed_at))
if mr.milestone:
fields.append(
EmbedField(
name="Milestone",
value=f"[{mr.milestone['title']}]({mr.milestone['web_url']})",
inline=False,
)
)
if len(mr.title) > 200:
mr.title = mr.title[:200] + "..."
embed = build_embed(
title=f"[#{mr.iid}] {mr.title}",
description=mr.description,
fields=fields,
color=color,
url=mr.web_url,
)
embed.set_author(
name=mr.author["name"],
icon_url=mr.author["avatar_url"],
url=mr.author["web_url"],
)
embed.set_thumbnail(
url="https://about.gitlab.com/images/press/logo/png/gitlab-icon-rgb.png"
)
await ctx.send(embed=embed)
def build_embed_page(self, api_list: list, t_state: str, name: str) -> Embed:
"""Build an embed page for the paginator."""
title = ""
if t_state:
title = f"{t_state} "
title += f"J.A.R.V.I.S. {name}s"
fields = []
for item in api_list:
fields.append(
EmbedField(
name=f"[#{item.iid}] {item.title}",
value=item.description + f"\n\n[View this {name}]({item.web_url})",
inline=False,
)
)
embed = build_embed(
title=title,
description="",
fields=fields,
url=f"https://git.zevaryx.com/stark-industries/j.a.r.v.i.s./{name.replace(' ', '_')}s",
)
embed.set_author(
name="J.A.R.V.I.S.",
url="https://git.zevaryx.com/jarvis",
icon_url="https://git.zevaryx.com/uploads/-/system/user/avatar/11/avatar.png",
)
embed.set_thumbnail(
url="https://about.gitlab.com/images/press/logo/png/gitlab-icon-rgb.png"
)
return embed
@slash_command(
name="gl", sub_cmd_name="issues", description="Get issues from GitLab", scopes=guild_ids
)
@slash_option(
name="state",
description="State of issues to get",
option_type=OptionTypes.STRING,
required=False,
options=[
SlashCommandChoice(name="Open", value="opened"),
SlashCommandChoice(name="Closed", value="closed"),
SlashCommandChoice(name="All", value="all"),
],
)
async def _issues(self, ctx: InteractionContext, state: str = "opened") -> None:
exists = self.check_cache(ctx, state=state)
if exists:
await ctx.defer(hidden=True)
await ctx.send(
"Please use existing interaction: " + f"{exists['paginator']._message.jump_url}",
hidden=True,
)
return
await ctx.defer()
m_state = state
if m_state == "all":
m_state = None
issues = []
page = 1
try:
while curr_page := self.project.issues.list(
page=page,
state=m_state,
order_by="created_at",
sort="desc",
per_page=100,
):
issues += curr_page
page += 1
except gitlab.exceptions.GitlabGetError:
# Only send error on first page. Otherwise, use pages retrieved
if page == 1:
await ctx.send("Unable to get issues")
return
if len(issues) == 0:
await ctx.send("No issues match that criteria")
return
t_state = state
if t_state == "opened":
t_state = "open"
pages = []
t_state = t_state[0].upper() + t_state[1:]
for i in range(0, len(issues), 5):
pages.append(self.build_embed_page(issues[i : i + 5], t_state=t_state, name="issue"))
paginator = Paginator.create_from_embeds(self.bot, *pages, timeout=300)
self.cache[hash(paginator)] = {
"user": ctx.author.id,
"guild": ctx.guild.id,
"timeout": datetime.utcnow() + timedelta(minutes=5),
"command": ctx.subcommand_name,
"state": state,
"paginator": paginator,
}
await paginator.send(ctx)
@slash_command(
name="gl",
sub_cmd_name="mrs",
description="Get merge requests from GitLab",
scopes=guild_ids,
)
@slash_option(
name="state",
description="State of merge requests to get",
option_type=OptionTypes.STRING,
required=False,
options=[
SlashCommandChoice(name="Open", value="opened"),
SlashCommandChoice(name="Closed", value="closed"),
SlashCommandChoice(name="All", value="all"),
],
)
async def _mergerequests(self, ctx: InteractionContext, state: str = "opened") -> None:
exists = self.check_cache(ctx, state=state)
if exists:
await ctx.defer(hidden=True)
await ctx.send(
"Please use existing interaction: " + f"{exists['paginator']._message.jump_url}",
hidden=True,
)
return
await ctx.defer()
m_state = state
if m_state == "all":
m_state = None
merges = []
page = 1
try:
while curr_page := self.project.mergerequests.list(
page=page,
state=m_state,
order_by="created_at",
sort="desc",
per_page=100,
):
merges += curr_page
page += 1
except gitlab.exceptions.GitlabGetError:
# Only send error on first page. Otherwise, use pages retrieved
if page == 1:
await ctx.send("Unable to get merge requests")
return
if len(merges) == 0:
await ctx.send("No merge requests match that criteria")
return
t_state = state
if t_state == "opened":
t_state = "open"
pages = []
t_state = t_state[0].upper() + t_state[1:]
for i in range(0, len(merges), 5):
pages.append(
self.build_embed_page(merges[i : i + 5], t_state=t_state, name="merge request")
)
paginator = Paginator.create_from_embeds(self.bot, *pages, timeout=300)
self.cache[hash(paginator)] = {
"user": ctx.author.id,
"guild": ctx.guild.id,
"timeout": datetime.utcnow() + timedelta(minutes=5),
"command": ctx.subcommand_name,
"state": state,
"paginator": paginator,
}
await paginator.send(ctx)
@slash_command(
name="gl",
sub_cmd_name="milestones",
description="Get milestones from GitLab",
scopes=guild_ids,
)
async def _milestones(self, ctx: InteractionContext) -> None:
exists = self.check_cache(ctx)
if exists:
await ctx.defer(hidden=True)
await ctx.send(
f"Please use existing interaction: {exists['paginator']._message.jump_url}",
hidden=True,
)
return
await ctx.defer()
milestones = []
page = 1
try:
while curr_page := self.project.milestones.list(
page=page,
order_by="created_at",
sort="desc",
per_page=100,
):
milestones += curr_page
page += 1
except gitlab.exceptions.GitlabGetError:
# Only send error on first page. Otherwise, use pages retrieved
if page == 1:
await ctx.send("Unable to get milestones")
return
if len(milestones) == 0:
await ctx.send("No milestones exist")
return
pages = []
for i in range(0, len(milestones), 5):
pages.append(
self.build_embed_page(milestones[i : i + 5], t_state=None, name="milestone")
)
paginator = Paginator.create_from_embeds(self.bot, *pages, timeout=300)
self.cache[hash(paginator)] = {
"user": ctx.author.id,
"guild": ctx.guild.id,
"timeout": datetime.utcnow() + timedelta(minutes=5),
"command": ctx.subcommand_name,
"paginator": paginator,
}
await paginator.send(ctx)
def setup(bot: Snake) -> None:
"""Add GitlabCog to J.A.R.V.I.S. if Gitlab token exists."""
if get_config().gitlab_token:
bot.add_cog(GitlabCog(bot))