229 lines
7.2 KiB
Python
Executable File
229 lines
7.2 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
"""
|
|
Sends updates to a Discord webhook for new changelog entries since the last GitHub Actions publish run.
|
|
|
|
Automatically figures out the last run and changelog contents with the GitHub API.
|
|
"""
|
|
|
|
import itertools
|
|
import os
|
|
from pathlib import Path
|
|
from typing import Any, Iterable
|
|
|
|
import requests
|
|
import yaml
|
|
import time
|
|
|
|
DEBUG = False
|
|
DEBUG_CHANGELOG_FILE_OLD = Path("Resources/Changelog/Old.yml")
|
|
GITHUB_API_URL = os.environ.get("GITHUB_API_URL", "https://api.github.com")
|
|
|
|
# https://discord.com/developers/docs/resources/webhook
|
|
DISCORD_SPLIT_LIMIT = 2000
|
|
DISCORD_WEBHOOK_URL = os.environ.get("DISCORD_WEBHOOK_URL")
|
|
|
|
CHANGELOG_FILE = "Resources/Changelog/Changelog.yml"
|
|
|
|
TYPES_TO_EMOJI = {"Fix": "🐛", "Add": "🆕", "Remove": "❌", "Tweak": "⚒️"}
|
|
|
|
ChangelogEntry = dict[str, Any]
|
|
|
|
|
|
def main():
|
|
if not DISCORD_WEBHOOK_URL:
|
|
print("No discord webhook URL found, skipping discord send")
|
|
return
|
|
|
|
if DEBUG:
|
|
# to debug this script locally, you can use
|
|
# a separate local file as the old changelog
|
|
last_changelog_stream = DEBUG_CHANGELOG_FILE_OLD.read_text()
|
|
else:
|
|
# when running this normally in a GitHub actions workflow,
|
|
# it will get the old changelog from the GitHub API
|
|
last_changelog_stream = get_last_changelog()
|
|
|
|
last_changelog = yaml.safe_load(last_changelog_stream)
|
|
with open(CHANGELOG_FILE, "r") as f:
|
|
cur_changelog = yaml.safe_load(f)
|
|
|
|
diff = diff_changelog(last_changelog, cur_changelog)
|
|
message_lines = changelog_entries_to_message_lines(diff)
|
|
send_message_lines(message_lines)
|
|
|
|
|
|
def get_most_recent_workflow(
|
|
sess: requests.Session, github_repository: str, github_run: str
|
|
) -> Any:
|
|
workflow_run = get_current_run(sess, github_repository, github_run)
|
|
past_runs = get_past_runs(sess, workflow_run)
|
|
for run in past_runs["workflow_runs"]:
|
|
# First past successful run that isn't our current run.
|
|
if run["id"] == workflow_run["id"]:
|
|
continue
|
|
|
|
return run
|
|
|
|
|
|
def get_current_run(
|
|
sess: requests.Session, github_repository: str, github_run: str
|
|
) -> Any:
|
|
resp = sess.get(
|
|
f"{GITHUB_API_URL}/repos/{github_repository}/actions/runs/{github_run}"
|
|
)
|
|
resp.raise_for_status()
|
|
return resp.json()
|
|
|
|
|
|
def get_past_runs(sess: requests.Session, current_run: Any) -> Any:
|
|
"""
|
|
Get all successful workflow runs before our current one.
|
|
"""
|
|
params = {"status": "success", "created": f"<={current_run['created_at']}"}
|
|
resp = sess.get(f"{current_run['workflow_url']}/runs", params=params)
|
|
resp.raise_for_status()
|
|
return resp.json()
|
|
|
|
|
|
def get_last_changelog() -> str:
|
|
github_repository = os.environ["GITHUB_REPOSITORY"]
|
|
github_run = os.environ["GITHUB_RUN_ID"]
|
|
github_token = os.environ["GITHUB_TOKEN"]
|
|
|
|
session = requests.Session()
|
|
session.headers["Authorization"] = f"Bearer {github_token}"
|
|
session.headers["Accept"] = "Accept: application/vnd.github+json"
|
|
session.headers["X-GitHub-Api-Version"] = "2022-11-28"
|
|
|
|
most_recent = get_most_recent_workflow(session, github_repository, github_run)
|
|
last_sha = most_recent["head_commit"]["id"]
|
|
print(f"Last successful publish job was {most_recent['id']}: {last_sha}")
|
|
last_changelog_stream = get_last_changelog_by_sha(
|
|
session, last_sha, github_repository
|
|
)
|
|
|
|
return last_changelog_stream
|
|
|
|
|
|
def get_last_changelog_by_sha(
|
|
sess: requests.Session, sha: str, github_repository: str
|
|
) -> str:
|
|
"""
|
|
Use GitHub API to get the previous version of the changelog YAML (Actions builds are fetched with a shallow clone)
|
|
"""
|
|
params = {
|
|
"ref": sha,
|
|
}
|
|
headers = {"Accept": "application/vnd.github.raw"}
|
|
|
|
resp = sess.get(
|
|
f"{GITHUB_API_URL}/repos/{github_repository}/contents/{CHANGELOG_FILE}",
|
|
headers=headers,
|
|
params=params,
|
|
)
|
|
resp.raise_for_status()
|
|
return resp.text
|
|
|
|
|
|
def diff_changelog(
|
|
old: dict[str, Any], cur: dict[str, Any]
|
|
) -> Iterable[ChangelogEntry]:
|
|
"""
|
|
Find all new entries not present in the previous publish.
|
|
"""
|
|
old_entry_ids = {e["id"] for e in old["Entries"]}
|
|
return (e for e in cur["Entries"] if e["id"] not in old_entry_ids)
|
|
|
|
|
|
def get_discord_body(content: str):
|
|
return {
|
|
"content": content,
|
|
# Do not allow any mentions.
|
|
"allowed_mentions": {"parse": []},
|
|
# SUPPRESS_EMBEDS
|
|
"flags": 1 << 2,
|
|
}
|
|
|
|
|
|
def send_discord_webhook(lines: list[str]):
|
|
content = "".join(lines)
|
|
body = get_discord_body(content)
|
|
retry_attempt = 0
|
|
|
|
try:
|
|
response = requests.post(DISCORD_WEBHOOK_URL, json=body, timeout=10)
|
|
while response.status_code == 429:
|
|
retry_attempt += 1
|
|
if retry_attempt > 20:
|
|
print("Too many retries on a single request despite following retry_after header... giving up")
|
|
exit(1)
|
|
retry_after = response.json().get("retry_after", 5)
|
|
print(f"Rate limited, retrying after {retry_after} seconds")
|
|
time.sleep(retry_after)
|
|
response = requests.post(DISCORD_WEBHOOK_URL, json=body, timeout=10)
|
|
response.raise_for_status()
|
|
except requests.exceptions.RequestException as e:
|
|
print(f"Failed to send message: {e}")
|
|
exit(1)
|
|
|
|
|
|
def changelog_entries_to_message_lines(entries: Iterable[ChangelogEntry]) -> list[str]:
|
|
"""Process structured changelog entries into a list of lines making up a formatted message."""
|
|
message_lines = []
|
|
|
|
for contributor_name, group in itertools.groupby(entries, lambda x: x["author"]):
|
|
message_lines.append("\n")
|
|
message_lines.append(f"**{contributor_name}** updated:\n")
|
|
|
|
for entry in group:
|
|
url = entry.get("url")
|
|
if url and not url.strip():
|
|
url = None
|
|
|
|
for change in entry["changes"]:
|
|
emoji = TYPES_TO_EMOJI.get(change["type"], "❓")
|
|
message = change["message"]
|
|
|
|
# if a single line is longer than the limit, it needs to be truncated
|
|
if len(message) > DISCORD_SPLIT_LIMIT:
|
|
message = message[: DISCORD_SPLIT_LIMIT - 100].rstrip() + " [...]"
|
|
|
|
if url is not None:
|
|
pr_number = url.split("/")[-1]
|
|
line = f"{emoji} - {message} ([#{pr_number}]({url}))\n"
|
|
else:
|
|
line = f"{emoji} - {message}\n"
|
|
|
|
message_lines.append(line)
|
|
|
|
return message_lines
|
|
|
|
|
|
def send_message_lines(message_lines: list[str]):
|
|
"""Join a list of message lines into chunks that are each below Discord's message length limit, and send them."""
|
|
chunk_lines = []
|
|
chunk_length = 0
|
|
|
|
for line in message_lines:
|
|
line_length = len(line)
|
|
new_chunk_length = chunk_length + line_length
|
|
|
|
if new_chunk_length > DISCORD_SPLIT_LIMIT:
|
|
print("Split changelog and sending to discord")
|
|
send_discord_webhook(chunk_lines)
|
|
|
|
new_chunk_length = line_length
|
|
chunk_lines.clear()
|
|
|
|
chunk_lines.append(line)
|
|
chunk_length = new_chunk_length
|
|
|
|
if chunk_lines:
|
|
print("Sending final changelog to discord")
|
|
send_discord_webhook(chunk_lines)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|