Source code for bitbucket_pipes_toolkit.core

import os
import glob
import json
from collections.abc import MutableSequence
import re
from urllib.parse import urlparse

import yaml
import requests
from cerberus import Validator
from requests.auth import HTTPBasicAuth, AuthBase
from yaml.scanner import ScannerError

from .helpers import fail, success, configure_logger


logger = configure_logger()


[docs]class ArrayVariable(MutableSequence): """ Example: >>> env = { ... 'EXTRA_ARGS_COUNT': 3, ... 'EXTRA_ARGS_0': '--description', ... 'EXTRA_ARGS_1': 'text containing spaces', ... 'EXTRA_ARGS_2': '--verbose'} >>> >>> array_variable = ArrayVariable('EXTRA_ARGS', env) >>> array_variable[2] '--verbose' """ def __init__(self, name, env): self._name = name self._count = int(env.get(f'{name}_COUNT', '0')) self._variables = [env.get(f'{name}_{i}') for i in range(self._count)] def __getitem__(self, i): return self._variables[i] def __setitem__(self, index, value): self._variables[index] = value def __delitem__(self, index): del self._variables[index]
[docs] def insert(self, index, value): self._variables.insert(index, value)
def __len__(self): return len(self._variables) @classmethod def from_list(cls, name, lst): array_variable = cls(name, {}) for var in lst: array_variable.append(var) return array_variable
[docs] def decompile(self): """ Example >>> var = ArrayVariable.from_list('TEST', ['foo','bar', 'baz']) >>> var.decompile() {'TEST_0': 'foo', 'TEST_1': 'bar', 'TEST_2': 'baz', 'TEST_COUNT': 3} """ dct = {f"{self._name}_{i}": value for i, value in enumerate(self._variables)} dct[f"{self._name}_COUNT"] = len(self) return dct
class SharedData: """This is an interface to the shared pipes data directory Example: >>> shared_data = SharedData('/tmp') >>> shared_data.get_variable('foo') is None True >>> shared_data.set_variable('foo', 'bar') >>> shared_data.get_variable('foo') 'bar' >>> shared_data.purge() Example storing complex data >>> shared_data = SharedData('/tmp') >>> shared_data.set_variable('my_data', {'my_variable': 'my_value'}) >>> shared_data.get_variable('my_data')['my_variable'] 'my_value' >>> shared_data.purge() """ def __init__(self, path): self._path = path self._shared_json_file = os.path.join(self._path, '_shared_variables.json') def set_variable(self, key, value): """Set the shared variable Args: key (str): Variable key value: The value of the variable """ try: with open(self._shared_json_file, 'r+') as storage: data = json.loads(storage.read()) data[key] = value storage.seek(0) storage.truncate() json.dump(data, storage) except FileNotFoundError: with open(self._shared_json_file, 'w+') as storage: data = {key: value} json.dump(data, storage) def get_variable(self, key): """Get the value of some variable Args: key (str): Variable key Returns: The value of the variable. """ try: with open(self._shared_json_file, 'r') as storage: data = json.load(storage) return data.get(key) except FileNotFoundError: return None def purge(self): """ Purge all shared data """ files = glob.glob(os.path.join(self._path, '_shared_*')) for f in files: os.remove(f) class TokenAuth(AuthBase): """Attaches auth token to the given Request object.""" def __init__(self, token): self.token = token def __call__(self, r): # modify and return the request r.headers['Authorization'] = 'Bearer ' + self.token return r
[docs]class Pipe: """Base class for all pipes. Provides utilities to work with configuration, validation etc. Attributes: variables (dict): Dictionary containing the pipes variables. schema (dict): Dictionary with the pipe parameters schema in the cerberus format. env (dict): Dict-like object containing pipe parameters. This is usually the environment variables that you get from os.environ Args: pipe_metadata (dict): Dictionary containing the pipe metadata pipe_metadata_file (str): Path to .yml file with metadata schema (dict): Schema for pipe variables. Pipe variables validation Pip variables validation is done at the time of initializing a pipe. The environment variables are validated against the schema provided. See https://docs.python-cerberus.org/en/stable/ for more details on how to specify schemas. """ def __init__(self, pipe_metadata=None, pipe_metadata_file=None, schema=None, env=None, logger=logger, check_for_newer_version=False): if pipe_metadata is not None and pipe_metadata_file is not None: self.fail(message="Passing both pipe_metadata and pipe_metadata_file is not allowed. " "Please use only one of them.") if pipe_metadata_file is not None: self.metadata = self.parse_yaml_file(filepath=pipe_metadata_file) elif pipe_metadata is not None: self.metadata = pipe_metadata else: self.metadata = {} if env is None: self.env = os.environ.copy() else: self.env = env self.variables = None self.schema = schema self.logger = logger # validate pipe parameters self.variables = self.validate() self.enable_debug_log_level() if check_for_newer_version: self.check_for_newer_version()
[docs] def fail(self, message, print_community_link=False): """Fail the pipe and exit. Args: message (str): Error message to show. print_community_link (bool): print or not. """ if print_community_link: community_link = self.get_community_link() community_link_message = f"If you have an issue with the pipe, let us know on Community:{community_link}" message = f"{message}\n{community_link_message}" fail(message=message)
[docs] @staticmethod def success(message, do_exit=False): """Show a success message. Args: message (str): Message to print do_exit (bool): Call sys.exit or not """ success(message, do_exit=do_exit)
[docs] def log_info(self, message): """Log an info message >>> pipe = Pipe(schema={}) >>> pipe.log_info('hello') """ return self.logger.info(message)
[docs] def log_error(self, message): """Log an error message >>> pipe = Pipe(schema={}) >>> pipe.log_error('hello') """ return self.logger.error(message)
[docs] def log_debug(self, message): """Log a debug message >>> pipe = Pipe(schema={}) >>> pipe.log_debug('hello') """ return self.logger.debug(message)
[docs] def log_warning(self, message): """Log a warning message >>> pipe = Pipe(schema={}) >>> pipe.log_warning('hello') """ return self.logger.warning(message)
[docs] def enable_debug_log_level(self): # pragma: no cover """Enable the DEBUG log level.""" if self.get_variable('DEBUG'): self.logger.setLevel('DEBUG')
def parse_yaml_file(self, filepath): try: with open(filepath) as yaml_file: content = yaml.safe_load(yaml_file) except FileNotFoundError: message = f'File {filepath} not found. Please give correct path to file.' self.fail(message=message) except yaml.YAMLError as exc: message = f"Failed to parse {filepath} file: {str(exc)}" self.fail(message=message) else: return content
[docs] def check_for_newer_version(self): """Check if a newer version is available and show a warning message >>> metadata = {'image': 'bitbucketpipelines/aws-ecs-deploy:0.0.3', ... 'repository': 'https://bitbucket.org/atlassian/aws-ecs-deploy'} >>> pipe = Pipe(pipe_metadata=metadata, schema={}) >>> pipe.check_for_newer_version() True """ image_string = self.metadata.get('image') if image_string is None: return False pattern = r':((?:[0-9]+\.?){3})(?:@.*$|$)' match = next(re.finditer(pattern, image_string), None) if match is None: self.log_warning(f"Could not parse current version for pipe's image {image_string}") return False current_version = match.group(1) official_pipes_repo = 'https://bitbucket.org/bitbucketpipelines/official-pipes' response = requests.get(f"{official_pipes_repo}/raw/master/pipes.prod.json") if not response.ok: self.log_warning(f'Failed to get the latest pipe version info. Error: {response.text}') return False repo_url = self.metadata.get('repository') repository_path = urlparse(repo_url).path.strip('/') released_pipe_info = next((pipe for pipe in response.json() if pipe['repositoryPath'] == repository_path), {}) latest_version = released_pipe_info.get('version') if not latest_version: self.log_warning(f'Could not find released pipe version for {repository_path}. ' f'Data matched: {released_pipe_info}') return False pipe_name = self.get_pipe_name() if tuple(map(int, latest_version.split('.'))) > tuple(map(int, current_version.split('.'))): self.log_warning(f"New version available: {pipe_name} '{current_version}' to '{latest_version}'") return True
[docs] def get_pipe_name(self): """Retrive a pipe's name from pipe's repository url. >>> metadata = {'image': 'bitbucketpipelines/aws-ecs-deploy:0.0.3', ... 'repository': 'https://bitbucket.org/atlassian/aws-ecs-deploy'} >>> pipe = Pipe(pipe_metadata=metadata, schema={}) >>> pipe.get_pipe_name() 'atlassian/aws-ecs-deploy' """ return urlparse(self.metadata.get('repository')).path[1:]
[docs] def validate(self): """Validates the environment variables against a provided schema. Variable schema is a dictionary in a cerberus format. See https://docs.python-cerberus.org/en/stable/ for more details about this library and validation rules. """ if self.schema is None: schema = self.metadata['variables'] else: schema = self.schema validator = Validator( schema=schema, purge_unknown=True) env = {} for key, value in self.env.items(): if key in schema: # NOTE: we accept string as this if schema[key].get('type', '') == 'string': env[key] = value continue try: env[key] = yaml.safe_load(value) except ScannerError: # keep a string value env[key] = value for key, key_schema in schema.items(): if key_schema.get('type') == 'list': env[key] = ArrayVariable(key, self.env) if not validator.validate(env): self.fail( message=f'Validation errors: \n{yaml.dump(validator.errors, default_flow_style = False)}') validated = validator.validated(env) return validated
[docs] def get_variable(self, name): """Retrieve a pipe variable. Args: name (str): The name of a variable. Returns: The value of the variable. """ return self.variables.get(name)
[docs] def resolve_auth(self): """Resolve authorization. Currently supported: - BITBUCKET_USERNAME and BITBUCKET_APP_PASSWORD or - BITBUCKET_ACCESS_TOKEN Returns <HTTPBasicAuth object> or <TokenAuth object> Raises Authentication missing ... SystemExit: 1 >>> pipe = Pipe(schema={}) >>> pipe.resolve_auth() Traceback (most recent call last): SystemExit: 1 >>> pipe = Pipe(schema={}) >>> pipe.variables = {'BITBUCKET_USERNAME': 'test', 'BITBUCKET_APP_PASSWORD': 'test-pass'} >>> pipe.resolve_auth() # doctest: +ELLIPSIS <...HTTPBasicAuth object at 0x...> >>> pipe = Pipe(schema={}) >>> pipe.variables = {'BITBUCKET_ACCESS_TOKEN': 'test-token'} >>> pipe.resolve_auth() # doctest: +ELLIPSIS <...TokenAuth object at 0x...> """ username = self.get_variable('BITBUCKET_USERNAME') password = self.get_variable('BITBUCKET_APP_PASSWORD') token = self.get_variable('BITBUCKET_ACCESS_TOKEN') if username and password: return HTTPBasicAuth(username, password) elif token: return TokenAuth(token) else: self.fail('Authentication missing. You must provide an access token or a username and app password.')
[docs] def run(self): """Run the pipe. The main entry point for a pipe execution. This will do all the setup steps, like enabling debug mode if configure etc. """ self.enable_debug_log_level()