Merge branch 'gitlabcog'

This commit is contained in:
Zeva Rose 2021-07-23 14:02:21 -06:00
commit b2753cdd04
5 changed files with 569 additions and 3 deletions

View file

@ -7,7 +7,8 @@
password: pass
host: localhost
port: 27017
api_urls:
urls:
url_name: url
url_name2: url2
max_messages: 1000
gitlab_token: null

View file

@ -36,7 +36,7 @@ jarvis = commands.Bot(
)
slash = SlashCommand(jarvis, sync_commands=True, sync_on_cog_reload=True)
jarvis_self = Process()
__version__ = "1.2.5"
__version__ = "1.3.0"
db = DBManager(get_config().mongo).mongo

View file

@ -34,7 +34,8 @@ class ErrorHandlerCog(commands.Cog):
)
else:
await ctx.send(
f"Error processing command:\n```{error}```", hidden=True
f"Error processing command:\n```{error}```",
hidden=True,
)

562
jarvis/cogs/gitlab.py Normal file
View file

@ -0,0 +1,562 @@
from datetime import datetime, timedelta
import gitlab
from ButtonPaginator import Paginator
from discord.ext import commands
from discord.ext.tasks import loop
from discord.utils import find
from discord_slash import ComponentContext, SlashContext, cog_ext
from discord_slash.model import ButtonStyle
from discord_slash.utils import manage_components
from discord_slash.utils.manage_commands import create_choice, create_option
from jarvis.config import get_config
from jarvis.utils import build_embed
from jarvis.utils.field import Field
guild_ids = [862402786116763668]
class GitlabCog(commands.Cog):
def __init__(self, bot):
self.bot = bot
config = get_config()
self._gitlab = gitlab.Gitlab(
"https://git.zevaryx.com", private_token=config.gitlab_token
)
# J.A.R.V.I.S. GitLab ID is 29
self.project = self._gitlab.projects.get(29)
self.cache = {}
self._expire_interaction.start()
@cog_ext.cog_subcommand(
base="gl",
name="issue",
description="Get an issue from GitLab",
guild_ids=guild_ids,
options=[
create_option(
name="id", description="Issue ID", option_type=4, required=True
)
],
)
async def _issue(self, ctx: SlashContext, id: int):
try:
issue = self.project.issues.get(int(id))
except gitlab.exceptions.GitlabGetError:
await ctx.send("Issue does not exist.", hidden=True)
return
assignee = issue.assignee
if assignee:
assignee = assignee["name"]
else:
assignee = "None"
created_at = datetime.strptime(
issue.created_at, "%Y-%m-%dT%H:%M:%S.%fZ"
).strftime("%Y-%m-%d %H:%M:%S UTC")
labels = issue.labels
if labels:
labels = "\n".join(issue.labels)
if not labels:
labels = "None"
fields = [
Field(
name="State", value=issue.state[0].upper() + issue.state[1:]
),
Field(name="Assignee", value=assignee),
Field(name="Labels", value=labels),
]
color = self.project.labels.get(issue.labels[0]).color
fields.append(Field(name="Created At", value=created_at))
if issue.state == "closed":
closed_at = datetime.strptime(
issue.closed_at, "%Y-%m-%dT%H:%M:%S.%fZ"
).strftime("%Y-%m-%d %H:%M:%S UTC")
fields.append(Field(name="Closed At", value=closed_at))
if issue.milestone:
fields.append(
Field(
name="Milestone",
value=f"[{issue.milestone['title']}]"
+ f"({issue.milestone['web_url']})",
inline=False,
)
)
if len(issue.title) > 200:
issue.title = issue.title[:200] + "..."
embed = build_embed(
title=f"[#{issue.iid}] {issue.title}",
description=issue.description,
fields=fields,
color=color,
url=issue.web_url,
)
embed.set_author(
name=issue.author["name"],
icon_url=issue.author["avatar_url"],
url=issue.author["web_url"],
)
embed.set_thumbnail(
url="https://about.gitlab.com/images"
+ "/press/logo/png/gitlab-icon-rgb.png"
)
await ctx.send(embed=embed)
@cog_ext.cog_subcommand(
base="gl",
name="milestone",
description="Get a milestone from GitLab",
guild_ids=guild_ids,
options=[
create_option(
name="id",
description="Milestone ID",
option_type=4,
required=True,
)
],
)
async def _milestone(self, ctx: SlashContext, id: int):
try:
milestone = self.project.milestones.get(int(id))
except gitlab.exceptions.GitlabGetError:
await ctx.send("Milestone does not exist.", hidden=True)
return
created_at = datetime.strptime(
milestone.created_at, "%Y-%m-%dT%H:%M:%S.%fZ"
).strftime("%Y-%m-%d %H:%M:%S UTC")
fields = [
Field(
name="State",
value=milestone.state[0].upper() + milestone.state[1:],
),
Field(name="Start Date", value=milestone.start_date),
Field(name="Due Date", value=milestone.due_date),
Field(name="Created At", value=created_at),
]
if milestone.updated_at:
updated_at = datetime.strptime(
milestone.updated_at, "%Y-%m-%dT%H:%M:%S.%fZ"
).strftime("%Y-%m-%d %H:%M:%S UTC")
fields.append(Field(name="Updated At", value=updated_at))
if len(milestone.title) > 200:
milestone.title = milestone.title[:200] + "..."
embed = build_embed(
title=f"[#{milestone.iid}] {milestone.title}",
description=milestone.description,
fields=fields,
color="#00FFEE",
url=milestone.web_url,
)
embed.set_author(
name="J.A.R.V.I.S.",
url="https://git.zevaryx.com/jarvis",
icon_url="https://git.zevaryx.com/uploads/-"
+ "/system/user/avatar/11/avatar.png",
)
embed.set_thumbnail(
url="https://about.gitlab.com/images"
+ "/press/logo/png/gitlab-icon-rgb.png"
)
await ctx.send(embed=embed)
@cog_ext.cog_subcommand(
base="gl",
name="mergerequest",
description="Get an merge request from GitLab",
guild_ids=guild_ids,
options=[
create_option(
name="id",
description="Merge Request ID",
option_type=4,
required=True,
)
],
)
async def _mergerequest(self, ctx: SlashContext, id: int):
try:
mr = self.project.mergerequests.get(int(id))
except gitlab.exceptions.GitlabGetError:
await ctx.send("Merge request does not exist.", hidden=True)
return
assignee = mr.assignee
if assignee:
assignee = assignee["name"]
else:
assignee = "None"
created_at = datetime.strptime(
mr.created_at, "%Y-%m-%dT%H:%M:%S.%fZ"
).strftime("%Y-%m-%d %H:%M:%S UTC")
labels = mr.labels
if labels:
labels = "\n".join(mr.labels)
if not labels:
labels = "None"
fields = [
Field(name="State", value=mr.state[0].upper() + mr.state[1:]),
Field(name="Assignee", value=assignee),
Field(name="Labels", value=labels),
]
if mr.labels:
color = self.project.labels.get(mr.labels[0]).color
else:
color = "#00FFEE"
fields.append(Field(name="Created At", value=created_at))
if mr.state == "merged":
merged_at = datetime.strptime(
mr.merged_at, "%Y-%m-%dT%H:%M:%S.%fZ"
).strftime("%Y-%m-%d %H:%M:%S UTC")
fields.append(Field(name="Merged At", value=merged_at))
elif mr.state == "closed":
closed_at = datetime.strptime(
mr.closed_at, "%Y-%m-%dT%H:%M:%S.%fZ"
).strftime("%Y-%m-%d %H:%M:%S UTC")
fields.append(Field(name="Closed At", value=closed_at))
if mr.milestone:
fields.append(
Field(
name="Milestone",
value=f"[{mr.milestone['title']}]"
+ f"({mr.milestone['web_url']})",
inline=False,
)
)
if len(mr.title) > 200:
mr.title = mr.title[:200] + "..."
embed = build_embed(
title=f"[#{mr.iid}] {mr.title}",
description=mr.description,
fields=fields,
color=color,
url=mr.web_url,
)
embed.set_author(
name=mr.author["name"],
icon_url=mr.author["avatar_url"],
url=mr.author["web_url"],
)
embed.set_thumbnail(
url="https://about.gitlab.com/images"
+ "/press/logo/png/gitlab-icon-rgb.png"
)
await ctx.send(embed=embed)
def build_embed_page(self, api_list: list, t_state: str, name: str):
title = ""
if t_state:
title = f"{t_state} "
title += f"J.A.R.V.I.S. {name}s"
fields = []
for item in api_list:
fields.append(
Field(
name=f"[#{item.iid}] {item.title}",
value=item.description
+ f"\n\n[View this {name}]({item.web_url})",
inline=False,
)
)
embed = build_embed(
title=title,
description="",
fields=fields,
url="https://git.zevaryx.com/stark-industries/j.a.r.v.i.s./"
+ f"{name.replace(' ', '_')}s",
)
embed.set_author(
name="J.A.R.V.I.S.",
url="https://git.zevaryx.com/jarvis",
icon_url="https://git.zevaryx.com/uploads/-"
+ "/system/user/avatar/11/avatar.png",
)
embed.set_thumbnail(
url="https://about.gitlab.com/images"
+ "/press/logo/png/gitlab-icon-rgb.png"
)
return embed
def check_cache(self, ctx: SlashContext, **kwargs):
if not kwargs:
kwargs = {}
return find(
lambda x: x["command"] == ctx.subcommand_name
and x["user"] == ctx.author.id
and all(x[k] == v for k, v in kwargs.items()),
self.cache.values(),
)
@cog_ext.cog_subcommand(
base="gl",
name="issues",
description="Get open issues from GitLab",
guild_ids=guild_ids,
options=[
create_option(
name="state",
description="State of issues to get",
option_type=3,
required=False,
choices=[
create_choice(name="Open", value="opened"),
create_choice(name="Closed", value="closed"),
create_choice(name="All", value="all"),
],
)
],
)
async def _issues(self, ctx: SlashContext, state: str = "opened"):
exists = self.check_cache(ctx, state=state)
if exists:
await ctx.defer(hidden=True)
await ctx.send(
"Please use existing interaction: "
+ f"{exists['paginator']._message.jump_url}",
hidden=True,
)
return
await ctx.defer()
m_state = state
if m_state == "all":
m_state = None
issues = []
page = 1
try:
while curr_page := self.project.issues.list(
page=page,
state=m_state,
order_by="created_at",
sort="desc",
per_page=100,
):
issues += curr_page
page += 1
except gitlab.exceptions.GitlabGetError:
# Only send error on first page. Otherwise, use pages retrieved
if page == 1:
await ctx.send("Unable to get issues")
return
if len(issues) == 0:
await ctx.send("No issues match that criteria")
return
t_state = state
if t_state == "opened":
t_state = "open"
pages = []
t_state = t_state[0].upper() + t_state[1:]
for i in range(0, len(issues), 5):
pages.append(
self.build_embed_page(
issues[i : i + 5], t_state=t_state, name="issue"
)
)
paginator = Paginator(
bot=self.bot,
ctx=ctx,
embeds=pages,
only=ctx.author,
timeout=60 * 5, # 5 minute timeout
disable_after_timeout=True,
use_extend=len(pages) > 2,
left_button_style=ButtonStyle.grey,
right_button_style=ButtonStyle.grey,
basic_buttons=["", ""],
)
self.cache[hash(paginator)] = {
"user": ctx.author.id,
"timeout": datetime.utcnow() + timedelta(minutes=5),
"command": ctx.subcommand_name,
"state": state,
"paginator": paginator,
}
await paginator.start()
@cog_ext.cog_subcommand(
base="gl",
name="mergerequests",
description="Get open issues from GitLab",
guild_ids=guild_ids,
options=[
create_option(
name="state",
description="State of issues to get",
option_type=3,
required=False,
choices=[
create_choice(name="Open", value="opened"),
create_choice(name="Closed", value="closed"),
create_choice(name="Merged", value="merged"),
create_choice(name="All", value="all"),
],
)
],
)
async def _mergerequests(self, ctx: SlashContext, state: str = "opened"):
exists = self.check_cache(ctx, state=state)
if exists:
await ctx.defer(hidden=True)
await ctx.send(
"Please use existing interaction: "
+ f"{exists['paginator']._message.jump_url}",
hidden=True,
)
return
await ctx.defer()
m_state = state
if m_state == "all":
m_state = None
merges = []
page = 1
try:
while curr_page := self.project.mergerequests.list(
page=page,
state=m_state,
order_by="created_at",
sort="desc",
per_page=100,
):
merges += curr_page
page += 1
except gitlab.exceptions.GitlabGetError:
# Only send error on first page. Otherwise, use pages retrieved
if page == 1:
await ctx.send("Unable to get merge requests")
return
if len(merges) == 0:
await ctx.send("No merge requests match that criteria")
return
t_state = state
if t_state == "opened":
t_state = "open"
pages = []
t_state = t_state[0].upper() + t_state[1:]
for i in range(0, len(merges), 5):
pages.append(
self.build_embed_page(
merges[i : i + 5], t_state=t_state, name="merge request"
)
)
paginator = Paginator(
bot=self.bot,
ctx=ctx,
embeds=pages,
only=ctx.author,
timeout=60 * 5, # 5 minute timeout
disable_after_timeout=True,
use_extend=len(pages) > 2,
left_button_style=ButtonStyle.grey,
right_button_style=ButtonStyle.grey,
basic_buttons=["", ""],
)
self.cache[hash(paginator)] = {
"user": ctx.author.id,
"timeout": datetime.utcnow() + timedelta(minutes=5),
"command": ctx.subcommand_name,
"state": state,
"paginator": paginator,
}
await paginator.start()
@cog_ext.cog_subcommand(
base="gl",
name="milestones",
description="Get open issues from GitLab",
guild_ids=guild_ids,
)
async def _milestones(self, ctx: SlashContext):
exists = self.check_cache(ctx)
if exists:
await ctx.defer(hidden=True)
await ctx.send(
"Please use existing interaction: "
+ f"{exists['paginator']._message.jump_url}",
hidden=True,
)
return
await ctx.defer()
milestones = []
page = 1
try:
while curr_page := self.project.milestones.list(
page=page,
order_by="created_at",
sort="desc",
per_page=100,
):
milestones += curr_page
page += 1
except gitlab.exceptions.GitlabGetError:
# Only send error on first page. Otherwise, use pages retrieved
if page == 1:
await ctx.send("Unable to get milestones")
return
if len(milestones) == 0:
await ctx.send("No milestones exist")
return
pages = []
for i in range(0, len(milestones), 5):
pages.append(
self.build_embed_page(
milestones[i : i + 5], t_state=None, name="milestone"
)
)
paginator = Paginator(
bot=self.bot,
ctx=ctx,
embeds=pages,
only=ctx.author,
timeout=60 * 5, # 5 minute timeout
disable_after_timeout=True,
use_extend=len(pages) > 2,
left_button_style=ButtonStyle.grey,
right_button_style=ButtonStyle.grey,
basic_buttons=["", ""],
)
self.cache[hash(paginator)] = {
"user": ctx.author.id,
"timeout": datetime.utcnow() + timedelta(minutes=5),
"command": ctx.subcommand_name,
"paginator": paginator,
}
await paginator.start()
@loop(minutes=1)
async def _expire_interaction(self):
keys = list(self.cache.keys())
for key in keys:
if self.cache[key]["timeout"] <= datetime.utcnow() + timedelta(
minutes=1
):
del self.cache[key]
def setup(bot):
if get_config().gitlab_token:
bot.add_cog(GitlabCog(bot))

View file

@ -24,6 +24,7 @@ class Config(object):
logo: str,
mongo: dict,
urls: dict,
gitlab_token: str = None,
max_messages: int = 1000,
):
self.token = token
@ -32,6 +33,7 @@ class Config(object):
self.mongo = mongo
self.urls = urls
self.max_messages = max_messages
self.gitlab_token = gitlab_token
db = DBManager(config=mongo).mongo.jarvis.config
db_config = db.find()
for item in db_config: