Source code for ecs_files_composer.aws_mgmt

#  -*- coding: utf-8 -*-
# SPDX-License-Identifier: MPL-2.0
# Copyright 2020-2021 John Mille<john@compose-x.io>

"""AWS module."""

import re

import boto3
from boto3 import session
from botocore.exceptions import ClientError

from ecs_files_composer import input
from ecs_files_composer.common import LOG
from ecs_files_composer.envsubst import expandvars


def create_session_from_creds(tmp_creds, region=None):
    """
    Build a boto3 session out of the reply of an STS AssumeRole call.

    :param dict tmp_creds: AssumeRole response. Its ``Credentials`` entry must
        provide ``AccessKeyId``, ``SecretAccessKey`` and ``SessionToken``.
    :param str region: Optional region name for the new session. Only passed
        to the session when truthy.
    :return: Session backed by the temporary credentials
    :rtype: boto3.session.Session
    :raises KeyError: If ``tmp_creds`` misses ``Credentials`` or one of its
        three expected keys.
    """
    temporary = tmp_creds["Credentials"]
    session_kwargs = dict(
        aws_access_key_id=temporary["AccessKeyId"],
        aws_secret_access_key=temporary["SecretAccessKey"],
        aws_session_token=temporary["SessionToken"],
    )
    # Only forward the region when one was given so boto3 keeps resolving its
    # own default otherwise.
    if region:
        session_kwargs.update(region_name=region)
    return boto3.session.Session(**session_kwargs)
def set_session_from_iam_object(iam_config_object, source_session=None):
    """
    Define the client session based on the IAM override given in the config input.

    When the override carries neither an access key ID nor a secret access key,
    the role described by the override is assumed through STS using
    ``source_session``. Otherwise a session is built directly from the static
    credentials of the override.

    :param ecs_files_composer.input.IamOverrideDef iam_config_object: IAM override
        settings (role_arn, session_name, external_id, region_name,
        access_key_id, secret_access_key, session_token).
    :param boto3.session.Session source_session: Session used to call STS.
        A default session is created when not provided.
    :return: boto session
    :rtype: boto3.session.Session
    """
    if source_session is None:
        source_session = boto3.session.Session()

    if not iam_config_object.access_key_id and not iam_config_object.secret_access_key:
        params = {
            "RoleArn": iam_config_object.role_arn,
            "RoleSessionName": f"{iam_config_object.session_name}@AwsResourceHandlerInit",
        }
        if iam_config_object.external_id:
            params["ExternalId"] = iam_config_object.external_id
        # Deliberately log only the role and session name: the external ID is
        # treated as a shared secret and must not end up in stdout or logs.
        LOG.debug(
            "Assuming role %s with session name %s",
            params["RoleArn"],
            params["RoleSessionName"],
        )
        tmp_creds = source_session.client("sts").assume_role(**params)
        return create_session_from_creds(
            tmp_creds, region=iam_config_object.region_name
        )

    session_params = {
        "aws_access_key_id": iam_config_object.access_key_id,
        "aws_secret_access_key": iam_config_object.secret_access_key,
        "aws_session_token": iam_config_object.session_token or None,
    }
    # Honour the region override with static credentials too, as the
    # assume-role path above already does.
    region_name = getattr(iam_config_object, "region_name", None)
    if region_name:
        session_params["region_name"] = region_name
    return boto3.session.Session(**session_params)
class AwsResourceHandler(object):
    """
    Class to handle all AWS related credentials init.

    ``session`` is always a default boto3 session. ``client_session`` is the
    session the fetchers build their clients from, resolved in this order:

    1. ``client_session_override`` when given.
    2. A session from assuming ``role_arn`` when no ``iam_config_object`` is given.
    3. A session derived from ``iam_config_object`` when it defines a ``role_arn``.
    4. Otherwise a default boto3 session.
    """

    def __init__(
        self,
        role_arn=None,
        external_id=None,
        region=None,
        iam_config_object=None,
        client_session_override=None,
    ):
        """
        :param str role_arn: Role to assume when no ``iam_config_object`` is given.
        :param str external_id: External ID passed to AssumeRole with ``role_arn``.
        :param str region: Region for the session created after assuming ``role_arn``.
        :param ecs_files_composer.input.IamOverrideDef iam_config_object:
            IAM override; only used when it defines a ``role_arn``.
        :param boto3.session.Session client_session_override: Session to use
            as-is, bypassing any role assumption.
        """
        self.session = session.Session()
        self.client_session = session.Session()

        if client_session_override:
            self.client_session = client_session_override
        elif role_arn and not iam_config_object:
            params = {
                "RoleArn": role_arn,
                "RoleSessionName": "EcsConfigComposer@AwsResourceHandlerInit",
            }
            if external_id:
                params["ExternalId"] = external_id
            tmp_creds = self.session.client("sts").assume_role(**params)
            self.client_session = create_session_from_creds(tmp_creds, region=region)
        elif iam_config_object and getattr(iam_config_object, "role_arn", None):
            # Never dump the whole override object: it can hold access keys,
            # session tokens and the external ID.
            LOG.debug(
                "Setting client session from IAM override for role %s",
                iam_config_object.role_arn,
            )
            self.client_session = set_session_from_iam_object(
                iam_config_object, self.session
            )
class S3Fetcher(AwsResourceHandler):
    """
    Class to handle S3 actions
    """

    # Splits s3://<bucket>/<key> into its bucket and key parts.
    bucket_re = re.compile(r"(?:s3://)(?P<bucket>[a-z0-9-.]+)/(?P<key>[\S]+$)")

    def __init__(
        self,
        role_arn=None,
        external_id=None,
        region=None,
        iam_config_object=None,
        client_session_override=None,
    ):
        """
        Resolve the client session (see AwsResourceHandler) and create the S3 client.

        :param str role_arn:
        :param str external_id:
        :param str region:
        :param ecs_files_composer.input.IamOverrideDef iam_config_object:
        :param boto3.session.Session client_session_override:
        """
        super().__init__(
            role_arn, external_id, region, iam_config_object, client_session_override
        )
        self.client = self.client_session.client("s3")

    def get_content(self, s3_uri=None, s3_bucket=None, s3_key=None):
        """
        Retrieve an S3 object and return its body.

        When ``s3_uri`` is given it takes precedence over ``s3_bucket`` and
        ``s3_key``, which are then derived from the URI.

        :param str s3_uri: Object location in the form ``s3://<bucket>/<key>``
        :param str s3_bucket: Bucket name, used when no ``s3_uri`` is given
        :param str s3_key: Object key, used when no ``s3_uri`` is given
        :return: The Stream Body for the file, allowing to do various things
        :raises ValueError: If ``s3_uri`` is given but is not a valid S3 URI
        """
        if s3_uri:
            # Match once and fail explicitly instead of hitting an
            # AttributeError on None when the URI is malformed.
            uri_parts = self.bucket_re.match(s3_uri)
            if uri_parts is None:
                raise ValueError(
                    f"{s3_uri} is not a valid S3 URI. Expected s3://<bucket>/<key>"
                )
            s3_bucket = uri_parts.group("bucket")
            s3_key = uri_parts.group("key")
        try:
            file_r = self.client.get_object(Bucket=s3_bucket, Key=s3_key)
        except self.client.exceptions.NoSuchKey:
            LOG.error(f"Failed to download the file {s3_key} from bucket {s3_bucket}")
            raise
        return file_r["Body"]
class SsmFetcher(AwsResourceHandler):
    """
    Class to handle SSM actions
    """

    # Extracts the parameter name (with its leading slash) from an SSM parameter ARN.
    arn_re = re.compile(
        r"(?:^arn:aws(?:-[a-z]+)?:ssm:[\S]+:[0-9]+:parameter)(?P<name>/[\S]+)$"
    )

    def __init__(
        self,
        role_arn=None,
        external_id=None,
        region=None,
        iam_config_object=None,
        client_session_override=None,
    ):
        """
        Resolve the client session (see AwsResourceHandler) and create the SSM client.

        :param str role_arn:
        :param str external_id:
        :param str region:
        :param ecs_files_composer.input.IamOverrideDef iam_config_object:
        :param boto3.session.Session client_session_override:
        """
        super().__init__(
            role_arn, external_id, region, iam_config_object, client_session_override
        )
        self.client = self.client_session.client("ssm")

    def get_content(self, parameter_name):
        """
        Import the content of a given parameter, decrypted.

        If the parameter name is a valid ARN, parses and uses the name from ARN

        :param str parameter_name: Parameter name or SSM parameter ARN
        :return: The parameter value
        :raises botocore.exceptions.ClientError: If the SSM call fails
        """
        arn_parts = self.arn_re.match(parameter_name)
        if arn_parts:
            parameter_name = arn_parts.group("name")
        try:
            parameter = self.client.get_parameter(
                Name=parameter_name, WithDecryption=True
            )
        except ClientError as error:
            # ``self.client.exceptions`` is a container of exception classes,
            # not an exception class itself: using it in an ``except`` clause
            # raises TypeError and hides the real failure. Every modeled
            # service error derives from ClientError, so catch that instead.
            LOG.error(f"Failed to retrieve SSM parameter {parameter_name}: {error}")
            raise
        return parameter["Parameter"]["Value"]
class SecretFetcher(AwsResourceHandler):
    """
    Class to handle Secret Manager actions
    """

    def __init__(
        self,
        role_arn=None,
        external_id=None,
        region=None,
        iam_config_object=None,
        client_session_override=None,
    ):
        """
        Resolve the client session (see AwsResourceHandler) and create the
        Secrets Manager client.

        :param str role_arn:
        :param str external_id:
        :param str region:
        :param ecs_files_composer.input.IamOverrideDef iam_config_object:
        :param boto3.session.Session client_session_override:
        """
        super().__init__(
            role_arn, external_id, region, iam_config_object, client_session_override
        )
        self.client = self.client_session.client("secretsmanager")

    def get_content(self, secret):
        """
        Import the content of a given secret.

        Environment variables in ``secret.secret_id`` are expanded before the
        lookup. ``version_id`` and ``version_stage`` are forwarded when set.

        :param input.SecretDef secret: Secret definition (secret_id,
            version_id, version_stage)
        :return: The ``SecretString`` of the secret
        :raises botocore.exceptions.ClientError: If the Secrets Manager call fails
        """
        secret_id = expandvars(secret.secret_id)
        params = {"SecretId": secret_id}
        LOG.debug(f"Retrieving secretsmanager://{secret_id}")
        if secret.version_id:
            params["VersionId"] = secret.version_id
        if secret.version_stage:
            params["VersionStage"] = secret.version_stage
        try:
            parameter = self.client.get_secret_value(**params)
        except ClientError as error:
            # ``self.client.exceptions`` is a container of exception classes,
            # not an exception class itself: using it in an ``except`` clause
            # raises TypeError and hides the real failure. Every modeled
            # service error derives from ClientError, so catch that instead.
            LOG.error(f"Failed to retrieve secretsmanager://{secret_id}: {error}")
            raise
        return parameter["SecretString"]