"""JARVIS GitLab Cog.""" import asyncio import logging from datetime import datetime import gitlab from interactions import Client, Extension, InteractionContext from interactions.ext.paginators import Paginator from interactions.models.discord.embed import Embed, EmbedField from interactions.models.discord.modal import InputText, Modal, TextStyles from interactions.models.discord.user import Member from interactions.models.internal.application_commands import ( OptionType, SlashCommand, SlashCommandChoice, slash_command, slash_option, ) from jarvis.config import load_config from jarvis.utils import build_embed guild_ids = [862402786116763668] class GitlabCog(Extension): """JARVIS GitLab Cog.""" def __init__(self, bot: Client): self.bot = bot self.logger = logging.getLogger(__name__) config = load_config() self._gitlab = gitlab.Gitlab("https://git.zevaryx.com", private_token=config.gitlab_token) # JARVIS GitLab ID is 29 self.project = self._gitlab.projects.get(29) gl = SlashCommand(name="gl", description="Get GitLab info", scopes=guild_ids) @gl.subcommand( sub_cmd_name="issue", sub_cmd_description="Get an issue from GitLab", ) @slash_option(name="id", description="Issue ID", opt_type=OptionType.INTEGER, required=True) async def _issue(self, ctx: InteractionContext, id: int) -> None: try: issue = self.project.issues.get(int(id)) except gitlab.exceptions.GitlabGetError: await ctx.send("Issue does not exist.", ephemeral=True) return assignee = issue.assignee if assignee: assignee = assignee["name"] else: assignee = "None" created_at = datetime.strptime(issue.created_at, "%Y-%m-%dT%H:%M:%S.%fZ").strftime("%Y-%m-%d %H:%M:%S UTC") labels = issue.labels if labels: labels = "\n".join(issue.labels) else: labels = "None" fields = [ EmbedField(name="State", value=issue.state.title()), EmbedField(name="Assignee", value=assignee), EmbedField(name="Labels", value=labels), ] color = "#FC6D27" if issue.labels: color = self.project.labels.get(issue.labels[0]).color fields.append(EmbedField(name="Created At", value=created_at)) if issue.state == "closed": closed_at = datetime.strptime(issue.closed_at, "%Y-%m-%dT%H:%M:%S.%fZ").strftime("%Y-%m-%d %H:%M:%S UTC") fields.append(EmbedField(name="Closed At", value=closed_at)) if issue.milestone: fields.append( EmbedField( name="Milestone", value=f"[{issue.milestone['title']}]({issue.milestone['web_url']})", inline=False, ) ) if len(issue.title) > 200: issue.title = issue.title[:200] + "..." embed = build_embed( title=f"[#{issue.iid}] {issue.title}", description=issue.description, fields=fields, color=color, url=issue.web_url, ) embed.set_author( name=issue.author["name"], icon_url=issue.author["avatar_url"], url=issue.author["web_url"], ) embed.set_thumbnail(url="https://about.gitlab.com/images/press/logo/png/gitlab-icon-rgb.png") await ctx.send(embeds=embed) @gl.subcommand( sub_cmd_name="milestone", sub_cmd_description="Get a milestone from GitLab", ) @slash_option(name="id", description="Milestone ID", opt_type=OptionType.INTEGER, required=True) async def _milestone(self, ctx: InteractionContext, id: int) -> None: try: milestone = self.project.milestones.get(int(id)) except gitlab.exceptions.GitlabGetError: await ctx.send("Milestone does not exist.", ephemeral=True) return created_at = datetime.strptime(milestone.created_at, "%Y-%m-%dT%H:%M:%S.%fZ").strftime("%Y-%m-%d %H:%M:%S UTC") fields = [ EmbedField( name="State", value=milestone.state[0].upper() + milestone.state[1:], ), EmbedField(name="Start Date", value=milestone.start_date), EmbedField(name="Due Date", value=milestone.due_date), EmbedField(name="Created At", value=created_at), ] if milestone.updated_at: updated_at = datetime.strptime(milestone.updated_at, "%Y-%m-%dT%H:%M:%S.%fZ").strftime( "%Y-%m-%d %H:%M:%S UTC" ) fields.append(EmbedField(name="Updated At", value=updated_at)) if len(milestone.title) > 200: milestone.title = milestone.title[:200] + "..." embed = build_embed( title=f"[#{milestone.iid}] {milestone.title}", description=milestone.description, fields=fields, color="#00FFEE", url=milestone.web_url, ) embed.set_author( name="JARVIS", url="https://git.zevaryx.com/jarvis", icon_url="https://git.zevaryx.com/uploads/-/system/user/avatar/11/avatar.png", ) embed.set_thumbnail(url="https://about.gitlab.com/images/press/logo/png/gitlab-icon-rgb.png") await ctx.send(embeds=embed) @gl.subcommand( sub_cmd_name="mr", sub_cmd_description="Get a merge request from GitLab", ) @slash_option(name="id", description="Merge Request ID", opt_type=OptionType.INTEGER, required=True) async def _mergerequest(self, ctx: InteractionContext, id: int) -> None: try: mr = self.project.mergerequests.get(int(id)) except gitlab.exceptions.GitlabGetError: await ctx.send("Merge request does not exist.", ephemeral=True) return assignee = mr.assignee if assignee: assignee = assignee["name"] else: assignee = "None" created_at = datetime.strptime(mr.created_at, "%Y-%m-%dT%H:%M:%S.%fZ").strftime("%Y-%m-%d %H:%M:%S UTC") labels = mr.labels if labels: labels = "\n".join(mr.labels) if not labels: labels = "None" fields = [ EmbedField(name="State", value=mr.state[0].upper() + mr.state[1:], inline=True), EmbedField(name="Draft?", value=str(mr.draft), inline=True), EmbedField(name="Assignee", value=assignee, inline=True), EmbedField(name="Labels", value=labels, inline=True), ] if mr.labels: color = self.project.labels.get(mr.labels[0]).color else: color = "#00FFEE" fields.append(EmbedField(name="Created At", value=created_at, inline=True)) if mr.state == "merged": merged_at = datetime.strptime(mr.merged_at, "%Y-%m-%dT%H:%M:%S.%fZ").strftime("%Y-%m-%d %H:%M:%S UTC") fields.append(EmbedField(name="Merged At", value=merged_at, inline=True)) elif mr.state == "closed": closed_at = datetime.strptime(mr.closed_at, "%Y-%m-%dT%H:%M:%S.%fZ").strftime("%Y-%m-%d %H:%M:%S UTC") fields.append(EmbedField(name="Closed At", value=closed_at, inline=True)) if mr.milestone: fields.append( EmbedField( name="Milestone", value=f"[{mr.milestone['title']}]({mr.milestone['web_url']})", inline=False, ) ) if len(mr.title) > 200: mr.title = mr.title[:200] + "..." embed = build_embed( title=f"[#{mr.iid}] {mr.title}", description=mr.description, fields=fields, color=color, url=mr.web_url, ) embed.set_author( name=mr.author["name"], icon_url=mr.author["avatar_url"], url=mr.author["web_url"], ) embed.set_thumbnail(url="https://about.gitlab.com/images/press/logo/png/gitlab-icon-rgb.png") await ctx.send(embeds=embed) def build_embed_page(self, api_list: list, t_state: str, name: str) -> Embed: """Build an embed page for the paginator.""" title = "" if t_state: title = f"{t_state} " title += f"JARVIS {name}s" fields = [] for item in api_list: description = item.description or "No description" fields.append( EmbedField( name=f"[#{item.iid}] {item.title}", value=(description[:200] + f"...\n\n[View this {name}]({item.web_url})"), inline=False, ) ) embed = build_embed( title=title, description="", fields=fields, url=f"https://git.zevaryx.com/stark-industries/JARVIS/{name.replace(' ', '_')}s", ) embed.set_author( name="JARVIS", url="https://git.zevaryx.com/jarvis", icon_url="https://git.zevaryx.com/uploads/-/system/user/avatar/11/avatar.png", ) embed.set_thumbnail(url="https://about.gitlab.com/images/press/logo/png/gitlab-icon-rgb.png") return embed @gl.subcommand( sub_cmd_name="issues", sub_cmd_description="Get issues from GitLab", ) @slash_option( name="state", description="State of issues to get", opt_type=OptionType.STRING, required=False, choices=[ SlashCommandChoice(name="Open", value="opened"), SlashCommandChoice(name="Closed", value="closed"), SlashCommandChoice(name="All", value="all"), ], ) async def _issues(self, ctx: InteractionContext, state: str = "opened") -> None: await ctx.defer() m_state = state if m_state == "all": m_state = None issues = [] page = 1 try: while curr_page := self.project.issues.list( page=page, state=m_state, order_by="created_at", sort="desc", per_page=100, ): issues += curr_page page += 1 except gitlab.exceptions.GitlabGetError: # Only send error on first page. Otherwise, use pages retrieved if page == 1: await ctx.send("Unable to get issues") return if len(issues) == 0: await ctx.send("No issues match that criteria") return t_state = state if t_state == "opened": t_state = "open" pages = [] t_state = t_state[0].upper() + t_state[1:] for i in range(0, len(issues), 5): pages.append(self.build_embed_page(issues[i : i + 5], t_state=t_state, name="issue")) paginator = Paginator.create_from_embeds(self.bot, *pages, timeout=300) await paginator.send(ctx) @gl.subcommand( sub_cmd_name="mrs", sub_cmd_description="Get merge requests from GitLab", ) @slash_option( name="state", description="State of merge requests to get", opt_type=OptionType.STRING, required=False, choices=[ SlashCommandChoice(name="Open", value="opened"), SlashCommandChoice(name="Closed", value="closed"), SlashCommandChoice(name="All", value="all"), ], ) async def _mergerequests(self, ctx: InteractionContext, state: str = "opened") -> None: await ctx.defer() m_state = state if m_state == "all": m_state = None merges = [] page = 1 try: while curr_page := self.project.mergerequests.list( page=page, state=m_state, order_by="created_at", sort="desc", per_page=100, ): merges += curr_page page += 1 except gitlab.exceptions.GitlabGetError: # Only send error on first page. Otherwise, use pages retrieved if page == 1: await ctx.send("Unable to get merge requests") return if len(merges) == 0: await ctx.send("No merge requests match that criteria") return t_state = state if t_state == "opened": t_state = "open" pages = [] t_state = t_state[0].upper() + t_state[1:] for i in range(0, len(merges), 5): pages.append(self.build_embed_page(merges[i : i + 5], t_state=t_state, name="merge request")) paginator = Paginator.create_from_embeds(self.bot, *pages, timeout=300) await paginator.send(ctx) @gl.subcommand( sub_cmd_name="milestones", sub_cmd_description="Get milestones from GitLab", ) async def _milestones(self, ctx: InteractionContext) -> None: await ctx.defer() milestones = [] page = 1 try: while curr_page := self.project.milestones.list( page=page, order_by="created_at", sort="desc", per_page=100, ): milestones += curr_page page += 1 except gitlab.exceptions.GitlabGetError: # Only send error on first page. Otherwise, use pages retrieved if page == 1: await ctx.send("Unable to get milestones") return if len(milestones) == 0: await ctx.send("No milestones exist") return pages = [] for i in range(0, len(milestones), 5): pages.append(self.build_embed_page(milestones[i : i + 5], t_state=None, name="milestone")) paginator = Paginator.create_from_embeds(self.bot, *pages, timeout=300) await paginator.send(ctx) @slash_command(name="issue", description="Report an issue on GitLab", scopes=guild_ids) @slash_option( name="user", description="Credit someone else for this issue", opt_type=OptionType.USER, required=False, ) async def _open_issue(self, ctx: InteractionContext, user: Member = None) -> None: user = user or ctx.author modal = Modal( title="Open a new issue on GitLab", components=[ InputText( label="Issue Title", placeholder="Descriptive Title", style=TextStyles.SHORT, custom_id="title", max_length=200, ), InputText( label="Description (supports Markdown!)", placeholder="Detailed Description", style=TextStyles.PARAGRAPH, custom_id="description", ), ], ) await ctx.send_modal(modal) try: resp = await self.bot.wait_for_modal(modal, author=ctx.author.id, timeout=60 * 5) title = resp.responses.get("title") desc = resp.responses.get("description") except asyncio.TimeoutError: return if not title.startswith("[Discord]"): title = "[Discord] " + title desc = f"Opened by `@{user.username}` on Discord\n\n" + desc issue = self.project.issues.create(data={"title": title, "description": desc}) embed = build_embed( title=f"Issue #{issue.id} Created", description=("Thank you for opening an issue!\n\n[View it online]({issue['web_url']})"), fields=[], color="#00FFEE", ) await resp.send(embeds=embed) def setup(bot: Client) -> None: """Add GitlabCog to JARVIS if Gitlab token exists.""" if load_config().gitlab_token: GitlabCog(bot) else: bot.logger.info("Missing GitLab token, ignoring GitLab cog")