# Source code for ecs_files_composer.ecs_files_composer
# SPDX-License-Identifier: MPL-2.0
# Copyright 2020-2022 John Mille<john@compose-x.io>
"""Main module."""
from __future__ import annotations
import json
import os
from dataclasses import asdict
from enum import Enum
from os import environ, path
from tempfile import TemporaryDirectory
from typing import TYPE_CHECKING, ByteString
import yaml
from dacite import Config, from_dict
from yaml import Loader
from ecs_files_composer import input
from ecs_files_composer.aws_mgmt import S3Fetcher
from ecs_files_composer.certificates_mgmt import process_x509_certs
from ecs_files_composer.common import LOG
from ecs_files_composer.files_mgmt import File
def init_config(
    raw=None,
    file_path=None,
    env_var=None,
    ssm_parameter=None,
    s3_config=None,
    secret_config=None,
    role_arn=None,
    external_id=None,
    decode_base64=False,
    context=None,
    override_folder: str = None,
    print_generated_config: bool = False,
):
    """Function to initialize the configuration as if it were a file itself

    The configuration is described as a single file job written to
    ``<folder>/init.conf``, executed through ``start_jobs``, then read back
    and parsed as YAML, falling back to JSON.

    Exactly one source is used, picked in this order: ``ssm_parameter``,
    ``s3_config``, ``secret_config``, ``file_path``, ``raw``, ``env_var``.

    :param raw: Configuration content passed as-is.
    :param file_path: Path to a local file whose content is the configuration.
    :param env_var: Name of the environment variable holding the content.
    :param ssm_parameter: Value used as the ``Ssm`` source ``ParameterName``.
    :param s3_config: S3 URI; must match ``S3Fetcher.bucket_re``, whose
        ``bucket`` and ``key`` groups become ``BucketName`` and ``Key``.
    :param secret_config: Value used as the ``Secret`` source ``SecretId``.
    :param role_arn: Set as ``RoleArn`` in the job ``IamOverride``. When
        ``ssm_parameter``, ``s3_config`` or ``secret_config`` is used, the
        ``CONFIG_IAM_ROLE_ARN`` environment variable takes precedence.
    :param external_id: Set as ``ExternalId`` in the job ``IamOverride``.
        Under the same condition, ``CONFIG_IAM_EXTERNAL_ID`` takes precedence.
    :param decode_base64: When true, the file job is flagged with
        ``encoding: base64``.
    :param context: When set, stored as the ``context`` of the file job.
    :param override_folder: Folder receiving ``init.conf``. When not set, a
        temporary directory is used and removed before returning.
    :param print_generated_config: When true, logs the generated file content
        and the job definition at INFO level.
    :return: The parsed configuration, or ``None`` when the generated file
        could not be read.
    :raises ValueError: When no source is provided or ``s3_config`` does not
        match ``S3Fetcher.bucket_re``.
    :raises json.JSONDecodeError: When the content is neither valid YAML nor
        valid JSON.
    """
    iam_override = {"SessionName": "FilesComposerInit"}
    if ssm_parameter or s3_config or secret_config:
        # Environment variables win over the values given as arguments.
        role_arn = environ.get("CONFIG_IAM_ROLE_ARN", role_arn)
        external_id = environ.get("CONFIG_IAM_EXTERNAL_ID", external_id)
    if role_arn:
        iam_override["RoleArn"] = role_arn
    if external_id:
        iam_override["ExternalId"] = external_id

    if ssm_parameter:
        initial_config = {"source": {"Ssm": {"ParameterName": ssm_parameter}}}
    elif s3_config:
        # Match once and reuse the result for validation and both groups.
        s3_match = S3Fetcher.bucket_re.match(s3_config)
        if not s3_match:
            raise ValueError(
                "The value for S3 URI is not valid.",
                s3_config,
                "Expected to match",
                S3Fetcher.bucket_re.pattern,
            )
        initial_config = {
            "source": {
                "S3": {
                    "BucketName": s3_match.group("bucket"),
                    "Key": s3_match.group("key"),
                }
            }
        }
    elif secret_config:
        initial_config = {"source": {"Secret": {"SecretId": secret_config}}}
    elif file_path:
        with open(path.abspath(file_path)) as file_fd:
            initial_config = {"content": file_fd.read()}
    elif raw:
        initial_config = {"content": raw}
    elif env_var:
        LOG.debug(f"Using env var {env_var}")
        initial_config = {"content": environ.get(env_var, None)}
    else:
        raise ValueError("No input source was provided")
    LOG.debug(initial_config)

    temp_dir = None
    if override_folder:
        config_path = f"{override_folder}/init.conf"
    else:
        temp_dir = TemporaryDirectory()
        config_path = f"{temp_dir.name}/init.conf"

    try:
        if decode_base64:
            initial_config["encoding"] = "base64"
        if context:
            initial_config["context"] = context
        jobs_input_def = {
            "files": {config_path: initial_config},
            "IamOverride": iam_override,
        }
        start_jobs(jobs_input_def)

        try:
            with open(config_path) as config_fd:
                file_content = config_fd.read()
        except OSError as error:
            LOG.exception(error)
            LOG.error(f"Failed to read input file from {config_path}")
            # Best effort: a read failure is logged, not raised.
            return None

        if print_generated_config:
            LOG.info(file_content)
        try:
            try:
                # NOTE(security): the full ``Loader`` can construct arbitrary
                # Python objects. If the configuration source may be
                # untrusted, consider ``yaml.SafeLoader`` instead.
                config = yaml.load(file_content, Loader=Loader)
                LOG.info(f"Successfully loaded YAML config {config_path}")
            except yaml.YAMLError:
                try:
                    config = json.loads(file_content)
                    LOG.info(f"Successfully loaded JSON config {config_path}")
                except json.JSONDecodeError:
                    LOG.error("Input content is not valid JSON")
                    raise
        finally:
            # Logged whether or not parsing succeeded.
            if print_generated_config:
                LOG.info(jobs_input_def)
        return config
    finally:
        # Remove the temporary directory deterministically, even on failure,
        # rather than relying on the object's finalizer.
        if temp_dir is not None:
            temp_dir.cleanup()
def process_files(job: input.Model, override_session=None) -> None:
    """Run the handler of every file declared in the job.

    Entries of ``job.files`` that are not already ``File`` instances are
    rebuilt as ``File`` objects from their dataclass fields, and their
    ``path`` is set to the mapping key. All entries are converted before any
    handler runs, so a conversion failure prevents every handler from running.

    :param job: Job definition providing ``files`` and ``IamOverride``.
    :param override_session: Passed through to each file handler as its
        second argument.
    """
    files: list = []
    for file_path, file in job.files.items():
        if isinstance(file, File):
            files.append(file)
            continue
        file_redef = from_dict(
            data_class=File, data=asdict(file), config=Config(cast=[Enum, bytes])
        )
        file_redef.path = file_path
        files.append(file_redef)
    for file in files:
        file.handler(job.IamOverride, override_session)
        LOG.info(f"Tasks for {file.path} completed.")
def start_jobs(config: dict, override_session=None):
    """Starting point to run the files job

    Builds an ``input.Model`` from ``config``, then processes the x509
    certificates and the files, in that order, for whichever of the two the
    job defines.

    :param config: Raw job definition converted into ``input.Model``.
    :param override_session: Passed through to ``process_files``.
    """
    job = from_dict(
        data_class=input.Model, data=config, config=Config(cast=[Enum, bytes])
    )
    if job.certificates:
        process_x509_certs(job)
    if job.files:
        process_files(job, override_session)