Files
coverage-comment-action/src/entrypoint
2021-09-25 23:47:58 +02:00

269 lines
8.0 KiB
Python
Executable File

#!/usr/bin/env python3
import dataclasses
import inspect
import json
import os
import pathlib
import subprocess
import sys
import tempfile
from typing import List, Optional
import github
import jinja2
import xmltodict
# Hidden HTML marker handed to the comment template; post_comment looks for it
# in existing comments to decide whether to edit one instead of adding a new one.
MARKER = """<!-- This comment was produced by coverage-comment-action -->"""
# File name passed to the "add-to-wiki" command when uploading the badge.
BADGE_FILENAME = "coverage-comment-badge.json"
# Badge colour thresholds, compared against the integer coverage percentage.
MINIMUM_GREEN = 100
MINIMUM_ORANGE = 70
# URL template formatted with the uploaded badge URL before it is printed.
SHIELD_URL = "https://img.shields.io/endpoint?url={url}"
def main():
    """Run the action: comment on the pull request and/or publish the badge.

    The configuration is read from the process environment and the coverage
    report is always loaded. When the run is attached to a pull request, a
    coverage comment is rendered and posted. When badges are enabled and the
    run is on the default branch, the badge JSON is uploaded and its URLs are
    printed.
    """
    config = Config.from_environ(os.environ)
    coverage_info = get_coverage_info(config=config)
    gh = get_api(config=config)

    pr_number = config.GITHUB_PR_NUMBER
    if pr_number:
        print(f"Commenting on the coverage on PR {pr_number}")
        # Keyword arguments are evaluated left to right, so the diff coverage
        # is still computed before the previous coverage rate is fetched.
        comment = get_markdown_comment(
            coverage_info=coverage_info,
            diff_coverage_info=get_diff_coverage_info(config=config),
            previous_coverage_rate=get_previous_coverage_rate(config=config),
            config=config,
        )
        post_comment(body=comment, gh=gh, config=config)

    if not config.BADGE_ENABLED:
        return
    if not is_main_branch(gh=gh, config=config):
        return

    print("Running on default branch, saving Badge into the repo wiki")
    badge_url = upload_badge(
        badge=compute_badge(coverage_info=coverage_info),
        config=config,
    )
    print(f"Badge JSON stored at {badge_url}")
    print(f"Badge URL: {SHIELD_URL.format(url=badge_url)}")
@dataclasses.dataclass
class Config:
    """Settings of the action, populated from environment variables.

    Fields without a default are required. ``from_environ`` converts the raw
    string of a variable with the ``clean_<lowercased field name>`` classmethod
    when such a method exists.
    """

    GITHUB_BASE_REF: str
    GITHUB_TOKEN: str
    GITHUB_REPOSITORY: str
    GITHUB_HEAD_REF: str
    GITHUB_REF: str
    COVERAGE_FILE: str = "coverage.xml"
    COMMENT_TEMPLATE: Optional[str] = None
    DIFF_COVER_ARGS: List[str] = dataclasses.field(default_factory=list)
    BADGE_ENABLED: bool = True

    # Clean methods

    @classmethod
    def clean_diff_cover_args(cls, value: str) -> list:
        """Split a newline-separated value into stripped, non-empty arguments."""
        return [e.strip() for e in value.split("\n") if e.strip()]

    @classmethod
    def clean_badge_enabled(cls, value: str) -> bool:
        """Return True when the value is "1", "true" or "yes" (case-insensitive)."""
        return value.lower() in ("1", "true", "yes")

    @property
    def GITHUB_PR_NUMBER(self) -> Optional[int]:
        """Return the pull request number encoded in GITHUB_REF, or None.

        Only refs shaped like "refs/pull/2/merge" yield a number. A plain
        substring test for "pull" would also match branch names such as
        "refs/heads/pull-fix" and then fail while converting to int.
        """
        prefix = "refs/pull/"
        if not self.GITHUB_REF.startswith(prefix):
            return None
        number = self.GITHUB_REF[len(prefix):].split("/", 1)[0]
        return int(number) if number.isdecimal() else None

    @classmethod
    def from_environ(cls, environ):
        """Build a Config from a mapping of environment variables.

        Keys that are not Config fields are ignored. When a required field is
        absent from ``environ``, the process exits with a message listing the
        missing names in sorted order.
        """
        parameters = inspect.signature(cls).parameters
        config = {k: v for k, v in environ.items() if k in parameters}

        # Check against the mapping that was passed in, not the process
        # environment, so the message is right for any caller-supplied mapping.
        missing = sorted(
            name
            for name, param in parameters.items()
            if param.default is inspect.Parameter.empty and name not in config
        )
        if missing:
            sys.exit(f" missing environment variable(s): {', '.join(missing)}")

        for key, value in list(config.items()):
            if func := getattr(cls, f"clean_{key.lower()}", None):
                config[key] = func(value)
        return cls(**config)
def get_api(config: Config) -> github.Github:
    """Return a GitHub API client built from the configured token."""
    token = config.GITHUB_TOKEN
    return github.Github(token)
def get_coverage_info(config: Config) -> dict:
    """Load the coverage XML report as a dict with typed attribute values.

    The XML is parsed with xmltodict, then round-tripped through JSON so that
    an ``object_pairs_hook`` can convert the known attribute keys (counts,
    rates, flags) from strings. The content of the top-level "coverage" key
    is returned.

    Raises:
        Exit: when the configured coverage file does not exist.
    """
    coverage_path = pathlib.Path(config.COVERAGE_FILE)
    if not coverage_path.exists():
        raise Exit(f"Coverage file not found at {config.COVERAGE_FILE}")

    def is_true(text):
        return text == "true"

    # Converter for each recognised attribute; any other key keeps its value.
    casts = {
        "@timestamp": int,
        "@lines-valid": int,
        "@lines-covered": int,
        "@line-rate": float,
        "@branches-valid": int,
        "@branches-covered": int,
        "@branch-rate": float,
        "@complexity": int,
        "@hits": int,
        "@branch": is_true,
    }

    def convert(tuple_values):
        return {
            key: casts[key](value) if key in casts else value
            for key, value in tuple_values
        }

    parsed = xmltodict.parse(coverage_path.read_text())
    typed = json.loads(json.dumps(parsed), object_pairs_hook=convert)
    return typed["coverage"]
def get_diff_coverage_info(config: Config) -> dict:
    """Run diff-cover against the base branch and return its JSON report.

    The repository is fetched first, then diff-cover writes its JSON report
    into a temporary file that is read back and parsed.
    """
    call("git", "fetch", "--depth=1000")
    with tempfile.NamedTemporaryFile("r") as report:
        command = [
            "diff-cover",
            config.COVERAGE_FILE,
            f"--compare-branch=origin/{config.GITHUB_BASE_REF}",
            f"--json-report={report.name}",
            "--diff-range-notation=..",
            "--quiet",
            *config.DIFF_COVER_ARGS,
        ]
        call(*command)
        raw_report = report.read()
    return json.loads(raw_report)
def get_markdown_comment(
    coverage_info: dict,
    diff_coverage_info: dict,
    previous_coverage_rate: Optional[float],
    config: Config,
) -> str:
    """Render the coverage comment from a Jinja2 template.

    The template is ``config.COMMENT_TEMPLATE`` when set, otherwise the content
    of ``/var/default.md.j2``. Rates are converted to percentages before
    rendering.

    Args:
        coverage_info: coverage data; reads "@line-rate" and "@branch-rate".
        diff_coverage_info: diff-cover report; reads "total_percent_covered"
            and, per file in "src_stats", "percent_covered".
        previous_coverage_rate: earlier line rate, or None when unknown.
        config: action configuration.

    Returns:
        The rendered comment text.
    """
    env = jinja2.Environment()
    template = config.COMMENT_TEMPLATE or pathlib.Path("/var/default.md.j2").read_text()

    # Compare against None explicitly: a previous rate of 0.0 is a real value
    # and must be reported as 0%, not as "no previous coverage".
    previous_coverage = (
        previous_coverage_rate * 100 if previous_coverage_rate is not None else None
    )
    coverage = coverage_info["@line-rate"] * 100
    # NOTE(review): a missing or zero branch rate is deliberately left as None
    # here, presumably because reports without branch data carry 0 - confirm.
    branch_coverage = (
        coverage_info["@branch-rate"] * 100
        if coverage_info.get("@branch-rate")
        else None
    )
    diff_coverage = diff_coverage_info["total_percent_covered"]
    file_info = {
        file: {"diff_coverage": stats["percent_covered"]}
        for file, stats in diff_coverage_info["src_stats"].items()
    }
    return env.from_string(template).render(
        previous_coverage=previous_coverage,
        coverage=coverage,
        branch_coverage=branch_coverage,
        diff_coverage=diff_coverage,
        file_info=file_info,
        marker=MARKER,
    )
def is_main_branch(gh: github.Github, config: Config) -> bool:
    """Tell whether the workflow is running on the repository's default branch.

    GitHub only fills GITHUB_HEAD_REF for pull-request events, so on a push
    that value is empty and comparing it to the default branch never matches.
    The branch name is therefore taken from GITHUB_REF when it is a branch ref
    ("refs/heads/<name>"). For any other ref the previous comparison against
    GITHUB_HEAD_REF is kept.
    """
    repo = gh.get_repo(config.GITHUB_REPOSITORY)
    branch_prefix = "refs/heads/"
    if config.GITHUB_REF.startswith(branch_prefix):
        current_branch = config.GITHUB_REF[len(branch_prefix):]
    else:
        current_branch = config.GITHUB_HEAD_REF
    return repo.default_branch == current_branch
def post_comment(body: str, gh: github.Github, config: Config) -> None:
    """Create the coverage comment on the pull request, or update the old one.

    The first comment written by the current user that contains MARKER is
    edited in place; when none exists, a new comment is created.
    """
    try:
        me = gh.get_user().login
    except github.GithubException as exc:
        if exc.status != 403:
            raise
        # The user lookup was refused; use the GitHub Actions bot login.
        me = "github-actions[bot]"

    repo = gh.get_repo(config.GITHUB_REPOSITORY)
    pr_number = config.GITHUB_PR_NUMBER
    assert pr_number
    issue = repo.get_issue(pr_number)

    # next() stops at the first match, like the original loop with break.
    previous = next(
        (
            candidate
            for candidate in issue.get_comments()
            if candidate.user.login == me and MARKER in candidate.body
        ),
        None,
    )
    if previous is None:
        print("Adding new comment")
        issue.create_comment(body=body)
    else:
        print("Update previous comment")
        previous.edit(body=body)
def compute_badge(coverage_info: dict) -> str:
    """Return the badge description as a JSON string.

    The line rate is turned into a whole percentage (rounded down) and mapped
    to a colour: green from MINIMUM_GREEN, orange from MINIMUM_ORANGE, red
    below that.
    """
    # Binary floats make e.g. 0.57 * 100 evaluate to 56.99999999999999, which a
    # bare int() would truncate to 56. Rounding away that noise first keeps the
    # round-down behaviour for genuine fractions (99.99 still gives 99).
    rate = int(round(coverage_info["@line-rate"] * 100, 6))

    if rate >= MINIMUM_GREEN:
        color = "green"
    elif rate >= MINIMUM_ORANGE:
        color = "orange"
    else:
        color = "red"

    badge = {
        "schemaVersion": 1,
        "label": "Coverage",
        "message": f"{rate}%",
        "color": color,
    }
    return json.dumps(badge)
def get_previous_coverage_rate(config: Config) -> Optional[float]:
    """Return the coverage rate to compare the current run against.

    NOTE(review): this returns a fixed value and never reads ``config``; it
    looks like a placeholder until the real previous rate is looked up.
    """
    placeholder_rate = 0.42
    return placeholder_rate
def upload_badge(badge: str, config: Config) -> str:
    """Upload the badge JSON through the "add-to-wiki" command.

    The badge is sent on the command's standard input. The stripped standard
    output of the command is returned; main prints it as the badge location.

    Raises:
        Exit: when the command fails. If the error text indicates the wiki
            repository is not reachable, a hint is printed before re-raising.
    """
    wiki_unavailable = "remote error: access denied or repository not exported"
    try:
        process = call(
            "add-to-wiki",
            config.GITHUB_REPOSITORY,
            BADGE_FILENAME,
            "Update badge",
            input=badge,
        )
    except Exit as exc:
        if wiki_unavailable in str(exc):
            print(
                "Wiki seems not to be activated for this project. Please activate the "
                "wiki. You may disable it afterwards."
            )
        raise
    else:
        return process.stdout.strip()
class Exit(Exception):
    """Error whose message is printed before the script exits with status 1."""


def call(*args, **kwargs):
    """Run an external command and return its completed process.

    The command is run in text mode with its output captured. Extra keyword
    arguments are forwarded to ``subprocess.run``.

    Raises:
        Exit: when the command exits with a non-zero status. The message is
            the captured stdout and stderr separated by a newline.
    """
    try:
        return subprocess.run(
            args, text=True, check=True, capture_output=True, **kwargs
        )
    except subprocess.CalledProcessError as exc:
        # Join with a real newline; the separator used to be the literal "/n".
        raise Exit("\n".join([exc.stdout, exc.stderr])) from exc
if __name__ == "__main__":
    # Expected failures are raised as Exit: show the message and exit with
    # status 1 instead of printing a traceback.
    try:
        main()
    except Exit as failure:
        print(failure)
        raise SystemExit(1)