495 lines
17 KiB
Python
495 lines
17 KiB
Python
"""J.A.R.V.I.S. GitLab Cog."""
|
|
from datetime import datetime, timedelta
|
|
|
|
import gitlab
|
|
from ButtonPaginator import Paginator
|
|
from discord import Embed
|
|
from discord.ext import commands
|
|
from discord_slash import SlashContext, cog_ext
|
|
from discord_slash.model import ButtonStyle
|
|
from discord_slash.utils.manage_commands import create_choice, create_option
|
|
|
|
from jarvis.config import get_config
|
|
from jarvis.utils import build_embed
|
|
from jarvis.utils.cachecog import CacheCog
|
|
from jarvis.utils.field import Field
|
|
|
|
# Guild IDs handed to every slash subcommand in this module via ``guild_ids=``.
guild_ids = [862402786116763668]
|
|
|
|
|
|
class GitlabCog(CacheCog):
    """J.A.R.V.I.S. GitLab Cog.

    Provides the ``gl`` slash subcommands, which look up single issues,
    milestones and merge requests of the J.A.R.V.I.S. GitLab project, or
    list them in a button-driven paginator.
    """

    # Embed color used when an item has no label to borrow a color from.
    _DEFAULT_COLOR = "#00FFEE"
    _GITLAB_ICON = "https://about.gitlab.com/images/press/logo/png/gitlab-icon-rgb.png"
    _JARVIS_URL = "https://git.zevaryx.com/jarvis"
    _JARVIS_AVATAR = "https://git.zevaryx.com/uploads/-/system/user/avatar/11/avatar.png"
    # Layout of the timestamps read from API objects, and the layout shown in embeds.
    _API_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
    _DISPLAY_TIME_FORMAT = "%Y-%m-%d %H:%M:%S UTC"
    _MAX_TITLE_LENGTH = 200
    _ITEMS_PER_PAGE = 5
    # Lifetime shared by the paginator buttons and the matching cache entry.
    _PAGINATOR_LIFETIME = timedelta(minutes=5)

    def __init__(self, bot: commands.Bot):
        super().__init__(bot)
        config = get_config()
        self._gitlab = gitlab.Gitlab("https://git.zevaryx.com", private_token=config.gitlab_token)
        # J.A.R.V.I.S. GitLab ID is 29
        self.project = self._gitlab.projects.get(29)

    # ------------------------------------------------------------------ #
    # Shared helpers
    # ------------------------------------------------------------------ #

    @classmethod
    def _format_timestamp(cls, raw: str) -> str:
        """Convert an API timestamp string into the display format used in embeds."""
        return datetime.strptime(raw, cls._API_TIME_FORMAT).strftime(cls._DISPLAY_TIME_FORMAT)

    @staticmethod
    def _capitalize_first(text: str) -> str:
        """Upper-case only the first character of *text* (safe on an empty string)."""
        return text[:1].upper() + text[1:]

    @classmethod
    def _shorten_title(cls, title: str) -> str:
        """Return *title* cut to the maximum embed title length, with an ellipsis if cut."""
        if len(title) > cls._MAX_TITLE_LENGTH:
            return title[: cls._MAX_TITLE_LENGTH] + "..."
        return title

    @classmethod
    def _state_title(cls, state: str) -> str:
        """Turn a state filter value into the word shown in a list title."""
        return cls._capitalize_first("open" if state == "opened" else state)

    def _label_color(self, labels: list) -> str:
        """Return the color of the first label, or the default color when there are none."""
        if labels:
            return self.project.labels.get(labels[0]).color
        return self._DEFAULT_COLOR

    @classmethod
    def _overview_fields(cls, item: object) -> list:
        """Build the State / Assignee / Labels / Created At fields of an issue or merge request."""
        assignee = item.assignee["name"] if item.assignee else "None"
        labels = "\n".join(item.labels or []) or "None"
        return [
            Field(name="State", value=cls._capitalize_first(item.state)),
            Field(name="Assignee", value=assignee),
            Field(name="Labels", value=labels),
            Field(name="Created At", value=cls._format_timestamp(item.created_at)),
        ]

    @staticmethod
    def _milestone_field(milestone: dict) -> Field:
        """Build the field linking to the milestone attached to an issue or merge request."""
        return Field(
            name="Milestone",
            value=f"[{milestone['title']}]({milestone['web_url']})",
            inline=False,
        )

    def _item_embed(self, item: object, fields: list) -> Embed:
        """Build the detail embed of a single issue or merge request."""
        embed = build_embed(
            # Truncate into a local value instead of assigning back onto the API object.
            title=f"[#{item.iid}] {self._shorten_title(item.title)}",
            description=item.description,
            fields=fields,
            color=self._label_color(item.labels),
            url=item.web_url,
        )
        embed.set_author(
            name=item.author["name"],
            icon_url=item.author["avatar_url"],
            url=item.author["web_url"],
        )
        embed.set_thumbnail(url=self._GITLAB_ICON)
        return embed

    def _set_jarvis_branding(self, embed: Embed) -> Embed:
        """Apply the J.A.R.V.I.S. author and the GitLab thumbnail to *embed*."""
        embed.set_author(name="J.A.R.V.I.S.", url=self._JARVIS_URL, icon_url=self._JARVIS_AVATAR)
        embed.set_thumbnail(url=self._GITLAB_ICON)
        return embed

    @staticmethod
    def _fetch_all(manager: object, **filters: object) -> list:
        """Collect every page of ``manager.list(...)``, 100 items per request.

        Returns the collected items, or ``None`` when the very first request
        failed. A failure on a later page keeps the items already retrieved.
        """
        items = []
        page = 1
        try:
            while curr_page := manager.list(page=page, per_page=100, **filters):
                items += curr_page
                page += 1
        except (gitlab.exceptions.GitlabGetError, gitlab.exceptions.GitlabListError):
            # python-gitlab reports list() failures as GitlabListError; GitlabGetError
            # is still caught so nothing previously handled starts escaping.
            # Only report an error on the first page. Otherwise, use pages retrieved.
            if page == 1:
                return None
        return items

    def _build_pages(self, items: list, t_state: str, name: str) -> list:
        """Split *items* into embed pages holding ``_ITEMS_PER_PAGE`` entries each."""
        size = self._ITEMS_PER_PAGE
        return [
            self.build_embed_page(items[start : start + size], t_state=t_state, name=name)  # noqa: E203
            for start in range(0, len(items), size)
        ]

    async def _reject_duplicate(self, ctx: SlashContext, **cache_filter: object) -> bool:
        """Point the user at an already cached paginator, if ``check_cache`` finds one.

        Returns ``True`` when such a reply was sent and the command should stop.
        """
        exists = self.check_cache(ctx, **cache_filter)
        if not exists:
            return False
        await ctx.defer(hidden=True)
        await ctx.send(
            f"Please use existing interaction: {exists['paginator']._message.jump_url}",
            hidden=True,
        )
        return True

    async def _start_paginator(self, ctx: SlashContext, pages: list, **cache_extra: object) -> None:
        """Create a paginator over *pages*, record it in the cache, and start it.

        ``cache_extra`` holds additional keys for the cache entry (e.g. ``state``).
        """
        paginator = Paginator(
            bot=self.bot,
            ctx=ctx,
            embeds=pages,
            only=ctx.author,
            timeout=int(self._PAGINATOR_LIFETIME.total_seconds()),  # 5 minute timeout
            disable_after_timeout=True,
            use_extend=len(pages) > 2,
            left_button_style=ButtonStyle.grey,
            right_button_style=ButtonStyle.grey,
            basic_buttons=["◀", "▶"],
        )

        self.cache[hash(paginator)] = {
            "user": ctx.author.id,
            "guild": ctx.guild.id,
            "timeout": datetime.utcnow() + self._PAGINATOR_LIFETIME,
            "command": ctx.subcommand_name,
            **cache_extra,
            "paginator": paginator,
        }

        await paginator.start()

    # ------------------------------------------------------------------ #
    # Single item commands
    # ------------------------------------------------------------------ #

    @cog_ext.cog_subcommand(
        base="gl",
        name="issue",
        description="Get an issue from GitLab",
        guild_ids=guild_ids,
        options=[create_option(name="id", description="Issue ID", option_type=4, required=True)],
    )
    async def _issue(self, ctx: SlashContext, id: int) -> None:
        """Send an embed describing one issue, or a hidden error if the lookup fails."""
        # ``id`` shadows the builtin, but it has to match the slash option name.
        try:
            issue = self.project.issues.get(int(id))
        except gitlab.exceptions.GitlabGetError:
            await ctx.send("Issue does not exist.", hidden=True)
            return

        fields = self._overview_fields(issue)
        if issue.state == "closed":
            fields.append(Field(name="Closed At", value=self._format_timestamp(issue.closed_at)))
        if issue.milestone:
            fields.append(self._milestone_field(issue.milestone))

        # _item_embed falls back to the default color for unlabelled issues
        # instead of indexing into an empty label list.
        await ctx.send(embed=self._item_embed(issue, fields))

    @cog_ext.cog_subcommand(
        base="gl",
        name="milestone",
        description="Get a milestone from GitLab",
        guild_ids=guild_ids,
        options=[
            create_option(
                name="id",
                description="Milestone ID",
                option_type=4,
                required=True,
            )
        ],
    )
    async def _milestone(self, ctx: SlashContext, id: int) -> None:
        """Send an embed describing one milestone, or a hidden error if the lookup fails."""
        try:
            milestone = self.project.milestones.get(int(id))
        except gitlab.exceptions.GitlabGetError:
            await ctx.send("Milestone does not exist.", hidden=True)
            return

        fields = [
            Field(name="State", value=self._capitalize_first(milestone.state)),
            Field(name="Start Date", value=milestone.start_date),
            Field(name="Due Date", value=milestone.due_date),
            Field(name="Created At", value=self._format_timestamp(milestone.created_at)),
        ]
        if milestone.updated_at:
            fields.append(Field(name="Updated At", value=self._format_timestamp(milestone.updated_at)))

        embed = build_embed(
            title=f"[#{milestone.iid}] {self._shorten_title(milestone.title)}",
            description=milestone.description,
            fields=fields,
            color=self._DEFAULT_COLOR,
            url=milestone.web_url,
        )
        await ctx.send(embed=self._set_jarvis_branding(embed))

    @cog_ext.cog_subcommand(
        base="gl",
        name="mergerequest",
        description="Get a merge request from GitLab",
        guild_ids=guild_ids,
        options=[
            create_option(
                name="id",
                description="Merge Request ID",
                option_type=4,
                required=True,
            )
        ],
    )
    async def _mergerequest(self, ctx: SlashContext, id: int) -> None:
        """Send an embed describing one merge request, or a hidden error if the lookup fails."""
        try:
            mr = self.project.mergerequests.get(int(id))
        except gitlab.exceptions.GitlabGetError:
            await ctx.send("Merge request does not exist.", hidden=True)
            return

        fields = self._overview_fields(mr)
        if mr.state == "merged":
            fields.append(Field(name="Merged At", value=self._format_timestamp(mr.merged_at)))
        elif mr.state == "closed":
            fields.append(Field(name="Closed At", value=self._format_timestamp(mr.closed_at)))
        if mr.milestone:
            fields.append(self._milestone_field(mr.milestone))

        await ctx.send(embed=self._item_embed(mr, fields))

    # ------------------------------------------------------------------ #
    # List commands
    # ------------------------------------------------------------------ #

    def build_embed_page(self, api_list: list, t_state: str, name: str) -> Embed:
        """Build an embed page for the paginator.

        Args:
            api_list: Items shown on this page; each needs ``iid``, ``title``,
                ``description`` and ``web_url`` attributes.
            t_state: State word prefixed to the title; a falsy value omits the prefix.
            name: Singular item name, e.g. ``"issue"`` or ``"merge request"``.

        Returns:
            The embed holding one field per item.
        """
        title = f"{t_state} " if t_state else ""
        title += f"J.A.R.V.I.S. {name}s"

        fields = [
            Field(
                name=f"[#{item.iid}] {item.title}",
                # A missing description is None; concatenating it directly would raise TypeError.
                value=(item.description or "") + f"\n\n[View this {name}]({item.web_url})",
                inline=False,
            )
            for item in api_list
        ]

        embed = build_embed(
            title=title,
            description="",
            fields=fields,
            url=f"https://git.zevaryx.com/stark-industries/j.a.r.v.i.s./{name.replace(' ', '_')}s",
        )
        return self._set_jarvis_branding(embed)

    @cog_ext.cog_subcommand(
        base="gl",
        name="issues",
        description="Get open issues from GitLab",
        guild_ids=guild_ids,
        options=[
            create_option(
                name="state",
                description="State of issues to get",
                option_type=3,
                required=False,
                choices=[
                    create_choice(name="Open", value="opened"),
                    create_choice(name="Closed", value="closed"),
                    create_choice(name="All", value="all"),
                ],
            )
        ],
    )
    async def _issues(self, ctx: SlashContext, state: str = "opened") -> None:
        """List issues in the requested state through a paginator."""
        if await self._reject_duplicate(ctx, state=state):
            return
        await ctx.defer()

        issues = self._fetch_all(
            self.project.issues,
            state=None if state == "all" else state,  # "all" means no state filter
            order_by="created_at",
            sort="desc",
        )
        if issues is None:
            await ctx.send("Unable to get issues")
            return
        if not issues:
            await ctx.send("No issues match that criteria")
            return

        pages = self._build_pages(issues, t_state=self._state_title(state), name="issue")
        await self._start_paginator(ctx, pages, state=state)

    @cog_ext.cog_subcommand(
        base="gl",
        name="mergerequests",
        description="Get merge requests from GitLab",
        guild_ids=guild_ids,
        options=[
            create_option(
                name="state",
                description="State of merge requests to get",
                option_type=3,
                required=False,
                choices=[
                    create_choice(name="Open", value="opened"),
                    create_choice(name="Closed", value="closed"),
                    create_choice(name="Merged", value="merged"),
                    create_choice(name="All", value="all"),
                ],
            )
        ],
    )
    async def _mergerequests(self, ctx: SlashContext, state: str = "opened") -> None:
        """List merge requests in the requested state through a paginator."""
        if await self._reject_duplicate(ctx, state=state):
            return
        await ctx.defer()

        merges = self._fetch_all(
            self.project.mergerequests,
            state=None if state == "all" else state,  # "all" means no state filter
            order_by="created_at",
            sort="desc",
        )
        if merges is None:
            await ctx.send("Unable to get merge requests")
            return
        if not merges:
            await ctx.send("No merge requests match that criteria")
            return

        pages = self._build_pages(merges, t_state=self._state_title(state), name="merge request")
        await self._start_paginator(ctx, pages, state=state)

    @cog_ext.cog_subcommand(
        base="gl",
        name="milestones",
        description="Get milestones from GitLab",
        guild_ids=guild_ids,
    )
    async def _milestones(self, ctx: SlashContext) -> None:
        """List the project's milestones through a paginator."""
        if await self._reject_duplicate(ctx):
            return
        await ctx.defer()

        milestones = self._fetch_all(self.project.milestones, order_by="created_at", sort="desc")
        if milestones is None:
            await ctx.send("Unable to get milestones")
            return
        if not milestones:
            await ctx.send("No milestones exist")
            return

        pages = self._build_pages(milestones, t_state=None, name="milestone")
        await self._start_paginator(ctx, pages)
|
|
|
|
|
|
def setup(bot: commands.Bot) -> None:
    """Register GitlabCog on the bot, but only when a GitLab token is configured."""
    token = get_config().gitlab_token
    if not token:
        # No token configured: leave the bot without the GitLab commands.
        return
    bot.add_cog(GitlabCog(bot))
|