#!/usr/bin/env python3
"""Comment coverage results on a pull request and publish a coverage badge.

The script reads its configuration from environment variables (see
``Config``), parses a coverage XML report, and then:

* when ``GITHUB_REF`` points at a pull request, runs ``diff-cover`` and posts
  (or updates) a comment on that pull request;
* when badges are enabled and the run is on the repository's default branch,
  computes a badge JSON document and uploads it through ``add-to-wiki``.
"""

import dataclasses
import inspect
import json
import os
import pathlib
import subprocess
import sys
import tempfile
from typing import List, Optional

import github
import jinja2
import xmltodict

# NOTE(review): the marker is an empty string here, so the
# ``MARKER in comment.body`` test in ``post_comment`` is true for every
# comment. Confirm whether a non-empty marker was intended.
MARKER = """"""
BADGE_FILENAME = "coverage-comment-badge.json"
# Thresholds (in percent) used by ``compute_badge`` to pick the badge color.
MINIMUM_GREEN = 100
MINIMUM_ORANGE = 70
SHIELD_URL = "https://img.shields.io/endpoint?url={url}"


def main():
    """Run the whole workflow: PR comment first, then the badge upload."""
    config = Config.from_environ(os.environ)
    coverage_info = get_coverage_info(config=config)
    gh = get_api(config=config)

    if config.GITHUB_PR_NUMBER:
        print(f"Commenting on the coverage on PR {config.GITHUB_PR_NUMBER}")
        diff_coverage_info = get_diff_coverage_info(config=config)
        previous_coverage_rate = get_previous_coverage_rate(config=config)

        comment = get_markdown_comment(
            coverage_info=coverage_info,
            diff_coverage_info=diff_coverage_info,
            previous_coverage_rate=previous_coverage_rate,
            config=config,
        )
        post_comment(body=comment, gh=gh, config=config)

    if config.BADGE_ENABLED and is_main_branch(gh=gh, config=config):
        print("Running on default branch, saving Badge into the repo wiki")
        badge = compute_badge(coverage_info=coverage_info)
        url = upload_badge(badge=badge, config=config)
        print(f"Badge JSON stored at {url}")
        print(f"Badge URL: {SHIELD_URL.format(url=url)}")


@dataclasses.dataclass
class Config:
    """This object defines the environment variables

    Each field name is the name of the environment variable that
    ``from_environ`` reads. Fields without a default are required. A
    classmethod named ``clean_<field name in lowercase>`` is applied to the
    raw string value before the instance is built.
    """

    GITHUB_BASE_REF: str
    GITHUB_TOKEN: str
    GITHUB_REPOSITORY: str
    GITHUB_HEAD_REF: str
    GITHUB_REF: str
    COVERAGE_FILE: str = "coverage.xml"
    COMMENT_TEMPLATE: Optional[str] = None
    DIFF_COVER_ARGS: List[str] = dataclasses.field(default_factory=list)
    BADGE_ENABLED: bool = True

    # Clean methods
    @classmethod
    def clean_diff_cover_args(cls, value: str) -> list:
        """Split a newline-separated string into stripped, non-empty items."""
        return [e.strip() for e in value.split("\n") if e.strip()]

    @classmethod
    def clean_badge_enabled(cls, value: str) -> bool:
        """Return True for "1", "true" or "yes" (case-insensitive)."""
        return value.lower() in ("1", "true", "yes")

    @property
    def GITHUB_PR_NUMBER(self) -> Optional[int]:
        """Return the pull request number encoded in ``GITHUB_REF``, if any.

        Only refs shaped like "refs/pull/2/merge" yield a number. Any other
        ref (including branch names that merely contain "pull") yields None
        instead of raising.
        """
        prefix = "refs/pull/"
        if not self.GITHUB_REF.startswith(prefix):
            return None
        number = self.GITHUB_REF[len(prefix):].split("/", 1)[0]
        try:
            return int(number)
        except ValueError:
            return None

    @classmethod
    def from_environ(cls, environ):
        """Build a ``Config`` from a mapping of environment variables.

        Keys that are not fields of the dataclass are ignored. Exits the
        process (``SystemExit``) with a message listing the missing names
        when a required variable is absent from ``environ``.
        """
        parameters = inspect.signature(cls).parameters
        config = {k: v for k, v in environ.items() if k in parameters}
        for key, value in list(config.items()):
            if func := getattr(cls, f"clean_{key.lower()}", None):
                config[key] = func(value)

        # Compare against the mapping we were given, not the process-wide
        # os.environ, so the report is right for any caller-supplied mapping.
        missing = sorted(
            name
            for name, param in parameters.items()
            if param.default is inspect.Parameter.empty and name not in config
        )
        if missing:
            sys.exit(f" missing environment variable(s): {', '.join(missing)}")
        return cls(**config)


def get_api(config: Config) -> github.Github:
    """Return a GitHub API client authenticated with ``GITHUB_TOKEN``."""
    return github.Github(config.GITHUB_TOKEN)


def get_coverage_info(config: Config) -> dict:
    """Parse the coverage XML report and return its ``coverage`` element.

    The XML is converted to nested dicts by ``xmltodict`` and then passed
    through a JSON round-trip whose ``object_pairs_hook`` converts a fixed
    set of attribute keys (listed in ``converters``) from strings to
    numbers or booleans. All other values are left untouched.

    Raises:
        Exit: if the file named by ``config.COVERAGE_FILE`` does not exist.
    """
    coverage_file = pathlib.Path(config.COVERAGE_FILE)
    if not coverage_file.exists():
        raise Exit(f"Coverage file not found at {config.COVERAGE_FILE}")

    converters = {
        "@timestamp": int,
        "@lines-valid": int,
        "@lines-covered": int,
        "@line-rate": float,
        "@branches-valid": int,
        "@branches-covered": int,
        "@branch-rate": float,
        "@complexity": int,
        "@hits": int,
        "@branch": lambda x: x == "true",
    }

    def convert(tuple_values):
        # Called by json.loads for every JSON object with its (key, value)
        # pairs; keys without a converter keep their value as-is.
        return {
            key: converters.get(key, lambda x: x)(value)
            for key, value in tuple_values
        }

    return json.loads(
        json.dumps(xmltodict.parse(coverage_file.read_text())),
        object_pairs_hook=convert,
    )["coverage"]


def get_diff_coverage_info(config: Config) -> dict:
    """Run ``diff-cover`` against the base branch and return its JSON report.

    Fetches git history first, then asks ``diff-cover`` to write its JSON
    report into a temporary file which is read back and parsed.

    Raises:
        Exit: if ``git`` or ``diff-cover`` exits with a non-zero status.
    """
    call("git", "fetch", "--depth=1000")
    with tempfile.NamedTemporaryFile("r") as f:
        call(
            "diff-cover",
            config.COVERAGE_FILE,
            f"--compare-branch=origin/{config.GITHUB_BASE_REF}",
            f"--json-report={f.name}",
            "--diff-range-notation=..",
            "--quiet",
            *config.DIFF_COVER_ARGS,
        )
        return json.loads(f.read())


def get_markdown_comment(
    coverage_info: dict,
    diff_coverage_info: dict,
    previous_coverage_rate: Optional[float],
    config: Config,
):
    """Render the comment body from the Jinja2 template.

    Uses ``config.COMMENT_TEMPLATE`` when set, otherwise the template file
    at ``/var/default.md.j2``. Rates are passed to the template as
    percentages; ``previous_coverage`` is None only when no previous rate is
    known, so a previous rate of 0.0 is rendered as 0.
    """
    env = jinja2.Environment()
    template = config.COMMENT_TEMPLATE or pathlib.Path("/var/default.md.j2").read_text()

    previous_coverage = (
        previous_coverage_rate * 100 if previous_coverage_rate is not None else None
    )
    coverage = coverage_info["@line-rate"] * 100
    # A missing or zero branch rate is passed to the template as None.
    branch_coverage = (
        coverage_info["@branch-rate"] * 100
        if coverage_info.get("@branch-rate")
        else None
    )
    diff_coverage = diff_coverage_info["total_percent_covered"]
    file_info = {
        file: {"diff_coverage": stats["percent_covered"]}
        for file, stats in diff_coverage_info["src_stats"].items()
    }
    return env.from_string(template).render(
        previous_coverage=previous_coverage,
        coverage=coverage,
        branch_coverage=branch_coverage,
        diff_coverage=diff_coverage,
        file_info=file_info,
        marker=MARKER,
    )


def is_main_branch(gh: github.Github, config: Config) -> bool:
    """Return True when ``GITHUB_HEAD_REF`` equals the repo's default branch."""
    repo = gh.get_repo(config.GITHUB_REPOSITORY)
    return repo.default_branch == config.GITHUB_HEAD_REF


def post_comment(body: str, gh: github.Github, config: Config) -> None:
    """Create the coverage comment on the pull request, or update it.

    An existing comment is updated when it was written by the current user
    and contains ``MARKER``; otherwise a new comment is created.

    Raises:
        Exit: if ``GITHUB_REF`` does not reference a pull request.
        github.GithubException: on API errors other than a 403 while
            looking up the current user.
    """
    try:
        me = gh.get_user().login
    except github.GithubException as exc:
        # A 403 on the user lookup is treated as running under the
        # Actions bot account.
        if exc.status == 403:
            me = "github-actions[bot]"
        else:
            raise

    repo = gh.get_repo(config.GITHUB_REPOSITORY)
    pr_number = config.GITHUB_PR_NUMBER
    # Explicit check instead of ``assert`` so it survives ``python -O``.
    if not pr_number:
        raise Exit("Cannot post a comment: GITHUB_REF is not a pull request ref")
    issue = repo.get_issue(pr_number)

    for comment in issue.get_comments():
        if comment.user.login == me and MARKER in comment.body:
            print("Update previous comment")
            comment.edit(body=body)
            break
    else:
        # No matching comment was found in the loop above.
        print("Adding new comment")
        issue.create_comment(body=body)


def compute_badge(coverage_info: dict) -> str:
    """Return the badge description as a JSON string.

    The line rate is converted to a whole percentage with ``int`` (which
    truncates) and the color is chosen from ``MINIMUM_GREEN`` and
    ``MINIMUM_ORANGE``.
    """
    rate = int(coverage_info["@line-rate"] * 100)

    if rate >= MINIMUM_GREEN:
        color = "green"
    elif rate >= MINIMUM_ORANGE:
        color = "orange"
    else:
        color = "red"

    badge = {
        "schemaVersion": 1,
        "label": "Coverage",
        "message": f"{rate}%",
        "color": color,
    }
    return json.dumps(badge)


def get_previous_coverage_rate(config: Config) -> Optional[float]:
    """Return the previous coverage rate.

    NOTE(review): this currently returns a hard-coded 0.42 and ignores
    ``config``; it looks like a placeholder - confirm.
    """
    return 0.42


def upload_badge(badge: str, config: Config) -> str:
    """Store the badge JSON via ``add-to-wiki`` and return its stripped stdout.

    ``main`` treats the returned string as the URL of the stored badge.

    Raises:
        Exit: if ``add-to-wiki`` fails. When the output indicates that the
            wiki is not available, a hint is printed before re-raising.
    """
    try:
        process = call(
            "add-to-wiki",
            config.GITHUB_REPOSITORY,
            BADGE_FILENAME,
            "Update badge",
            input=badge,
        )
    except Exit as exc:
        if "remote error: access denied or repository not exported" in str(exc):
            print(
                "Wiki seems not to be activated for this project. Please activate the "
                "wiki. You may disable it afterwards."
            )
        raise

    return process.stdout.strip()


class Exit(Exception):
    """Error whose message is printed by the entry point before exiting 1."""


def call(*args, **kwargs):
    """Run a command and return the ``subprocess.CompletedProcess``.

    Output is captured as text. Extra keyword arguments are forwarded to
    ``subprocess.run``.

    Raises:
        Exit: if the command exits with a non-zero status; the message is
            the captured stdout and stderr joined by a newline.
    """
    try:
        return subprocess.run(
            args, text=True, check=True, capture_output=True, **kwargs
        )
    except subprocess.CalledProcessError as exc:
        raise Exit("\n".join([exc.stdout, exc.stderr])) from exc


if __name__ == "__main__":
    try:
        main()
    except Exit as exc:
        print(exc)
        sys.exit(1)