Fixes and refactoring to discord changelog script (#33859)

* Fixes and refactoring to discord changelog script

Upstreamed from https://github.com/impstation/imp-station-14/pull/1023

* Add some prints back in
This commit is contained in:
imcb
2025-03-11 19:14:40 +00:00
committed by GitHub
parent 0575a3afd6
commit 231847a36d

View File

@@ -1,21 +1,22 @@
#!/usr/bin/env python3
#
# Sends updates to a Discord webhook for new changelog entries since the last GitHub Actions publish run.
# Automatically figures out the last run and changelog contents with the GitHub API.
#
"""
Sends updates to a Discord webhook for new changelog entries since the last GitHub Actions publish run.
Automatically figures out the last run and changelog contents with the GitHub API.
"""
import io
import itertools
import os
import requests
import yaml
from pathlib import Path
from typing import Any, Iterable
GITHUB_API_URL = os.environ.get("GITHUB_API_URL", "https://api.github.com")
GITHUB_REPOSITORY = os.environ["GITHUB_REPOSITORY"]
GITHUB_RUN = os.environ["GITHUB_RUN_ID"]
GITHUB_TOKEN = os.environ["GITHUB_TOKEN"]
import requests
import yaml
DEBUG = False
DEBUG_CHANGELOG_FILE_OLD = Path("Resources/Changelog/Old.yml")
GITHUB_API_URL = os.environ.get("GITHUB_API_URL", "https://api.github.com")
# https://discord.com/developers/docs/resources/webhook
DISCORD_SPLIT_LIMIT = 2000
@@ -23,39 +24,40 @@ DISCORD_WEBHOOK_URL = os.environ.get("DISCORD_WEBHOOK_URL")
CHANGELOG_FILE = "Resources/Changelog/Changelog.yml"
TYPES_TO_EMOJI = {
"Fix": "🐛",
"Add": "🆕",
"Remove": "",
"Tweak": "⚒️"
}
TYPES_TO_EMOJI = {"Fix": "🐛", "Add": "🆕", "Remove": "", "Tweak": "⚒️"}
ChangelogEntry = dict[str, Any]
def main():
if not DISCORD_WEBHOOK_URL:
print("No discord webhook URL found, skipping discord send")
return
session = requests.Session()
session.headers["Authorization"] = f"Bearer {GITHUB_TOKEN}"
session.headers["Accept"] = "Accept: application/vnd.github+json"
session.headers["X-GitHub-Api-Version"] = "2022-11-28"
if DEBUG:
# to debug this script locally, you can use
# a separate local file as the old changelog
last_changelog_stream = DEBUG_CHANGELOG_FILE_OLD.read_text()
else:
# when running this normally in a GitHub actions workflow,
# it will get the old changelog from the GitHub API
last_changelog_stream = get_last_changelog()
most_recent = get_most_recent_workflow(session)
last_sha = most_recent['head_commit']['id']
print(f"Last successful publish job was {most_recent['id']}: {last_sha}")
last_changelog = yaml.safe_load(get_last_changelog(session, last_sha))
last_changelog = yaml.safe_load(last_changelog_stream)
with open(CHANGELOG_FILE, "r") as f:
cur_changelog = yaml.safe_load(f)
diff = diff_changelog(last_changelog, cur_changelog)
send_to_discord(diff)
message_lines = changelog_entries_to_message_lines(diff)
send_message_lines(message_lines)
def get_most_recent_workflow(sess: requests.Session) -> Any:
workflow_run = get_current_run(sess)
def get_most_recent_workflow(
sess: requests.Session, github_repository: str, github_run: str
) -> Any:
workflow_run = get_current_run(sess, github_repository, github_run)
past_runs = get_past_runs(sess, workflow_run)
for run in past_runs['workflow_runs']:
for run in past_runs["workflow_runs"]:
# First past successful run that isn't our current run.
if run["id"] == workflow_run["id"]:
continue
@@ -63,8 +65,12 @@ def get_most_recent_workflow(sess: requests.Session) -> Any:
return run
def get_current_run(sess: requests.Session) -> Any:
resp = sess.get(f"{GITHUB_API_URL}/repos/{GITHUB_REPOSITORY}/actions/runs/{GITHUB_RUN}")
def get_current_run(
sess: requests.Session, github_repository: str, github_run: str
) -> Any:
resp = sess.get(
f"{GITHUB_API_URL}/repos/{github_repository}/actions/runs/{github_run}"
)
resp.raise_for_status()
return resp.json()
@@ -73,32 +79,55 @@ def get_past_runs(sess: requests.Session, current_run: Any) -> Any:
"""
Get all successful workflow runs before our current one.
"""
params = {
"status": "success",
"created": f"<={current_run['created_at']}"
}
params = {"status": "success", "created": f"<={current_run['created_at']}"}
resp = sess.get(f"{current_run['workflow_url']}/runs", params=params)
resp.raise_for_status()
return resp.json()
def get_last_changelog(sess: requests.Session, sha: str) -> str:
def get_last_changelog() -> str:
github_repository = os.environ["GITHUB_REPOSITORY"]
github_run = os.environ["GITHUB_RUN_ID"]
github_token = os.environ["GITHUB_TOKEN"]
session = requests.Session()
session.headers["Authorization"] = f"Bearer {github_token}"
session.headers["Accept"] = "Accept: application/vnd.github+json"
session.headers["X-GitHub-Api-Version"] = "2022-11-28"
most_recent = get_most_recent_workflow(session, github_repository, github_run)
last_sha = most_recent["head_commit"]["id"]
print(f"Last successful publish job was {most_recent['id']}: {last_sha}")
last_changelog_stream = get_last_changelog_by_sha(
session, last_sha, github_repository
)
return last_changelog_stream
def get_last_changelog_by_sha(
sess: requests.Session, sha: str, github_repository: str
) -> str:
"""
Use GitHub API to get the previous version of the changelog YAML (Actions builds are fetched with a shallow clone)
"""
params = {
"ref": sha,
}
headers = {
"Accept": "application/vnd.github.raw"
}
headers = {"Accept": "application/vnd.github.raw"}
resp = sess.get(f"{GITHUB_API_URL}/repos/{GITHUB_REPOSITORY}/contents/{CHANGELOG_FILE}", headers=headers, params=params)
resp = sess.get(
f"{GITHUB_API_URL}/repos/{github_repository}/contents/{CHANGELOG_FILE}",
headers=headers,
params=params,
)
resp.raise_for_status()
return resp.text
def diff_changelog(old: dict[str, Any], cur: dict[str, Any]) -> Iterable[ChangelogEntry]:
def diff_changelog(
old: dict[str, Any], cur: dict[str, Any]
) -> Iterable[ChangelogEntry]:
"""
Find all new entries not present in the previous publish.
"""
@@ -108,69 +137,75 @@ def diff_changelog(old: dict[str, Any], cur: dict[str, Any]) -> Iterable[Changel
def get_discord_body(content: str):
return {
"content": content,
# Do not allow any mentions.
"allowed_mentions": {
"parse": []
},
# SUPPRESS_EMBEDS
"flags": 1 << 2
}
"content": content,
# Do not allow any mentions.
"allowed_mentions": {"parse": []},
# SUPPRESS_EMBEDS
"flags": 1 << 2,
}
def send_discord(content: str):
def send_discord_webhook(lines: list[str]):
content = "".join(lines)
body = get_discord_body(content)
response = requests.post(DISCORD_WEBHOOK_URL, json=body)
response.raise_for_status()
def send_to_discord(entries: Iterable[ChangelogEntry]) -> None:
if not DISCORD_WEBHOOK_URL:
print(f"No discord webhook URL found, skipping discord send")
return
def changelog_entries_to_message_lines(entries: Iterable[ChangelogEntry]) -> list[str]:
"""Process structured changelog entries into a list of lines making up a formatted message."""
message_lines = []
message_content = io.StringIO()
# We need to manually split messages to avoid discord's character limit
# With that being said this isn't entirely robust
# e.g. a sufficiently large CL breaks it, but that's a future problem
for name, group in itertools.groupby(entries, lambda x: x["author"]):
# Need to split text to avoid discord character limit
group_content = io.StringIO()
group_content.write(f"**{name}** updated:\n")
for contributor_name, group in itertools.groupby(entries, lambda x: x["author"]):
message_lines.append(f"**{contributor_name}** updated:\n")
for entry in group:
url = entry.get("url")
if url and not url.strip():
url = None
for change in entry["changes"]:
emoji = TYPES_TO_EMOJI.get(change['type'], "")
message = change['message']
url = entry.get("url")
if url and url.strip():
group_content.write(f"{emoji} - {message} [PR]({url}) \n")
emoji = TYPES_TO_EMOJI.get(change["type"], "")
message = change["message"]
# if a single line is longer than the limit, it needs to be truncated
if len(message) > DISCORD_SPLIT_LIMIT:
message = message[: DISCORD_SPLIT_LIMIT - 100].rstrip() + " [...]"
if url is not None:
line = f"{emoji} - {message} [PR]({url}) \n"
else:
group_content.write(f"{emoji} - {message}\n")
line = f"{emoji} - {message}\n"
group_text = group_content.getvalue()
message_text = message_content.getvalue()
message_length = len(message_text)
group_length = len(group_text)
message_lines.append(line)
# If adding the text would bring it over the group limit then send the message and start a new one
if message_length + group_length >= DISCORD_SPLIT_LIMIT:
print("Split changelog and sending to discord")
send_discord(message_text)
return message_lines
# Reset the message
message_content = io.StringIO()
# Flush the group to the message
message_content.write(group_text)
# Clean up anything remaining
message_text = message_content.getvalue()
if len(message_text) > 0:
def send_message_lines(message_lines: list[str]):
"""Join a list of message lines into chunks that are each below Discord's message length limit, and send them."""
chunk_lines = []
chunk_length = 0
for line in message_lines:
line_length = len(line)
new_chunk_length = chunk_length + line_length
if new_chunk_length > DISCORD_SPLIT_LIMIT:
print("Split changelog and sending to discord")
send_discord_webhook(chunk_lines)
new_chunk_length = line_length
chunk_lines.clear()
chunk_lines.append(line)
chunk_length = new_chunk_length
if chunk_lines:
print("Sending final changelog to discord")
send_discord(message_text)
send_discord_webhook(chunk_lines)
main()
if __name__ == "__main__":
main()