collection 교체

This commit is contained in:
정훈 변
2024-02-23 16:37:40 +09:00
parent b494779b5b
commit 3fd554eee9
38862 changed files with 220204 additions and 6600073 deletions

View File

@@ -1,8 +1,9 @@
# -*- coding: utf-8 -*-
# Copyright: (c) 2018, Scott Buchanan <sbuchanan@ri.pn>
# Copyright: (c) 2016, Andrew Zenk <azenk@umn.edu> (lastpass.py used as starting point)
# Copyright: (c) 2018, Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
# Copyright (c) 2018, Scott Buchanan <scott@buchanan.works>
# Copyright (c) 2016, Andrew Zenk <azenk@umn.edu> (lastpass.py used as starting point)
# Copyright (c) 2018, Ansible Project
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
@@ -13,48 +14,28 @@ DOCUMENTATION = '''
- Scott Buchanan (@scottsb)
- Andrew Zenk (@azenk)
- Sam Doran (@samdoran)
requirements:
- C(op) 1Password command line utility. See U(https://support.1password.com/command-line/)
short_description: fetch field values from 1Password
short_description: Fetch field values from 1Password
description:
- C(onepassword) wraps the C(op) command line utility to fetch specific field values from 1Password.
- P(community.general.onepassword#lookup) wraps the C(op) command line utility to fetch specific field values from 1Password.
requirements:
- C(op) 1Password command line utility
options:
_terms:
description: identifier(s) (UUID, name, or subdomain; case-insensitive) of item(s) to retrieve.
required: True
field:
description: field to return from each matching item (case-insensitive).
default: 'password'
master_password:
description: The password used to unlock the specified vault.
aliases: ['vault_password']
section:
description: Item section containing the field to retrieve (case-insensitive). If absent will return first match from any section.
description: Identifier(s) (case-insensitive UUID or name) of item(s) to retrieve.
required: true
account_id:
version_added: 7.5.0
domain:
description: Domain of 1Password. Default is U(1password.com).
version_added: 3.2.0
default: '1password.com'
field:
description: Field to return from each matching item (case-insensitive).
default: 'password'
type: str
subdomain:
description: The 1Password subdomain to authenticate against.
username:
description: The username used to sign in.
secret_key:
description: The secret key used when performing an initial sign in.
vault:
description: Vault containing the item to retrieve (case-insensitive). If absent will search all vaults.
notes:
- This lookup will use an existing 1Password session if one exists. If not, and you have already
performed an initial sign in (meaning C(~/.op/config exists)), then only the C(master_password) is required.
You may optionally specify C(subdomain) in this scenario, otherwise the last used subdomain will be used by C(op).
- This lookup can perform an initial login by providing C(subdomain), C(username), C(secret_key), and C(master_password).
- Due to the B(very) sensitive nature of these credentials, it is B(highly) recommended that you only pass in the minimal credentials
needed at any given time. Also, store these credentials in an Ansible Vault using a key that is equal to or greater in strength
to the 1Password master password.
- This lookup stores potentially sensitive data from 1Password as Ansible facts.
Facts are subject to caching if enabled, which means this data could be stored in clear text
on disk or in a database.
- Tested with C(op) version 0.5.3
service_account_token:
version_added: 7.1.0
extends_documentation_fragment:
- community.general.onepassword
- community.general.onepassword.lookup
'''
EXAMPLES = """
@@ -73,126 +54,174 @@ EXAMPLES = """
- name: Retrieve password for HAL when not signed in to 1Password
ansible.builtin.debug:
var: lookup('community.general.onepassword'
'HAL 9000'
subdomain='Discovery'
var: lookup('community.general.onepassword',
'HAL 9000',
subdomain='Discovery',
master_password=vault_master_password)
- name: Retrieve password for HAL when never signed in to 1Password
ansible.builtin.debug:
var: lookup('community.general.onepassword'
'HAL 9000'
subdomain='Discovery'
master_password=vault_master_password
username='tweety@acme.com'
var: lookup('community.general.onepassword',
'HAL 9000',
subdomain='Discovery',
master_password=vault_master_password,
username='tweety@acme.com',
secret_key=vault_secret_key)
- name: Retrieve password from specific account
ansible.builtin.debug:
var: lookup('community.general.onepassword',
'HAL 9000',
account_id='abc123')
"""
RETURN = """
_raw:
description: field data requested
description: Field data requested.
type: list
elements: str
"""
import errno
import json
import abc
import os
from subprocess import Popen, PIPE
import json
import subprocess
from ansible.plugins.lookup import LookupBase
from ansible.errors import AnsibleLookupError
from ansible.errors import AnsibleLookupError, AnsibleOptionsError
from ansible.module_utils.common.process import get_bin_path
from ansible.module_utils.common.text.converters import to_bytes, to_text
from ansible.module_utils.six import with_metaclass
from ansible_collections.community.general.plugins.module_utils.onepassword import OnePasswordConfig
class OnePass(object):
def _lower_if_possible(value):
"""Return the lower case version value, otherwise return the value"""
try:
return value.lower()
except AttributeError:
return value
def __init__(self, path='op'):
self.cli_path = path
self.config_file_path = os.path.expanduser('~/.op/config')
self.logged_in = False
self.token = None
self.subdomain = None
self.domain = None
self.username = None
self.secret_key = None
self.master_password = None
def get_token(self):
# If the config file exists, assume an initial signin has taken place and try basic sign in
if os.path.isfile(self.config_file_path):
class OnePassCLIBase(with_metaclass(abc.ABCMeta, object)):
bin = "op"
if not self.master_password:
raise AnsibleLookupError('Unable to sign in to 1Password. master_password is required.')
def __init__(
self,
subdomain=None,
domain="1password.com",
username=None,
secret_key=None,
master_password=None,
service_account_token=None,
account_id=None,
connect_host=None,
connect_token=None,
):
self.subdomain = subdomain
self.domain = domain
self.username = username
self.master_password = master_password
self.secret_key = secret_key
self.service_account_token = service_account_token
self.account_id = account_id
self.connect_host = connect_host
self.connect_token = connect_token
try:
args = ['signin', '--output=raw']
self._path = None
self._version = None
if self.subdomain:
args = ['signin', self.subdomain, '--output=raw']
def _check_required_params(self, required_params):
non_empty_attrs = dict((param, getattr(self, param, None)) for param in required_params if getattr(self, param, None))
missing = set(required_params).difference(non_empty_attrs)
if missing:
prefix = "Unable to sign in to 1Password. Missing required parameter"
plural = ""
suffix = ": {params}.".format(params=", ".join(missing))
if len(missing) > 1:
plural = "s"
rc, out, err = self._run(args, command_input=to_bytes(self.master_password))
self.token = out.strip()
msg = "{prefix}{plural}{suffix}".format(prefix=prefix, plural=plural, suffix=suffix)
raise AnsibleLookupError(msg)
except AnsibleLookupError:
self.full_login()
@abc.abstractmethod
def _parse_field(self, data_json, field_name, section_title):
"""Main method for parsing data returned from the op command line tool"""
else:
# Attempt a full sign in since there appears to be no existing sign in
self.full_login()
def _run(self, args, expected_rc=0, command_input=None, ignore_errors=False, environment_update=None):
command = [self.path] + args
call_kwargs = {
"stdout": subprocess.PIPE,
"stderr": subprocess.PIPE,
"stdin": subprocess.PIPE,
}
def assert_logged_in(self):
try:
rc, out, err = self._run(['get', 'account'], ignore_errors=True)
if rc == 0:
self.logged_in = True
if not self.logged_in:
self.get_token()
except OSError as e:
if e.errno == errno.ENOENT:
raise AnsibleLookupError("1Password CLI tool '%s' not installed in path on control machine" % self.cli_path)
raise e
if environment_update:
env = os.environ.copy()
env.update(environment_update)
call_kwargs["env"] = env
def get_raw(self, item_id, vault=None):
args = ["get", "item", item_id]
if vault is not None:
args += ['--vault={0}'.format(vault)]
if not self.logged_in:
args += [to_bytes('--session=') + self.token]
rc, output, dummy = self._run(args)
return output
def get_field(self, item_id, field, section=None, vault=None):
output = self.get_raw(item_id, vault)
return self._parse_field(output, field, section) if output != '' else ''
def full_login(self):
if None in [self.subdomain, self.username, self.secret_key, self.master_password]:
raise AnsibleLookupError('Unable to perform initial sign in to 1Password. '
'subdomain, username, secret_key, and master_password are required to perform initial sign in.')
args = [
'signin',
'{0}.{1}'.format(self.subdomain, self.domain),
to_bytes(self.username),
to_bytes(self.secret_key),
'--output=raw',
]
rc, out, err = self._run(args, command_input=to_bytes(self.master_password))
self.token = out.strip()
def _run(self, args, expected_rc=0, command_input=None, ignore_errors=False):
command = [self.cli_path] + args
p = Popen(command, stdout=PIPE, stderr=PIPE, stdin=PIPE)
p = subprocess.Popen(command, **call_kwargs)
out, err = p.communicate(input=command_input)
rc = p.wait()
if not ignore_errors and rc != expected_rc:
raise AnsibleLookupError(to_text(err))
return rc, out, err
def _parse_field(self, data_json, field_name, section_title=None):
@abc.abstractmethod
def assert_logged_in(self):
"""Check whether a login session exists"""
@abc.abstractmethod
def full_signin(self):
"""Performa full login"""
@abc.abstractmethod
def get_raw(self, item_id, vault=None, token=None):
"""Gets the specified item from the vault"""
@abc.abstractmethod
def signin(self):
"""Sign in using the master password"""
@property
def path(self):
if self._path is None:
self._path = get_bin_path(self.bin)
return self._path
@property
def version(self):
if self._version is None:
self._version = self.get_current_version()
return self._version
@classmethod
def get_current_version(cls):
"""Standalone method to get the op CLI version. Useful when determining which class to load
based on the current version."""
try:
bin_path = get_bin_path(cls.bin)
except ValueError:
raise AnsibleLookupError("Unable to locate '%s' command line tool" % cls.bin)
try:
b_out = subprocess.check_output([bin_path, "--version"], stderr=subprocess.PIPE)
except subprocess.CalledProcessError as cpe:
raise AnsibleLookupError("Unable to get the op version: %s" % cpe)
return to_text(b_out).strip()
class OnePassCLIv1(OnePassCLIBase):
supports_version = "1"
def _parse_field(self, data_json, field_name, section_title):
"""
Retrieves the desired field from the `op` response payload
@@ -246,39 +275,435 @@ class OnePass(object):
# check the details dictionary for `field_name` and return it immediately if it exists
# when the entry is a "password" instead of a "login" item, the password field is a key
# in the `details` dictionary:
if field_name in data['details']:
return data['details'][field_name]
if field_name in data["details"]:
return data["details"][field_name]
# when the field is not found above, iterate through the fields list in the object details
for field_data in data['details'].get('fields', []):
if field_data.get('name', '').lower() == field_name.lower():
return field_data.get('value', '')
for section_data in data['details'].get('sections', []):
if section_title is not None and section_title.lower() != section_data['title'].lower():
for field_data in data["details"].get("fields", []):
if field_data.get("name", "").lower() == field_name.lower():
return field_data.get("value", "")
for section_data in data["details"].get("sections", []):
if section_title is not None and section_title.lower() != section_data["title"].lower():
continue
for field_data in section_data.get('fields', []):
if field_data.get('t', '').lower() == field_name.lower():
return field_data.get('v', '')
return ''
for field_data in section_data.get("fields", []):
if field_data.get("t", "").lower() == field_name.lower():
return field_data.get("v", "")
return ""
def assert_logged_in(self):
args = ["get", "account"]
if self.account_id:
args.extend(["--account", self.account_id])
elif self.subdomain:
account = "{subdomain}.{domain}".format(subdomain=self.subdomain, domain=self.domain)
args.extend(["--account", account])
rc, out, err = self._run(args, ignore_errors=True)
return not bool(rc)
def full_signin(self):
if self.connect_host or self.connect_token:
raise AnsibleLookupError(
"1Password Connect is not available with 1Password CLI version 1. Please use version 2 or later.")
if self.service_account_token:
raise AnsibleLookupError(
"1Password CLI version 1 does not support Service Accounts. Please use version 2 or later.")
required_params = [
"subdomain",
"username",
"secret_key",
"master_password",
]
self._check_required_params(required_params)
args = [
"signin",
"{0}.{1}".format(self.subdomain, self.domain),
to_bytes(self.username),
to_bytes(self.secret_key),
"--raw",
]
return self._run(args, command_input=to_bytes(self.master_password))
def get_raw(self, item_id, vault=None, token=None):
args = ["get", "item", item_id]
if self.account_id:
args.extend(["--account", self.account_id])
if vault is not None:
args += ["--vault={0}".format(vault)]
if token is not None:
args += [to_bytes("--session=") + token]
return self._run(args)
def signin(self):
self._check_required_params(['master_password'])
args = ["signin", "--raw"]
if self.subdomain:
args.append(self.subdomain)
return self._run(args, command_input=to_bytes(self.master_password))
class OnePassCLIv2(OnePassCLIBase):
"""
CLIv2 Syntax Reference: https://developer.1password.com/docs/cli/upgrade#step-2-update-your-scripts
"""
supports_version = "2"
def _parse_field(self, data_json, field_name, section_title=None):
"""
Schema reference: https://developer.1password.com/docs/cli/item-template-json
Example Data:
# Password item
{
"id": "ywvdbojsguzgrgnokmcxtydgdv",
"title": "Authy Backup",
"version": 1,
"vault": {
"id": "bcqxysvcnejjrwzoqrwzcqjqxc",
"name": "Personal"
},
"category": "PASSWORD",
"last_edited_by": "7FUPZ8ZNE02KSHMAIMKHIVUE17",
"created_at": "2015-01-18T13:13:38Z",
"updated_at": "2016-02-20T16:23:54Z",
"additional_information": "Jan 18, 2015, 08:13:38",
"fields": [
{
"id": "password",
"type": "CONCEALED",
"purpose": "PASSWORD",
"label": "password",
"value": "OctoberPoppyNuttyDraperySabbath",
"reference": "op://Personal/Authy Backup/password",
"password_details": {
"strength": "FANTASTIC"
}
},
{
"id": "notesPlain",
"type": "STRING",
"purpose": "NOTES",
"label": "notesPlain",
"value": "Backup password to restore Authy",
"reference": "op://Personal/Authy Backup/notesPlain"
}
]
}
# Login item
{
"id": "awk4s2u44fhnrgppszcsvc663i",
"title": "Dummy Login",
"version": 2,
"vault": {
"id": "stpebbaccrq72xulgouxsk4p7y",
"name": "Personal"
},
"category": "LOGIN",
"last_edited_by": "LSGPJERUYBH7BFPHMZ2KKGL6AU",
"created_at": "2018-04-25T21:55:19Z",
"updated_at": "2018-04-25T21:56:06Z",
"additional_information": "agent.smith",
"urls": [
{
"primary": true,
"href": "https://acme.com"
}
],
"sections": [
{
"id": "linked items",
"label": "Related Items"
}
],
"fields": [
{
"id": "username",
"type": "STRING",
"purpose": "USERNAME",
"label": "username",
"value": "agent.smith",
"reference": "op://Personal/Dummy Login/username"
},
{
"id": "password",
"type": "CONCEALED",
"purpose": "PASSWORD",
"label": "password",
"value": "Q7vFwTJcqwxKmTU]Dzx7NW*wrNPXmj",
"entropy": 159.6083697084228,
"reference": "op://Personal/Dummy Login/password",
"password_details": {
"entropy": 159,
"generated": true,
"strength": "FANTASTIC"
}
},
{
"id": "notesPlain",
"type": "STRING",
"purpose": "NOTES",
"label": "notesPlain",
"reference": "op://Personal/Dummy Login/notesPlain"
}
]
}
"""
data = json.loads(data_json)
field_name = _lower_if_possible(field_name)
for field in data.get("fields", []):
if section_title is None:
# If the field name exists in the section, return that value
if field.get(field_name):
return field.get(field_name)
# If the field name doesn't exist in the section, match on the value of "label"
# then "id" and return "value"
if field.get("label", "").lower() == field_name:
return field.get("value", "")
if field.get("id", "").lower() == field_name:
return field.get("value", "")
# Look at the section data and get an identifier. The value of 'id' is either a unique ID
# or a human-readable string. If a 'label' field exists, prefer that since
# it is the value visible in the 1Password UI when both 'id' and 'label' exist.
section = field.get("section", {})
section_title = _lower_if_possible(section_title)
current_section_title = section.get("label", section.get("id", "")).lower()
if section_title == current_section_title:
# In the correct section. Check "label" then "id" for the desired field_name
if field.get("label") == field_name:
return field.get("value", "")
if field.get("id") == field_name:
return field.get("value", "")
return ""
def assert_logged_in(self):
if self.connect_host and self.connect_token:
return True
if self.service_account_token:
args = ["whoami"]
environment_update = {"OP_SERVICE_ACCOUNT_TOKEN": self.service_account_token}
rc, out, err = self._run(args, environment_update=environment_update)
return not bool(rc)
args = ["account", "list"]
if self.subdomain:
account = "{subdomain}.{domain}".format(subdomain=self.subdomain, domain=self.domain)
args.extend(["--account", account])
rc, out, err = self._run(args)
if out:
# Running 'op account get' if there are no accounts configured on the system drops into
# an interactive prompt. Only run 'op account get' after first listing accounts to see
# if there are any previously configured accounts.
args = ["account", "get"]
if self.account_id:
args.extend(["--account", self.account_id])
elif self.subdomain:
account = "{subdomain}.{domain}".format(subdomain=self.subdomain, domain=self.domain)
args.extend(["--account", account])
rc, out, err = self._run(args, ignore_errors=True)
return not bool(rc)
return False
def full_signin(self):
required_params = [
"subdomain",
"username",
"secret_key",
"master_password",
]
self._check_required_params(required_params)
args = [
"account", "add", "--raw",
"--address", "{0}.{1}".format(self.subdomain, self.domain),
"--email", to_bytes(self.username),
"--signin",
]
environment_update = {"OP_SECRET_KEY": self.secret_key}
return self._run(args, command_input=to_bytes(self.master_password), environment_update=environment_update)
def get_raw(self, item_id, vault=None, token=None):
args = ["item", "get", item_id, "--format", "json"]
if self.account_id:
args.extend(["--account", self.account_id])
if vault is not None:
args += ["--vault={0}".format(vault)]
if self.connect_host and self.connect_token:
if vault is None:
raise AnsibleLookupError("'vault' is required with 1Password Connect")
environment_update = {
"OP_CONNECT_HOST": self.connect_host,
"OP_CONNECT_TOKEN": self.connect_token,
}
return self._run(args, environment_update=environment_update)
if self.service_account_token:
if vault is None:
raise AnsibleLookupError("'vault' is required with 'service_account_token'")
environment_update = {"OP_SERVICE_ACCOUNT_TOKEN": self.service_account_token}
return self._run(args, environment_update=environment_update)
if token is not None:
args += [to_bytes("--session=") + token]
return self._run(args)
def signin(self):
self._check_required_params(['master_password'])
args = ["signin", "--raw"]
if self.subdomain:
args.extend(["--account", self.subdomain])
return self._run(args, command_input=to_bytes(self.master_password))
class OnePass(object):
def __init__(self, subdomain=None, domain="1password.com", username=None, secret_key=None, master_password=None,
service_account_token=None, account_id=None, connect_host=None, connect_token=None, cli_class=None):
self.subdomain = subdomain
self.domain = domain
self.username = username
self.secret_key = secret_key
self.master_password = master_password
self.service_account_token = service_account_token
self.account_id = account_id
self.connect_host = connect_host
self.connect_token = connect_token
self.logged_in = False
self.token = None
self._config = OnePasswordConfig()
self._cli = self._get_cli_class(cli_class)
if (self.connect_host or self.connect_token) and None in (self.connect_host, self.connect_token):
raise AnsibleOptionsError("connect_host and connect_token are required together")
def _get_cli_class(self, cli_class=None):
if cli_class is not None:
return cli_class(self.subdomain, self.domain, self.username, self.secret_key, self.master_password, self.service_account_token)
version = OnePassCLIBase.get_current_version()
for cls in OnePassCLIBase.__subclasses__():
if cls.supports_version == version.split(".")[0]:
try:
return cls(self.subdomain, self.domain, self.username, self.secret_key, self.master_password, self.service_account_token,
self.account_id, self.connect_host, self.connect_token)
except TypeError as e:
raise AnsibleLookupError(e)
raise AnsibleLookupError("op version %s is unsupported" % version)
def set_token(self):
if self._config.config_file_path and os.path.isfile(self._config.config_file_path):
# If the config file exists, assume an initial sign in has taken place and try basic sign in
try:
rc, out, err = self._cli.signin()
except AnsibleLookupError as exc:
test_strings = (
"missing required parameters",
"unauthorized",
)
if any(string in exc.message.lower() for string in test_strings):
# A required parameter is missing, or a bad master password was supplied
# so don't bother attempting a full signin
raise
rc, out, err = self._cli.full_signin()
self.token = out.strip()
else:
# Attempt a full signin since there appears to be no existing signin
rc, out, err = self._cli.full_signin()
self.token = out.strip()
def assert_logged_in(self):
logged_in = self._cli.assert_logged_in()
if logged_in:
self.logged_in = logged_in
pass
else:
self.set_token()
def get_raw(self, item_id, vault=None):
rc, out, err = self._cli.get_raw(item_id, vault, self.token)
return out
def get_field(self, item_id, field, section=None, vault=None):
output = self.get_raw(item_id, vault)
if output:
return self._cli._parse_field(output, field, section)
return ""
class LookupModule(LookupBase):
def run(self, terms, variables=None, **kwargs):
op = OnePass()
self.set_options(var_options=variables, direct=kwargs)
field = kwargs.get('field', 'password')
section = kwargs.get('section')
vault = kwargs.get('vault')
op.subdomain = kwargs.get('subdomain')
op.domain = kwargs.get('domain', '1password.com')
op.username = kwargs.get('username')
op.secret_key = kwargs.get('secret_key')
op.master_password = kwargs.get('master_password', kwargs.get('vault_password'))
field = self.get_option("field")
section = self.get_option("section")
vault = self.get_option("vault")
subdomain = self.get_option("subdomain")
domain = self.get_option("domain")
username = self.get_option("username")
secret_key = self.get_option("secret_key")
master_password = self.get_option("master_password")
service_account_token = self.get_option("service_account_token")
account_id = self.get_option("account_id")
connect_host = self.get_option("connect_host")
connect_token = self.get_option("connect_token")
op = OnePass(
subdomain=subdomain,
domain=domain,
username=username,
secret_key=secret_key,
master_password=master_password,
service_account_token=service_account_token,
account_id=account_id,
connect_host=connect_host,
connect_token=connect_token,
)
op.assert_logged_in()
values = []
for term in terms:
values.append(op.get_field(term, field, section, vault))
return values