180 lines
5.6 KiB
Python
Executable File
180 lines
5.6 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
#
|
|
# Sends updates to a Discord webhook for new changelog entries since the last GitHub Actions publish run.
|
|
# Automatically figures out the last run and changelog contents with the GitHub API.
|
|
#
|
|
|
|
import io
|
|
import itertools
|
|
import os
|
|
import requests
|
|
import yaml
|
|
from typing import Any, Iterable
|
|
|
|
# GitHub connection settings, all read from the environment at import time.
# Only the API base URL has a fallback; the other three are mandatory and a
# missing variable raises KeyError before anything else runs.
GITHUB_API_URL = os.getenv("GITHUB_API_URL", "https://api.github.com")
GITHUB_REPOSITORY = os.environ["GITHUB_REPOSITORY"]
GITHUB_RUN = os.environ["GITHUB_RUN_ID"]
GITHUB_TOKEN = os.environ["GITHUB_TOKEN"]
|
|
|
|
# Discord webhook reference: https://discord.com/developers/docs/resources/webhook
# Length threshold used when deciding to start a new webhook post.
DISCORD_SPLIT_LIMIT = 2_000
# Webhook to post to; None when the variable is not set.
DISCORD_WEBHOOK_URL = os.getenv("DISCORD_WEBHOOK_URL")

# Repository-relative path of the changelog YAML file.
CHANGELOG_FILE = "Resources/Changelog/Changelog.yml"
|
|
|
|
# Emoji prefix for a change line, looked up by the change's "type" value.
TYPES_TO_EMOJI = dict(
    Fix="🐛",
    Add="🆕",
    Remove="❌",
    Tweak="⚒️",
)

# Alias for one changelog entry as loaded from the YAML file.
ChangelogEntry = dict[str, Any]
|
|
|
|
def main():
    """
    Post the changelog entries added since the previous successful run to Discord.

    Looks up the previous successful run of this workflow through the GitHub API,
    downloads the changelog as it was at that run's head commit, and compares it
    with the changelog file in the working directory. Entries that are only in
    the local file are handed to send_to_discord.

    Returns without doing anything when no webhook URL is configured, and
    returns after printing a notice when no previous successful run exists.
    """
    if not DISCORD_WEBHOOK_URL:
        return

    session = requests.Session()
    session.headers["Authorization"] = f"Bearer {GITHUB_TOKEN}"
    # The value must be the bare media type; repeating the header name inside
    # the value produces a malformed Accept header.
    session.headers["Accept"] = "application/vnd.github+json"
    session.headers["X-GitHub-Api-Version"] = "2022-11-28"

    most_recent = get_most_recent_workflow(session)
    if most_recent is None:
        # Nothing to diff against (e.g. the very first run of the workflow).
        print("No previous successful publish job found, skipping changelog diff")
        return

    last_sha = most_recent['head_commit']['id']
    print(f"Last successful publish job was {most_recent['id']}: {last_sha}")
    last_changelog = yaml.safe_load(get_last_changelog(session, last_sha))

    # Read as UTF-8 explicitly so the result does not depend on the platform's
    # default encoding.
    with open(CHANGELOG_FILE, "r", encoding="utf-8") as f:
        cur_changelog = yaml.safe_load(f)

    diff = diff_changelog(last_changelog, cur_changelog)
    send_to_discord(diff)
|
|
|
|
|
|
def get_most_recent_workflow(sess: requests.Session) -> Any:
    """
    Return the first listed successful run of this workflow that is not the
    run we are currently executing in, or None when there is no such run.

    NOTE(review): "most recent" relies on the order in which the API lists
    runs (presumably newest first) - confirm against the API documentation.
    """
    this_run = get_current_run(sess)
    listing = get_past_runs(sess, this_run)

    # The listing can contain our own run, so filter it out by id.
    others = (
        candidate
        for candidate in listing['workflow_runs']
        if candidate["id"] != this_run["id"]
    )
    return next(others, None)
|
|
|
|
|
|
def get_current_run(sess: requests.Session, *, timeout: float = 30.0) -> Any:
    """
    Fetch the workflow run identified by GITHUB_RUN from the GitHub API.

    sess is the session used for the request. timeout is the number of
    seconds passed to requests as the request timeout; without one, a stalled
    connection would block the job indefinitely.

    Returns the decoded JSON body of the response.
    Raises requests.HTTPError when the API answers with an error status.
    """
    resp = sess.get(
        f"{GITHUB_API_URL}/repos/{GITHUB_REPOSITORY}/actions/runs/{GITHUB_RUN}",
        timeout=timeout,
    )
    resp.raise_for_status()
    return resp.json()
|
|
|
|
|
|
def get_past_runs(sess: requests.Session, current_run: Any, *, timeout: float = 30.0) -> Any:
    """
    Get successful runs of the current workflow created no later than the
    current run.

    current_run is a run object; its "workflow_url" and "created_at" fields
    are read. The date filter is inclusive ("<="), so the current run itself
    can appear in the result. timeout is the number of seconds passed to
    requests as the request timeout, so a stalled connection cannot hang the
    job.

    Returns the decoded JSON body of the response.
    Raises requests.HTTPError when the API answers with an error status.

    NOTE(review): only a single response is read; if the API paginates this
    listing, later pages are not followed.
    """
    params = {
        "status": "success",
        "created": f"<={current_run['created_at']}"
    }
    resp = sess.get(f"{current_run['workflow_url']}/runs", params=params, timeout=timeout)
    resp.raise_for_status()
    return resp.json()
|
|
|
|
|
|
def get_last_changelog(sess: requests.Session, sha: str, *, timeout: float = 30.0) -> str:
    """
    Use GitHub API to get the previous version of the changelog YAML (Actions builds are fetched with a shallow clone)

    sha is the commit to read CHANGELOG_FILE at. timeout is the number of
    seconds passed to requests as the request timeout, so a stalled
    connection cannot hang the job.

    Returns the response body as text.
    Raises requests.HTTPError when the API answers with an error status.
    """
    params = {
        "ref": sha,
    }
    # Overrides the session-wide Accept header for this request only, asking
    # for the raw media type instead of JSON.
    headers = {
        "Accept": "application/vnd.github.raw"
    }

    resp = sess.get(
        f"{GITHUB_API_URL}/repos/{GITHUB_REPOSITORY}/contents/{CHANGELOG_FILE}",
        headers=headers,
        params=params,
        timeout=timeout,
    )
    resp.raise_for_status()
    return resp.text
|
|
|
|
|
|
def diff_changelog(old: dict[str, Any], cur: dict[str, Any]) -> Iterable[ChangelogEntry]:
    """
    Lazily yield the entries of cur whose "id" does not occur among the
    entries of old, keeping the order they have in cur.

    Both arguments are changelog documents with an "Entries" list.
    """
    # Collected eagerly so a malformed old changelog fails at call time.
    known_ids = set()
    for previous in old["Entries"]:
        known_ids.add(previous["id"])

    return (candidate for candidate in cur["Entries"] if candidate["id"] not in known_ids)
|
|
|
|
|
|
def get_discord_body(content: str):
    """
    Build the JSON payload for a webhook post carrying the given text.

    The payload disables every kind of mention and sets the flag that
    suppresses embeds.
    """
    # SUPPRESS_EMBEDS
    suppress_embeds = 1 << 2

    payload = {"content": content}
    # Do not allow any mentions.
    payload["allowed_mentions"] = {"parse": []}
    payload["flags"] = suppress_embeds
    return payload
|
|
|
|
|
|
def send_discord(content: str, *, timeout: float = 30.0) -> None:
    """
    Post one message with the given text to DISCORD_WEBHOOK_URL.

    timeout is the number of seconds passed to requests as the request
    timeout; without one, a stalled connection would block the job
    indefinitely.

    Raises requests.HTTPError when the webhook answers with an error status.
    """
    body = get_discord_body(content)

    response = requests.post(DISCORD_WEBHOOK_URL, json=body, timeout=timeout)
    response.raise_for_status()
|
|
|
|
|
|
def _split_on_lines(text: str, limit: int) -> list[str]:
    """Cut text into non-empty chunks shorter than limit (limit >= 2), breaking at line ends where possible."""
    chunks: list[str] = []
    current = ""
    for line in text.splitlines(keepends=True):
        # A single line that can never fit is hard-cut. The cut may land inside
        # markdown, but every chunk stays deliverable.
        while len(line) >= limit:
            if current:
                chunks.append(current)
                current = ""
            chunks.append(line[:limit - 1])
            line = line[limit - 1:]
        if len(current) + len(line) >= limit:
            chunks.append(current)
            current = ""
        current += line
    if current:
        chunks.append(current)
    return chunks


def send_to_discord(entries: Iterable[ChangelogEntry]) -> None:
    """
    Format changelog entries and post them to the Discord webhook.

    Consecutive entries by the same author are grouped under one
    "**author** updated:" heading, followed by one line per change. Each line
    is prefixed with the emoji for the change type ("❓" for unknown types) and
    links to the entry's "url" when it has a non-blank one.

    Text is accumulated and posted whenever adding more would reach
    DISCORD_SPLIT_LIMIT. A group that is too long on its own is split on line
    boundaries, and an empty message is never posted.

    Posting is currently switched off by the temporary early return below, so
    the function returns without sending anything.
    """
    if not DISCORD_WEBHOOK_URL:
        print("No discord webhook URL found, skipping discord send")
        return

    # Temporary until github publishing working to avoid spamming cl
    # Everything below is intentionally unreachable until this return is removed.
    return

    pending = ""

    # groupby only merges adjacent entries with the same author, so the
    # changelog order is kept and an author can get more than one heading.
    for name, group in itertools.groupby(entries, lambda x: x["author"]):
        group_content = io.StringIO()
        group_content.write(f"**{name}** updated:\n")

        for entry in group:
            # The link is per entry, so resolve it once rather than per change.
            url = entry.get("url")
            has_url = bool(url and url.strip())
            for change in entry["changes"]:
                emoji = TYPES_TO_EMOJI.get(change['type'], "❓")
                message = change['message']
                if has_url:
                    group_content.write(f"{emoji} [-]({url}) {message}\n")
                else:
                    group_content.write(f"{emoji} - {message}\n")

        # A normal-sized group comes back as a single piece; an oversized one
        # is broken up so no single post reaches the limit.
        for piece in _split_on_lines(group_content.getvalue(), DISCORD_SPLIT_LIMIT):
            # Flush only when there is something pending: posting an empty
            # message would be rejected.
            if pending and len(pending) + len(piece) >= DISCORD_SPLIT_LIMIT:
                print("Split changelog and sending to discord")
                send_discord(pending)
                pending = ""

            pending += piece

    # Clean up anything remaining
    if pending:
        print("Sending final changelog to discord")
        send_discord(pending)
|
|
|
|
|
|
# Run only when executed as a script, so importing this module has no side effects.
if __name__ == "__main__":
    main()
|