469 lines
16 KiB
Python
469 lines
16 KiB
Python
"""J.A.R.V.I.S. GitLab Cog."""
|
|
import asyncio
|
|
import logging
|
|
from datetime import datetime
|
|
|
|
import gitlab
|
|
from dis_snek import InteractionContext, Scale, Snake
|
|
from dis_snek.ext.paginators import Paginator
|
|
from dis_snek.models.discord.embed import Embed, EmbedField
|
|
from dis_snek.models.discord.modal import InputText, Modal, TextStyles
|
|
from dis_snek.models.discord.user import Member
|
|
from dis_snek.models.snek.application_commands import (
|
|
OptionTypes,
|
|
SlashCommand,
|
|
SlashCommandChoice,
|
|
slash_command,
|
|
slash_option,
|
|
)
|
|
from dis_snek.models.snek.command import cooldown
|
|
from dis_snek.models.snek.cooldowns import Buckets
|
|
|
|
from jarvis.config import JarvisConfig
|
|
from jarvis.utils import build_embed
|
|
|
|
# Guild IDs passed as `scopes` to every slash command declared in this module.
guild_ids = [862402786116763668]
|
|
|
|
|
|
class GitlabCog(Scale):
    """J.A.R.V.I.S. GitLab Cog.

    Provides the ``/gl`` command group (single-item lookups and paginated
    listings of issues, merge requests and milestones) and the ``/issue``
    command, which files a new GitLab issue from a Discord modal.
    """

    # Layout of the timestamp strings this cog parses from GitLab objects,
    # and the layout they are rendered in inside embeds.
    _API_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
    _DISPLAY_TIME_FORMAT = "%Y-%m-%d %H:%M:%S UTC"
    # Embed color used when an item has no label to take a color from.
    _DEFAULT_COLOR = "#00FFEE"
    _GITLAB_ICON = "https://about.gitlab.com/images/press/logo/png/gitlab-icon-rgb.png"
    # Number of list entries rendered on each paginator page.
    _ITEMS_PER_PAGE = 5

    def __init__(self, bot: Snake):
        self.bot = bot
        self.logger = logging.getLogger(__name__)
        config = JarvisConfig.from_yaml()
        self._gitlab = gitlab.Gitlab("https://git.zevaryx.com", private_token=config.gitlab_token)
        # J.A.R.V.I.S. GitLab ID is 29
        self.project = self._gitlab.projects.get(29)

    gl = SlashCommand(name="gl", description="Get GitLab info", scopes=guild_ids)

    # ------------------------------------------------------------------ #
    # Private helpers
    # ------------------------------------------------------------------ #

    @classmethod
    def _format_timestamp(cls, raw: str) -> str:
        """Convert a GitLab timestamp string into the display format."""
        return datetime.strptime(raw, cls._API_TIME_FORMAT).strftime(cls._DISPLAY_TIME_FORMAT)

    @staticmethod
    def _ucfirst(text: str) -> str:
        """Upper-case the first character of ``text`` and leave the rest as is."""
        return text[0].upper() + text[1:]

    @staticmethod
    def _shorten(text: str, limit: int = 200) -> str:
        """Cut ``text`` to ``limit`` characters, adding "..." only when cut."""
        if len(text) > limit:
            return text[:limit] + "..."
        return text

    @staticmethod
    def _assignee_name(assignee) -> str:  # noqa: ANN001
        """Return the assignee's name, or "None" when nobody is assigned."""
        return assignee["name"] if assignee else "None"

    @staticmethod
    def _join_labels(labels) -> str:  # noqa: ANN001
        """Render labels one per line, or "None" when there are none."""
        return "\n".join(labels or []) or "None"

    @staticmethod
    def _milestone_field(milestone: dict) -> EmbedField:
        """Build the full-width "Milestone" field linking to the milestone."""
        return EmbedField(
            name="Milestone",
            value=f"[{milestone['title']}]({milestone['web_url']})",
            inline=False,
        )

    def _label_color(self, labels) -> str:  # noqa: ANN001
        """Return the color of the first label, or the default color.

        Guards against an empty label list, which would otherwise raise
        ``IndexError`` on ``labels[0]``.
        """
        if labels:
            return self.project.labels.get(labels[0]).color
        return self._DEFAULT_COLOR

    def _set_jarvis_author(self, embed: Embed) -> None:
        """Attribute ``embed`` to the J.A.R.V.I.S. GitLab account."""
        embed.set_author(
            name="J.A.R.V.I.S.",
            url="https://git.zevaryx.com/jarvis",
            icon_url="https://git.zevaryx.com/uploads/-/system/user/avatar/11/avatar.png",
        )

    def _fetch_all(self, manager, **filters):  # noqa: ANN001, ANN003, ANN202
        """Collect every page of ``manager.list(...)`` into a single list.

        Args:
            manager: A project sub-manager exposing ``list`` (issues,
                merge requests, milestones).
            **filters: Extra keyword arguments forwarded to ``list``.

        Returns:
            The collected items, or ``None`` when the very first request
            failed. If a later page fails, the pages fetched so far are
            returned.
        """
        collected = []
        page = 1
        try:
            while batch := manager.list(page=page, per_page=100, **filters):
                collected += batch
                page += 1
        except (gitlab.exceptions.GitlabGetError, gitlab.exceptions.GitlabListError):
            # `list()` reports HTTP failures as GitlabListError; GitlabGetError
            # is still caught so the previous handling keeps working.
            self.logger.warning("GitLab list request failed on page %d", page, exc_info=True)
            if page == 1:
                return None
        return collected

    async def _send_paginated(
        self, ctx: InteractionContext, items: list, t_state: str, name: str
    ) -> None:
        """Split ``items`` into embed pages and send them through a paginator."""
        step = self._ITEMS_PER_PAGE
        pages = [
            self.build_embed_page(items[start : start + step], t_state=t_state, name=name)
            for start in range(0, len(items), step)
        ]
        paginator = Paginator.create_from_embeds(self.bot, *pages, timeout=300)
        await paginator.send(ctx)

    # ------------------------------------------------------------------ #
    # Single-item lookups
    # ------------------------------------------------------------------ #

    @gl.subcommand(
        sub_cmd_name="issue",
        sub_cmd_description="Get an issue from GitLab",
    )
    @slash_option(name="id", description="Issue ID", opt_type=OptionTypes.INTEGER, required=True)
    async def _issue(self, ctx: InteractionContext, id: int) -> None:
        """Show one issue as an embed, or an ephemeral error if the lookup fails."""
        try:
            issue = self.project.issues.get(int(id))
        except gitlab.exceptions.GitlabGetError:
            await ctx.send("Issue does not exist.", ephemeral=True)
            return

        fields = [
            EmbedField(name="State", value=self._ucfirst(issue.state)),
            EmbedField(name="Assignee", value=self._assignee_name(issue.assignee)),
            EmbedField(name="Labels", value=self._join_labels(issue.labels)),
            EmbedField(name="Created At", value=self._format_timestamp(issue.created_at)),
        ]
        if issue.state == "closed":
            fields.append(
                EmbedField(name="Closed At", value=self._format_timestamp(issue.closed_at))
            )
        if issue.milestone:
            fields.append(self._milestone_field(issue.milestone))

        embed = build_embed(
            title=f"[#{issue.iid}] {self._shorten(issue.title)}",
            description=issue.description,
            fields=fields,
            # Previously indexed labels[0] unconditionally and crashed on
            # unlabeled issues; now falls back to the default color.
            color=self._label_color(issue.labels),
            url=issue.web_url,
        )
        embed.set_author(
            name=issue.author["name"],
            icon_url=issue.author["avatar_url"],
            url=issue.author["web_url"],
        )
        embed.set_thumbnail(url=self._GITLAB_ICON)
        await ctx.send(embed=embed)

    @gl.subcommand(
        sub_cmd_name="milestone",
        sub_cmd_description="Get a milestone from GitLab",
    )
    @slash_option(
        name="id", description="Milestone ID", opt_type=OptionTypes.INTEGER, required=True
    )
    async def _milestone(self, ctx: InteractionContext, id: int) -> None:
        """Show one milestone as an embed, or an ephemeral error if the lookup fails."""
        try:
            milestone = self.project.milestones.get(int(id))
        except gitlab.exceptions.GitlabGetError:
            await ctx.send("Milestone does not exist.", ephemeral=True)
            return

        fields = [
            EmbedField(name="State", value=self._ucfirst(milestone.state)),
            # NOTE(review): start/due dates look optional on GitLab milestones;
            # fall back to "None" so the field value is never empty.
            EmbedField(name="Start Date", value=milestone.start_date or "None"),
            EmbedField(name="Due Date", value=milestone.due_date or "None"),
            EmbedField(name="Created At", value=self._format_timestamp(milestone.created_at)),
        ]
        if milestone.updated_at:
            fields.append(
                EmbedField(name="Updated At", value=self._format_timestamp(milestone.updated_at))
            )

        embed = build_embed(
            title=f"[#{milestone.iid}] {self._shorten(milestone.title)}",
            description=milestone.description,
            fields=fields,
            color=self._DEFAULT_COLOR,
            url=milestone.web_url,
        )
        self._set_jarvis_author(embed)
        embed.set_thumbnail(url=self._GITLAB_ICON)
        await ctx.send(embed=embed)

    @gl.subcommand(
        sub_cmd_name="mr",
        sub_cmd_description="Get a merge request from GitLab",
    )
    @slash_option(
        name="id", description="Merge Request ID", opt_type=OptionTypes.INTEGER, required=True
    )
    async def _mergerequest(self, ctx: InteractionContext, id: int) -> None:
        """Show one merge request as an embed, or an ephemeral error if the lookup fails."""
        try:
            mr = self.project.mergerequests.get(int(id))
        except gitlab.exceptions.GitlabGetError:
            await ctx.send("Merge request does not exist.", ephemeral=True)
            return

        fields = [
            EmbedField(name="State", value=self._ucfirst(mr.state), inline=True),
            EmbedField(name="Draft?", value=str(mr.draft), inline=True),
            EmbedField(name="Assignee", value=self._assignee_name(mr.assignee), inline=True),
            EmbedField(name="Labels", value=self._join_labels(mr.labels), inline=True),
            EmbedField(
                name="Created At", value=self._format_timestamp(mr.created_at), inline=True
            ),
        ]
        if mr.state == "merged":
            fields.append(
                EmbedField(
                    name="Merged At", value=self._format_timestamp(mr.merged_at), inline=True
                )
            )
        elif mr.state == "closed":
            fields.append(
                EmbedField(
                    name="Closed At", value=self._format_timestamp(mr.closed_at), inline=True
                )
            )
        if mr.milestone:
            fields.append(self._milestone_field(mr.milestone))

        embed = build_embed(
            title=f"[#{mr.iid}] {self._shorten(mr.title)}",
            description=mr.description,
            fields=fields,
            color=self._label_color(mr.labels),
            url=mr.web_url,
        )
        embed.set_author(
            name=mr.author["name"],
            icon_url=mr.author["avatar_url"],
            url=mr.author["web_url"],
        )
        embed.set_thumbnail(url=self._GITLAB_ICON)
        await ctx.send(embed=embed)

    # ------------------------------------------------------------------ #
    # Paginated listings
    # ------------------------------------------------------------------ #

    def build_embed_page(self, api_list: list, t_state: str, name: str) -> Embed:
        """Build an embed page for the paginator.

        Args:
            api_list: Items for this page; each needs ``iid``, ``title``,
                ``description`` and ``web_url`` attributes.
            t_state: Display name of the state filter, prefixed to the title.
                A falsy value (callers pass ``None`` for milestones) omits it.
            name: Singular item name ("issue", "merge request", "milestone"),
                used in the title, the link text and the page URL.

        Returns:
            An embed with one full-width field per item.
        """
        title = f"{t_state} " if t_state else ""
        title += f"J.A.R.V.I.S. {name}s"

        fields = [
            EmbedField(
                name=f"[#{item.iid}] {item.title}",
                # The ellipsis is appended only when the description was cut.
                value=(
                    self._shorten(item.description or "No description")
                    + f"\n\n[View this {name}]({item.web_url})"
                ),
                inline=False,
            )
            for item in api_list
        ]

        embed = build_embed(
            title=title,
            description="",
            fields=fields,
            url=f"https://git.zevaryx.com/stark-industries/j.a.r.v.i.s./{name.replace(' ', '_')}s",
        )
        self._set_jarvis_author(embed)
        embed.set_thumbnail(url=self._GITLAB_ICON)
        return embed

    @gl.subcommand(
        sub_cmd_name="issues",
        sub_cmd_description="Get issues from GitLab",
    )
    @slash_option(
        name="state",
        description="State of issues to get",
        opt_type=OptionTypes.STRING,
        required=False,
        choices=[
            SlashCommandChoice(name="Open", value="opened"),
            SlashCommandChoice(name="Closed", value="closed"),
            SlashCommandChoice(name="All", value="all"),
        ],
    )
    async def _issues(self, ctx: InteractionContext, state: str = "opened") -> None:
        """List issues in the requested state, newest first, five per page."""
        await ctx.defer()
        # "all" is expressed to the API as the absence of a state filter.
        m_state = None if state == "all" else state
        issues = self._fetch_all(
            self.project.issues, state=m_state, order_by="created_at", sort="desc"
        )
        if issues is None:
            await ctx.send("Unable to get issues")
            return
        if not issues:
            await ctx.send("No issues match that criteria")
            return

        t_state = self._ucfirst("open" if state == "opened" else state)
        await self._send_paginated(ctx, issues, t_state=t_state, name="issue")

    @gl.subcommand(
        sub_cmd_name="mrs",
        sub_cmd_description="Get merge requests from GitLab",
    )
    @slash_option(
        name="state",
        description="State of merge requests to get",
        opt_type=OptionTypes.STRING,
        required=False,
        choices=[
            SlashCommandChoice(name="Open", value="opened"),
            SlashCommandChoice(name="Closed", value="closed"),
            SlashCommandChoice(name="All", value="all"),
        ],
    )
    async def _mergerequests(self, ctx: InteractionContext, state: str = "opened") -> None:
        """List merge requests in the requested state, newest first, five per page."""
        await ctx.defer()
        # "all" is expressed to the API as the absence of a state filter.
        m_state = None if state == "all" else state
        merges = self._fetch_all(
            self.project.mergerequests, state=m_state, order_by="created_at", sort="desc"
        )
        if merges is None:
            await ctx.send("Unable to get merge requests")
            return
        if not merges:
            await ctx.send("No merge requests match that criteria")
            return

        t_state = self._ucfirst("open" if state == "opened" else state)
        await self._send_paginated(ctx, merges, t_state=t_state, name="merge request")

    @gl.subcommand(
        sub_cmd_name="milestones",
        sub_cmd_description="Get milestones from GitLab",
    )
    async def _milestones(self, ctx: InteractionContext) -> None:
        """List every milestone of the project, five per page."""
        await ctx.defer()
        milestones = self._fetch_all(self.project.milestones, order_by="created_at", sort="desc")
        if milestones is None:
            await ctx.send("Unable to get milestones")
            return
        if not milestones:
            await ctx.send("No milestones exist")
            return

        await self._send_paginated(ctx, milestones, t_state=None, name="milestone")

    # ------------------------------------------------------------------ #
    # Issue creation
    # ------------------------------------------------------------------ #

    @slash_command(name="issue", description="Report an issue on GitLab", scopes=guild_ids)
    @slash_option(
        name="user",
        description="Credit someone else for this issue",
        opt_type=OptionTypes.USER,
        required=False,
    )
    @cooldown(bucket=Buckets.USER, rate=1, interval=600)
    async def _open_issue(self, ctx: InteractionContext, user: Member = None) -> None:
        """Prompt for a title and description, then create the GitLab issue.

        Args:
            ctx: Interaction that invoked the command.
            user: Member to credit in the issue body; defaults to the invoker.

        Returns without creating anything if the modal is not submitted
        within five minutes.
        """
        user = user or ctx.author
        modal = Modal(
            title="Open a new issue on GitLab",
            components=[
                InputText(
                    label="Issue Title",
                    placeholder="Descriptive Title",
                    style=TextStyles.SHORT,
                    custom_id="title",
                    max_length=200,
                ),
                InputText(
                    label="Description (supports Markdown!)",
                    placeholder="Detailed Description",
                    style=TextStyles.PARAGRAPH,
                    custom_id="description",
                ),
            ],
        )
        await ctx.send_modal(modal)
        try:
            resp = await self.bot.wait_for_modal(modal, author=ctx.author.id, timeout=60 * 5)
        except asyncio.TimeoutError:
            return
        title = resp.responses.get("title")
        desc = resp.responses.get("description")

        if not title.startswith("[Discord]"):
            title = "[Discord] " + title
        desc = f"Opened by `@{user.username}` on Discord\n\n" + desc
        issue = self.project.issues.create(data={"title": title, "description": desc})

        embed = build_embed(
            # iid is the project-scoped number that `/gl issue` looks up.
            title=f"Issue #{issue.iid} Created",
            # Was a plain string subscripting the issue object, so the
            # placeholder was printed literally instead of the link.
            description=f"Thank you for opening an issue!\n\n[View it online]({issue.web_url})",
            fields=[],
            color=self._DEFAULT_COLOR,
        )
        await resp.send(embed=embed)
|
|
|
|
|
|
def setup(bot: Snake) -> None:
    """Instantiate GitlabCog for ``bot``, but only when a GitLab token is configured."""
    token = JarvisConfig.from_yaml().gitlab_token
    if not token:
        # No token configured: leave the cog unloaded.
        return
    GitlabCog(bot)
|