291 lines
9.4 KiB
Python
291 lines
9.4 KiB
Python
# Copyright (c), Edoardo Tenani <e.tenani@arduino.cc>, 2018-2020
|
|
# Simplified BSD License (see licenses/simplified_bsd.txt or https://opensource.org/licenses/BSD-2-Clause)
|
|
|
|
from __future__ import absolute_import, division, print_function
|
|
__metaclass__ = type
|
|
|
|
|
|
import abc
|
|
import os
|
|
|
|
from ansible.module_utils import six
|
|
from ansible.module_utils.common.text.converters import to_text, to_native
|
|
|
|
# Since this is used both by plugins and modules, we need subprocess in case the `module` parameter is not used
|
|
from subprocess import Popen, PIPE
|
|
|
|
|
|
# From https://github.com/mozilla/sops/blob/master/cmd/sops/codes/codes.go
|
|
# Should be manually updated
|
|
SOPS_ERROR_CODES = {
|
|
1: "ErrorGeneric",
|
|
2: "CouldNotReadInputFile",
|
|
3: "CouldNotWriteOutputFile",
|
|
4: "ErrorDumpingTree",
|
|
5: "ErrorReadingConfig",
|
|
6: "ErrorInvalidKMSEncryptionContextFormat",
|
|
7: "ErrorInvalidSetFormat",
|
|
8: "ErrorConflictingParameters",
|
|
21: "ErrorEncryptingMac",
|
|
23: "ErrorEncryptingTree",
|
|
24: "ErrorDecryptingMac",
|
|
25: "ErrorDecryptingTree",
|
|
49: "CannotChangeKeysFromNonExistentFile",
|
|
51: "MacMismatch",
|
|
52: "MacNotFound",
|
|
61: "ConfigFileNotFound",
|
|
85: "KeyboardInterrupt",
|
|
91: "InvalidTreePathFormat",
|
|
100: "NoFileSpecified",
|
|
128: "CouldNotRetrieveKey",
|
|
111: "NoEncryptionKeyFound",
|
|
200: "FileHasNotBeenModified",
|
|
201: "NoEditorFound",
|
|
202: "FailedToCompareVersions",
|
|
203: "FileAlreadyEncrypted"
|
|
}
|
|
|
|
|
|
def _create_single_arg(argument_name):
|
|
def f(value, arguments, env):
|
|
arguments.extend([argument_name, to_native(value)])
|
|
|
|
return f
|
|
|
|
|
|
def _create_comma_separated(argument_name):
|
|
def f(value, arguments, env):
|
|
arguments.extend([argument_name, ','.join([to_native(v) for v in value])])
|
|
|
|
return f
|
|
|
|
|
|
def _create_repeated(argument_name):
|
|
def f(value, arguments, env):
|
|
for v in value:
|
|
arguments.extend([argument_name, to_native(v)])
|
|
|
|
return f
|
|
|
|
|
|
def _create_boolean(argument_name):
|
|
def f(value, arguments, env):
|
|
if value:
|
|
arguments.append(argument_name)
|
|
|
|
return f
|
|
|
|
|
|
def _create_env_variable(argument_name):
|
|
def f(value, arguments, env):
|
|
env[argument_name] = value
|
|
|
|
return f
|
|
|
|
|
|
GENERAL_OPTIONS = {
|
|
'aws_profile': _create_single_arg('--aws-profile'),
|
|
'aws_access_key_id': _create_env_variable('AWS_ACCESS_KEY_ID'),
|
|
'aws_secret_access_key': _create_env_variable('AWS_SECRET_ACCESS_KEY'),
|
|
'aws_session_token': _create_env_variable('AWS_SESSION_TOKEN'),
|
|
'config_path': _create_single_arg('--config'),
|
|
'enable_local_keyservice': _create_boolean('--enable-local-keyservice'),
|
|
'keyservice': _create_repeated('--keyservice'),
|
|
}
|
|
|
|
|
|
ENCRYPT_OPTIONS = {
|
|
'kms': _create_comma_separated('--kms'),
|
|
'gcp_kms': _create_comma_separated('--gcp-kms'),
|
|
'azure_kv': _create_comma_separated('--azure-kv'),
|
|
'hc_vault_transit': _create_comma_separated('--hc-vault-transit'),
|
|
'pgp': _create_comma_separated('--pgp'),
|
|
'unencrypted_suffix': _create_single_arg('--unencrypted-suffix'),
|
|
'encrypted_suffix': _create_single_arg('--encrypted-suffix'),
|
|
'unencrypted_regex': _create_single_arg('--unencrypted-regex'),
|
|
'encrypted_regex': _create_single_arg('--encrypted-regex'),
|
|
'encryption_context': _create_comma_separated('--encryption-context'),
|
|
'shamir_secret_sharing_threshold': _create_single_arg('--shamir-secret-sharing-threshold'),
|
|
}
|
|
|
|
|
|
class SopsError(Exception):
|
|
''' Extend Exception class with sops specific informations '''
|
|
|
|
def __init__(self, filename, exit_code, message, decryption=True):
|
|
if exit_code in SOPS_ERROR_CODES:
|
|
exception_name = SOPS_ERROR_CODES[exit_code]
|
|
message = "error with file %s: %s exited with code %d: %s" % (
|
|
filename, exception_name, exit_code, to_native(message))
|
|
else:
|
|
message = "could not %s file %s; Unknown sops error code: %s; message: %s" % (
|
|
'decrypt' if decryption else 'encrypt', filename, exit_code, to_native(message))
|
|
super(SopsError, self).__init__(message)
|
|
|
|
|
|
class Sops():
|
|
''' Utility class to perform sops CLI actions '''
|
|
|
|
@staticmethod
|
|
def _add_options(command, env, get_option_value, options):
|
|
if get_option_value is None:
|
|
return
|
|
for option, f in options.items():
|
|
v = get_option_value(option)
|
|
if v is not None:
|
|
f(v, command, env)
|
|
|
|
@staticmethod
|
|
def get_sops_binary(get_option_value):
|
|
cmd = get_option_value('sops_binary') if get_option_value else None
|
|
if cmd is None:
|
|
cmd = 'sops'
|
|
return cmd
|
|
|
|
@staticmethod
|
|
def decrypt(encrypted_file, content=None,
|
|
display=None, decode_output=True, rstrip=True, input_type=None, output_type=None, get_option_value=None, module=None):
|
|
# Run sops directly, python module is deprecated
|
|
command = [Sops.get_sops_binary(get_option_value)]
|
|
env = os.environ.copy()
|
|
Sops._add_options(command, env, get_option_value, GENERAL_OPTIONS)
|
|
if input_type is not None:
|
|
command.extend(["--input-type", input_type])
|
|
if output_type is not None:
|
|
command.extend(["--output-type", output_type])
|
|
if content is not None:
|
|
encrypted_file = '/dev/stdin'
|
|
command.extend(["--decrypt", encrypted_file])
|
|
|
|
if module:
|
|
exit_code, output, err = module.run_command(command, environ_update=env, encoding=None, data=content, binary_data=True)
|
|
else:
|
|
process = Popen(command, stdin=None if content is None else PIPE, stdout=PIPE, stderr=PIPE, env=env)
|
|
(output, err) = process.communicate(input=content)
|
|
exit_code = process.returncode
|
|
|
|
if decode_output:
|
|
# output is binary, we want UTF-8 string
|
|
output = to_text(output, errors='surrogate_or_strict')
|
|
# the process output is the decrypted secret; be cautious
|
|
|
|
# sops logs always to stderr, as stdout is used for
|
|
# file content
|
|
if err and display:
|
|
display.vvvv(to_text(err, errors='surrogate_or_strict'))
|
|
|
|
if exit_code != 0:
|
|
raise SopsError(encrypted_file, exit_code, err, decryption=True)
|
|
|
|
if rstrip:
|
|
output = output.rstrip()
|
|
|
|
return output
|
|
|
|
@staticmethod
|
|
def encrypt(data, display=None, cwd=None, input_type=None, output_type=None, get_option_value=None, module=None):
|
|
# Run sops directly, python module is deprecated
|
|
command = [Sops.get_sops_binary(get_option_value)]
|
|
env = os.environ.copy()
|
|
Sops._add_options(command, env, get_option_value, GENERAL_OPTIONS)
|
|
Sops._add_options(command, env, get_option_value, ENCRYPT_OPTIONS)
|
|
if input_type is not None:
|
|
command.extend(["--input-type", input_type])
|
|
if output_type is not None:
|
|
command.extend(["--output-type", output_type])
|
|
command.extend(["--encrypt", "/dev/stdin"])
|
|
|
|
if module:
|
|
exit_code, output, err = module.run_command(command, data=data, binary_data=True, cwd=cwd, environ_update=env, encoding=None)
|
|
else:
|
|
process = Popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE, cwd=cwd, env=env)
|
|
(output, err) = process.communicate(input=data)
|
|
exit_code = process.returncode
|
|
|
|
# sops logs always to stderr, as stdout is used for
|
|
# file content
|
|
if err and display:
|
|
display.vvvv(to_text(err, errors='surrogate_or_strict'))
|
|
|
|
if exit_code != 0:
|
|
raise SopsError('to stdout', exit_code, err, decryption=False)
|
|
|
|
return output
|
|
|
|
|
|
def get_sops_argument_spec(add_encrypt_specific=False):
|
|
argument_spec = {
|
|
'sops_binary': {
|
|
'type': 'path',
|
|
},
|
|
'aws_profile': {
|
|
'type': 'str',
|
|
},
|
|
'aws_access_key_id': {
|
|
'type': 'str',
|
|
},
|
|
'aws_secret_access_key': {
|
|
'type': 'str',
|
|
'no_log': True,
|
|
},
|
|
'aws_session_token': {
|
|
'type': 'str',
|
|
'no_log': True,
|
|
},
|
|
'config_path': {
|
|
'type': 'path',
|
|
},
|
|
'enable_local_keyservice': {
|
|
'type': 'bool',
|
|
'default': False,
|
|
},
|
|
'keyservice': {
|
|
'type': 'list',
|
|
'elements': 'str',
|
|
},
|
|
}
|
|
if add_encrypt_specific:
|
|
argument_spec.update({
|
|
'kms': {
|
|
'type': 'list',
|
|
'elements': 'str',
|
|
},
|
|
'gcp_kms': {
|
|
'type': 'list',
|
|
'elements': 'str',
|
|
},
|
|
'azure_kv': {
|
|
'type': 'list',
|
|
'elements': 'str',
|
|
},
|
|
'hc_vault_transit': {
|
|
'type': 'list',
|
|
'elements': 'str',
|
|
},
|
|
'pgp': {
|
|
'type': 'list',
|
|
'elements': 'str',
|
|
},
|
|
'unencrypted_suffix': {
|
|
'type': 'str',
|
|
},
|
|
'encrypted_suffix': {
|
|
'type': 'str',
|
|
},
|
|
'unencrypted_regex': {
|
|
'type': 'str',
|
|
},
|
|
'encrypted_regex': {
|
|
'type': 'str',
|
|
},
|
|
'encryption_context': {
|
|
'type': 'list',
|
|
'elements': 'str',
|
|
},
|
|
'shamir_secret_sharing_threshold': {
|
|
'type': 'int',
|
|
'no_log': False,
|
|
},
|
|
})
|
|
return argument_spec
|