# Source code for bitbucket_pipes_toolkit.core

import glob
import json
import os
import re
from collections.abc import MutableSequence
from urllib.parse import urlparse

import requests
import yaml
from cerberus import Validator

from .helpers import fail, success, configure_logger

logger = configure_logger()

[docs]class ArrayVariable(MutableSequence): """ Example: >>> env = { ... 'EXTRA_ARGS_COUNT': 3, ... 'EXTRA_ARGS_0': '--description', ... 'EXTRA_ARGS_1': 'text containing spaces', ... 'EXTRA_ARGS_2': '--verbose'} >>> >>> array_variable = ArrayVariable('EXTRA_ARGS', env) >>> array_variable[2] '--verbose' """ def __init__(self, name, env): self._name = name self._count = int(env.get(f'{name}_COUNT', '0')) self._variables = [env.get(f'{name}_{i}') for i in range(self._count)] def __getitem__(self, i): return self._variables[i] def __setitem__(self, index, value): self._variables[index] = value def __delitem__(self, index): del self._variables[index]
[docs] def insert(self, index, value): self._variables.insert(index, value)
def __len__(self): return len(self._variables) @classmethod def from_list(cls, name, lst): array_variable = cls(name, {}) for var in lst: array_variable.append(var) return array_variable
[docs] def decompile(self): """ Example >>> var = ArrayVariable.from_list('TEST', ['foo','bar', 'baz']) >>> var.decompile() {'TEST_0': 'foo', 'TEST_1': 'bar', 'TEST_2': 'baz', 'TEST_COUNT': 3} """ dct = {f"{self._name}_{i}": value for i, value in enumerate(self._variables)} dct[f"{self._name}_COUNT"] = len(self) return dct
class SharedData: """This is an interface to the shared pipes data directory Example: >>> shared_data = SharedData('/tmp') >>> shared_data.get_variable('foo') is None True >>> shared_data.set_variable('foo', 'bar') >>> shared_data.get_variable('foo') 'bar' >>> shared_data.purge() Example storing complex data >>> shared_data = SharedData('/tmp') >>> shared_data.set_variable('my_data', {'my_variable': 'my_value'}) >>> shared_data.get_variable('my_data')['my_variable'] 'my_value' >>> shared_data.purge() """ def __init__(self, path): self._path = path self._shared_json_file = os.path.join(self._path, '_shared_variables.json') def set_variable(self, key, value): """Set the shared variable Args: key (str): Variable key value: The value of the variable """ try: with open(self._shared_json_file, 'r+') as storage: data = json.loads( data[key] = value storage.truncate() json.dump(data, storage) except FileNotFoundError: with open(self._shared_json_file, 'w+') as storage: data = {key: value} json.dump(data, storage) def get_variable(self, key): """Get the value of some variable Args: key (str): Variable key Returns: The value of the variable. """ try: with open(self._shared_json_file, 'r') as storage: data = json.load(storage) return data.get(key) except FileNotFoundError: return None def purge(self): """ Purge all shared data """ files = glob.glob(os.path.join(self._path, '_shared_*')) for f in files: os.remove(f)
[docs]class Pipe: """Base class for all pipes. Provides utilities to work with configuration, validation etc. Attributes: variables (dict): Dictionary containing the pipes variables. schema (dict): Dictionary with the pipe parameters schema in the cerberus format. env (dict): Dict-like object containing pipe parameters. This is usually the environment variables that you get from os.environ Args: pipe_metadata (dict): Dictionary containing the pipe metadata pipe_metadata_file (str): Path to .yml file with metadata schema (dict): Schema for pipe variables. Pipe variables validation Pip variables validation is done at the time of initializing a pipe. The environment variables are validated against the schema provided. See for more details on how to specify schemas. """
[docs] def fail(self, message, print_community_link=False): """Fail the pipe and exit. Args: message (str): Error message to show. print_community_link (bool): print or not. """ if print_community_link: community_link = self.get_community_link() community_link_message = f"If you have an issue with the pipe, let us know on Community:{community_link}" message = f"{message}\n{community_link_message}" fail(message=message)
[docs] @staticmethod def success(message, do_exit=False): """Show a success message. Args: message (str): Message to print do_exit (bool): Call sys.exit or not """ success(message, do_exit=do_exit)
[docs] def log_info(self, message): """Log an info message >>> pipe = Pipe(schema={}) >>> pipe.log_info('hello') """ return
[docs] def log_error(self, message): """Log an error message >>> pipe = Pipe(schema={}) >>> pipe.log_error('hello') """ return self.logger.error(message)
[docs] def log_debug(self, message): """Log a debug message >>> pipe = Pipe(schema={}) >>> pipe.log_debug('hello') """ return self.logger.debug(message)
[docs] def log_warning(self, message): """Log a warning message >>> pipe = Pipe(schema={}) >>> pipe.log_warning('hello') """ return self.logger.warning(message)
[docs] def enable_debug_log_level(self): # pragma: no cover """Enable the DEBUG log level.""" if self.get_variable('DEBUG'): self.logger.setLevel('DEBUG')
def parse_yaml_file(self, filepath): try: with open(filepath) as yaml_file: content = yaml.safe_load(yaml_file) except FileNotFoundError: message = f'File {filepath} not found. Please give correct path to file.' except yaml.YAMLError as exc: message = f"Failed to parse {filepath} file: {str(exc)}" else: return content def __init__(self, pipe_metadata=None, pipe_metadata_file=None, schema=None, env=None, logger=logger, check_for_newer_version=False): if pipe_metadata is not None and pipe_metadata_file is not None:"Passing both pipe_metadata and pipe_metadata_file is not allowed. " "Please use only one of them.") if pipe_metadata_file is not None: self.metadata = self.parse_yaml_file(filepath=pipe_metadata_file) elif pipe_metadata is not None: self.metadata = pipe_metadata else: self.metadata = {} if env is None: self.env = os.environ.copy() else: self.env = env self.variables = None self.schema = schema self.logger = logger # validate pipe parameters self.variables = self.validate() self.enable_debug_log_level() if check_for_newer_version: self.check_for_newer_version()
[docs] def check_for_newer_version(self): """Check if a newer version is available and show a warning message >>> metadata = {'image': 'bitbucketpipelines/aws-ecs-deploy:0.0.3', ... 'repository': ''} >>> pipe = Pipe(pipe_metadata=metadata, schema={}) >>> pipe.check_for_newer_version() True """ image_string = self.metadata.get('image') if image_string is None: return False pattern = r':((?:[0-9]+\.?){3})(?:@.*$|$)' match = next(re.finditer(pattern, image_string), None) if match is None: self.log_warning(f"Could not parse current version for pipe's image {image_string}") return False current_version = official_pipes_repo = '' response = requests.get(f"{official_pipes_repo}/raw/master/") if not response.ok: self.log_warning(f'Failed to get the latest pipe version info. Error: {response.text}') return False repo_url = self.metadata.get('repository') repository_path = urlparse(repo_url).path.strip('/') released_pipe_info = next((pipe for pipe in response.json() if pipe['repositoryPath'] == repository_path), {}) latest_version = released_pipe_info.get('version') if not latest_version: self.log_warning(f'Could not find released pipe version for {repository_path}. ' f'Data matched: {released_pipe_info}') return False pipe_name = self.get_pipe_name() if tuple(map(int, latest_version.split('.'))) > tuple(map(int, current_version.split('.'))): self.log_warning(f"New version available: {pipe_name} '{current_version}' to '{latest_version}'") return True
[docs] def get_pipe_name(self): """Retrive a pipe's name from pipe's repository url. >>> metadata = {'image': 'bitbucketpipelines/aws-ecs-deploy:0.0.3', ... 'repository': ''} >>> pipe = Pipe(pipe_metadata=metadata, schema={}) >>> pipe.get_pipe_name() 'atlassian/aws-ecs-deploy' """ return urlparse(self.metadata.get('repository')).path[1:]
[docs] def validate(self): """Validates the environment variables against a provided schema. Variable schema is a dictionary in a cerberus format. See for more details about this library and validation rules. """ if self.schema is None: schema = self.metadata['variables'] else: schema = self.schema validator = Validator( schema=schema, purge_unknown=True) env = {} for key, value in self.env.items(): if key in schema: # NOTE: we accept string as this if schema[key].get('type', '') == 'string': env[key] = value continue try: env[key] = yaml.safe_load(value) except yaml.scanner.ScannerError: # keep a string value env[key] = value for key, key_schema in schema.items(): if key_schema.get('type') == 'list': env[key] = ArrayVariable(key, self.env) if not validator.validate(env): message=f'Validation errors: \n{yaml.dump(validator.errors, default_flow_style = False)}') validated = validator.validated(env) return validated
[docs] def get_variable(self, name): """Retrieve a pipe variable. Args: name (str): The name of a variable. Returns: The value of the variable. """ return self.variables.get(name)
[docs] def run(self): """Run the pipe. The main entry point for a pipe execution. This will do all the setup steps, like enabling debug mode if configure etc. """ self.enable_debug_log_level()