#!/usr/bin/env python3 import sys import os import json import requests ################################################################################ # Helper Functions ################################################################################ def get_envar(name): """ Given a name, return the corresponding environment variable. Exit if not defined, as using this function indicates the envar is required. Parameters: name (str): the name of the environment variable """ value = os.environ.get(name) if not value: sys.exit("%s is required for vsoch/pull-request-action" % name) return value def check_events_json(): """the github events json is required in order to indicate that we are in an action environment. """ events = get_envar("GITHUB_EVENT_PATH") if not os.path.exists(events): sys.exit("Cannot find Github events file at ${GITHUB_EVENT_PATH}") print("Found ${GITHUB_EVENT_PATH} at %s" % events) return events def abort_if_fail(response, reason): """If PASS_ON_ERROR, don't exit. Otherwise exit with an error and print the reason. Parameters: response (requests.Response) : an unparsed response from requests reason (str) : a message to print to the user for fail. """ message = "%s: %s: %s\n %s" % ( reason, response.status_code, response.reason, response.json(), ) if os.environ.get("PASS_ON_ERROR"): print("Error, but PASS_ON_ERROR is set, continuing: %s" % message) else: sys.exit(message) def parse_into_list(values): """A list of reviewers or assignees to parse from a string to a list Parameters: values (str) : a list of space separated, quoted values to parse to a list """ if values: values = values.replace('"', "").replace("'", "") if not values: return [] return [x.strip() for x in values.split(" ")] def set_env_and_output(name, value): """helper function to echo a key/value pair to the environement file Parameters: name (str) : the name of the environment variable value (str) : the value to write to file """ for env_var in ("GITHUB_ENV", "GITHUB_OUTPUT"): environment_file_path = os.environ.get(env_var) if not environment_file_path: print(f"Warning: {env_var} is unset, skipping.") continue print("Writing %s=%s to %s" % (name, value, env_var)) with open(environment_file_path, "a") as environment_file: environment_file.write("%s=%s\n" % (name, value)) def open_pull_request(title, body, target, source, is_draft=False, can_modify=True): """Open pull request opens a pull request with a given body and content, and sets output variables. An unparsed response is returned. Parameters: title (str) : the title to set for the new pull request body (str) : the body to set for the new pull request target (str) : the target branch source (str) : the source branch is_draft (bool) : indicate the pull request is a draft can_modify (bool) : indicate the maintainer can modify """ print("No pull request from %s to %s is open, continuing!" % (source, target)) # Post the pull request data = { "title": title, "body": body, "base": target, "head": source, "draft": is_draft, "maintainer_can_modify": can_modify, } print("Data for opening pull request: %s" % data) response = requests.post(PULLS_URL, json=data, headers=HEADERS) if response.status_code != 201: print(f"pull request url is {PULLS_URL}") abort_if_fail(response, "Unable to create pull request") return response def update_pull_request(entry, title, body, target, state=None): """Given an existing pull request, update it. Parameters: entry (dict) : the pull request metadata title (str) : the title to set for the new pull request body (str) : the body to set for the new pull request target (str) : the target branch state (bool) : the state of the PR (open, closed) """ print("PULL_REQUEST_UPDATE is set, updating existing pull request.") data = { "title": title, "body": body, "base": target, "state": state or "open", } # PATCH /repos/{owner}/{repo}/pulls/{pull_number} url = "%s/%s" % (PULLS_URL, entry.get("number")) print("Data for updating pull request: %s" % data) response = requests.patch(url, json=data, headers=HEADERS) if response.status_code != 200: abort_if_fail(response, "Unable to create pull request") return response def set_pull_request_groups(response): """Given a response for an open or updated PR, set metadata Parameters: response (requests.Response) : a requests response, unparsed """ # Expected return codes are 0 for success pull_request_return_code = ( 0 if response.status_code == 201 else response.status_code ) response = response.json() print("::group::github response") print(response) print("::endgroup::github response") number = response.get("number") html_url = response.get("html_url") print("Number opened for PR is %s" % number) set_env_and_output("PULL_REQUEST_NUMBER", number) set_env_and_output("PULL_REQUEST_RETURN_CODE", pull_request_return_code) set_env_and_output("PULL_REQUEST_URL", html_url) def list_pull_requests(target, source): """Given a target and source, return a list of pull requests that match (or simply exit given some kind of error code) Parameters: target (str) : the target branch source (str) : the source branch """ # Check if the branch already has a pull request open params = {"base": target, "head": source, "state": "open"} print("Params for checking if pull request exists: %s" % params) response = requests.get(PULLS_URL, params=params) # Case 1: 401, 404 might warrant needing a token if response.status_code in [401, 404]: response = requests.get(PULLS_URL, params=params, headers=HEADERS) if response.status_code != 200: abort_if_fail(response, "Unable to retrieve information about pull requests") return response.json() def add_assignees(entry, assignees): """Given a pull request metadata (from create or update) add assignees Parameters: entry (dict) : the pull request metadata assignees (str) : comma separated assignees string set by action """ # Remove leading and trailing quotes assignees = parse_into_list(assignees) number = entry.get("number") print( "Attempting to assign %s to pull request with number %s" % (assignees, number) ) # POST /repos/:owner/:repo/issues/:issue_number/assignees data = {"assignees": assignees} ASSIGNEES_URL = "%s/%s/assignees" % (ISSUE_URL, number) response = requests.post(ASSIGNEES_URL, json=data, headers=HEADERS) if response.status_code != 201: abort_if_fail(response, "Unable to create assignees") assignees_return_code = 0 if response.status_code == 201 else response.status_code print("::group::github assignees response") print(response.json()) print("::endgroup::github assignees response") set_env_and_output("ASSIGNEES_RETURN_CODE", assignees_return_code) def find_pull_request(listing, source): """Given a listing and a source, find a pull request based on the source (the branch name). Parameters: listing (list) : the list of PR objects (dict) to parse over source (str) : the source (head) branch to look for """ if listing: for entry in listing: if entry.get("head", {}).get("ref", "") == source: print("Pull request from %s is already open!" % source) return entry def find_default_branch(): """Find default branch for a repo (only called if branch not provided)""" response = requests.get(REPO_URL) # Case 1: 401, 404 might need a token if response.status_code in [401, 404]: response = requests.get(REPO_URL, headers=HEADERS) if response.status_code != 200: abort_if_fail(response, "Unable to retrieve default branch") default_branch = response.json()["default_branch"] print("Found default branch: %s" % default_branch) return default_branch def add_reviewers(entry, reviewers, team_reviewers): """Given regular or team reviewers, add them to a PR. Parameters: entry (dict) : the pull request metadata """ print("Found reviewers: %s and team reviewers: %s" % (reviewers, team_reviewers)) team_reviewers = parse_into_list(team_reviewers) reviewers = parse_into_list(reviewers) print("Parsed reviewers: %s and team reviewers: %s" % (reviewers, team_reviewers)) # POST /repos/:owner/:repo/pulls/:pull_number/requested_reviewers REVIEWERS_URL = "%s/%s/requested_reviewers" % (PULLS_URL, entry.get("number")) data = {"reviewers": reviewers, "team_reviewers": team_reviewers} response = requests.post(REVIEWERS_URL, json=data, headers=HEADERS) if response.status_code != 201: abort_if_fail(response, "Unable to assign reviewers") reviewers_return_code = 0 if response.status_code == 201 else response.status_code print("::group::github reviewers response") print(response.json()) print("::endgroup::github reviewers response") set_env_and_output("REVIEWERS_RETURN_CODE", reviewers_return_code) ################################################################################ # Global Variables (we can't use GITHUB_ prefix) ################################################################################ API_VERSION = "v3" # Allow for a GitHub enterprise URL BASE = os.environ.get("GITHUB_API_URL") or "https://api.github.com" PR_TOKEN = os.environ.get("PULL_REQUEST_TOKEN") or get_envar("GITHUB_TOKEN") PR_REPO = os.environ.get("PULL_REQUEST_REPOSITORY") or get_envar("GITHUB_REPOSITORY") HEADERS = { "Authorization": "token %s" % PR_TOKEN, "Accept": "application/vnd.github.%s+json;application/vnd.github.antiope-preview+json;application/vnd.github.shadow-cat-preview+json" % API_VERSION, } # URLs REPO_URL = "%s/repos/%s" % (BASE, PR_REPO) ISSUE_URL = "%s/issues" % REPO_URL PULLS_URL = "%s/pulls" % REPO_URL def create_pull_request( source, target, body, title, assignees, reviewers, team_reviewers, is_draft=False, can_modify=True, state="open", ): """Create pull request is the base function that determines if the PR exists, and then updates or creates it depending on user preferences. """ listing = list_pull_requests(target, source) # Determine if the pull request is already open entry = find_pull_request(listing, source) response = None # Case 1: we found the PR, the user wants to pass if entry and os.environ.get("PASS_IF_EXISTS"): print("PASS_IF_EXISTS is set, exiting with success status.") sys.exit(0) # Does the user want to update the existing PR? if entry and os.environ.get("PULL_REQUEST_UPDATE"): response = update_pull_request(entry, title, body, target, state) set_pull_request_groups(response) # If it's not open, we open a new pull request elif not entry: response = open_pull_request(title, body, target, source, is_draft, can_modify) set_pull_request_groups(response) # If we have a response, parse into json (no longer need retvals) response = response.json() if response else None # If we have opened or updated, we can add assignees if response and assignees: add_assignees(response, assignees) if response and (reviewers or team_reviewers): add_reviewers(response, reviewers, team_reviewers) def main(): """main primarily parses environment variables to prepare for creation""" # path to file that contains the POST response of the event # Example: https://github.com/actions/bin/tree/master/debug # Value: /github/workflow/event.json check_events_json() branch_prefix = os.environ.get("BRANCH_PREFIX", "") print("Branch prefix is %s" % branch_prefix) if not branch_prefix: print("No branch prefix is set, all branches will be used.") # Default to project default branch if none provided pull_request_branch = os.environ.get("PULL_REQUEST_BRANCH") if not pull_request_branch: pull_request_branch = find_default_branch() print("Pull requests will go to %s" % pull_request_branch) # Pull request draft pull_request_draft = os.environ.get("PULL_REQUEST_DRAFT") if not pull_request_draft: print("No explicit preference for draft PR: created PRs will be normal PRs.") pull_request_draft = False else: print("PULL_REQUEST_DRAFT set to a value: created PRs will be draft PRs.") pull_request_draft = True # If an update is true, we can change the state pull_request_state = os.environ.get("PULL_REQUEST_STATE", "open") if pull_request_state not in ["open", "closed"]: sys.exit("State is required to be one of 'open' or 'closed'") # Maintainer can modify, defaults to CAN, unless user sets MAINTAINER_CANT_MODIFY maintainer_can_modify = os.environ.get("MAINTAINER_CANT_MODIFY") if not maintainer_can_modify: print("No preference for maintainer being able to modify: default is true.") maintainer_can_modify = True else: print( "MAINTAINER_CANT_MODIFY set to a value: maintainer will not be able to modify." ) maintainer_can_modify = False # Assignees assignees = os.environ.get("PULL_REQUEST_ASSIGNEES") if not assignees: print("PULL_REQUEST_ASSIGNEES is not set, no assignees.") else: print("PULL_REQUEST_ASSIGNEES is set, %s" % assignees) # Reviewers (individual and team) reviewers = os.environ.get("PULL_REQUEST_REVIEWERS") team_reviewers = os.environ.get("PULL_REQUEST_TEAM_REVIEWERS") if not reviewers: print("PULL_REQUEST_REVIEWERS is not set, no reviewers.") else: print("PULL_REQUEST_REVIEWERS is set, %s" % reviewers) if not team_reviewers: print("PULL_REQUEST_TEAM_REVIEWERS is not set, no team reviewers.") else: print("PULL_REQUEST_TEAM_REVIEWERS is set, %s" % team_reviewers) # The user is allowed to explicitly set the name of the branch from_branch = os.environ.get("PULL_REQUEST_FROM_BRANCH") if not from_branch: print("PULL_REQUEST_FROM_BRANCH is not set, checking branch in payload.") with open(check_events_json(), "r") as fd: from_branch = json.loads(fd.read()).get("ref", "") from_branch = from_branch.replace("refs/heads/", "").strip("/") else: print("PULL_REQUEST_FROM_BRANCH is set.") # At this point, we must have a branch if from_branch: print("Found branch %s to open PR from" % from_branch) else: sys.exit( "You are required to define PULL_REQUEST_FROM_BRANCH in the environment." ) # If it's to the target branch, ignore it if from_branch == pull_request_branch: print("Target and current branch are identical (%s), skipping." % from_branch) sys.exit(0) # If the prefix for the branch matches if not branch_prefix or from_branch.startswith(branch_prefix): # Pull request body (optional) pull_request_body = os.environ.get( "PULL_REQUEST_BODY", "This is an automated pull request to update from branch %s" % from_branch, ) print("::group::pull request body") print(pull_request_body) print("::endgroup::pull request body") # Pull request title (optional) pull_request_title = os.environ.get( "PULL_REQUEST_TITLE", "Update from %s" % from_branch ) print("::group::pull request title") print(pull_request_title) print("::endgroup::pull request title") # Create the pull request create_pull_request( target=pull_request_branch, source=from_branch, body=pull_request_body, title=pull_request_title, is_draft=pull_request_draft, can_modify=maintainer_can_modify, assignees=assignees, reviewers=reviewers, team_reviewers=team_reviewers, state=pull_request_state, ) if __name__ == "__main__": print("==========================================================================") print("START: Running Pull Request on Branch Update Action!") main() print("==========================================================================") print("END: Finished")