308 lines
12 KiB
Python
308 lines
12 KiB
Python
# -*- coding: utf-8 -*-
|
|
#
|
|
# Copyright (c) 2017-2020 Felix Fontein
|
|
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
|
|
|
|
from __future__ import (absolute_import, division, print_function)
|
|
__metaclass__ = type
|
|
|
|
|
|
from ansible.module_utils.common.text.converters import to_native
|
|
from ansible.module_utils.six import string_types
|
|
|
|
try:
|
|
import lxml.etree
|
|
HAS_LXML_ETREE = True
|
|
except ImportError:
|
|
HAS_LXML_ETREE = False
|
|
|
|
from ansible_collections.community.dns.plugins.module_utils.http import (
|
|
NetworkError,
|
|
)
|
|
|
|
|
|
class WSDLException(Exception):
    """Common base class for the exceptions defined in this module."""
|
|
|
|
|
|
class WSDLNetworkError(WSDLException):
    """WSDL exception used to report a failure of the underlying network request."""
|
|
|
|
|
|
class WSDLError(WSDLException):
    """Error reported for a WSDL call.

    The exception text has the form ``<origin> (<error_code>): <message>``.
    The three parts are also available separately as the attributes
    ``error_origin``, ``error_code`` and ``error_message``.
    """

    def __init__(self, origin, error_code, message):
        """Store the error details and build the combined exception text.

        :param origin: where the error originated
        :param error_code: code identifying the error
        :param message: human readable description
        """
        text = '{0} ({1}): {2}'.format(origin, error_code, message)
        super(WSDLError, self).__init__(text)
        self.error_origin, self.error_code, self.error_message = origin, error_code, message
|
|
|
|
|
|
class WSDLCodingException(WSDLException):
    """WSDL exception used when a value cannot be encoded or decoded."""
|
|
|
|
|
|
def _split_text_namespace(node, text):
|
|
i = text.find(':')
|
|
if i < 0:
|
|
return text, None
|
|
ns = node.nsmap.get(text[:i])
|
|
text = text[i + 1:]
|
|
return text, ns
|
|
|
|
|
|
_NAMESPACE_ENVELOPE = 'http://schemas.xmlsoap.org/soap/envelope/'
|
|
_NAMESPACE_XSI = 'http://www.w3.org/2001/XMLSchema-instance'
|
|
_NAMESPACE_XSD = 'http://www.w3.org/2001/XMLSchema'
|
|
_NAMESPACE_XML_SOAP = 'http://xml.apache.org/xml-soap'
|
|
_NAMESPACE_XML_SOAP_ENCODING = 'http://schemas.xmlsoap.org/soap/encoding/'
|
|
|
|
|
|
def _set_type(node, type_value, namespace=None):
    """Set the ``type`` attribute (XML Schema instance namespace) on ``node``.

    When ``namespace`` is given, ``type_value`` is wrapped in a ``QName`` for
    that namespace; otherwise it is stored as passed in.
    """
    attribute_name = lxml.etree.QName(_NAMESPACE_XSI, 'type').text
    if namespace is None:
        attribute_value = type_value
    else:
        attribute_value = lxml.etree.QName(namespace, type_value)
    node.set(attribute_name, attribute_value)
|
|
|
|
|
|
def encode_wsdl(node, value):
    """Encode a Python value into the XML element ``node``.

    Supported values:

    * ``None``: the ``nil`` attribute is set to ``true``;
    * strings: type ``xsd:string``;
    * booleans: type ``xsd:boolean`` with text ``true`` / ``false``;
    * integers: type ``xsd:int``;
    * dicts: type ``Map``, one ``item`` child with ``key`` and ``value``
      sub-elements per entry, in sorted order of the entries;
    * lists: type ``SOAP-ENC:Array``, one ``item`` child per element.

    Keys, values and list elements are encoded recursively.

    :param node: element to modify in place
    :param value: value to encode
    :raises WSDLCodingException: if the type of ``value`` is not supported
    """
    if value is None:
        node.set(lxml.etree.QName(_NAMESPACE_XSI, 'nil').text, 'true')
    elif isinstance(value, string_types):
        _set_type(node, 'xsd:string')
        node.text = value
    elif isinstance(value, bool):
        # bool is a subclass of int, so it has to be tested before int.
        # Otherwise True/False would be sent as xsd:int with text "True"/"False".
        _set_type(node, 'xsd:boolean')
        node.text = ('true' if value else 'false')
    elif isinstance(value, int):
        _set_type(node, 'xsd:int')
        node.text = str(value)
    elif isinstance(value, dict):
        _set_type(node, 'Map', _NAMESPACE_XML_SOAP)
        for key, val in sorted(value.items()):
            child = lxml.etree.Element('item')
            ke = lxml.etree.Element('key')
            encode_wsdl(ke, key)
            child.append(ke)
            ve = lxml.etree.Element('value')
            encode_wsdl(ve, val)
            child.append(ve)
            node.append(child)
    elif isinstance(value, list):
        _set_type(node, 'SOAP-ENC:Array')
        for elt in value:
            child = lxml.etree.Element('item')
            encode_wsdl(child, elt)
            node.append(child)
    else:
        raise WSDLCodingException('Do not know how to encode {0}!'.format(type(value)))
|
|
|
|
|
|
def _decode_wsdl_array(result, node, root_ns, ids):
    """Decode every ``item`` child of ``node`` and append it to ``result``.

    :param result: list that receives the decoded elements (modified in place)
    :param node: element whose children are decoded
    :param root_ns: passed through to ``decode_wsdl``
    :param ids: passed through to ``decode_wsdl``
    :raises WSDLCodingException: if a child is not tagged ``item``
    """
    for child in node:
        if child.tag == 'item':
            result.append(decode_wsdl(child, root_ns, ids))
            continue
        # NOTE(review): the message says "map" although this helper decodes arrays.
        raise WSDLCodingException('Invalid child tag "{0}" in map!'.format(child.tag))
|
|
|
|
|
|
def decode_wsdl(node, root_ns, ids):
    """Decode the XML element ``node`` into a Python value.

    Handled cases, in this order:

    * an ``href`` attribute of the form ``#name`` resolves to the value
      already stored in ``ids`` under ``name``;
    * a ``nil`` attribute equal to ``true`` yields ``None``;
    * otherwise the ``type`` attribute selects the decoding: ``boolean``,
      ``int`` and ``string`` from the XML Schema namespace, ``Map`` (dict),
      ``Array`` (list), and types from ``root_ns``, which become a list when
      an ``arrayType`` attribute is present and a dict keyed by child tag
      otherwise.

    If the element has an ``id`` attribute, the decoded value is stored in
    ``ids`` under that id. Containers are stored before their children are
    decoded so that children can refer back to them.

    :param node: element to decode
    :param root_ns: namespace whose types are decoded as list or dict
    :param ids: dict of id -> decoded value; read and updated in place
    :raises WSDLCodingException: for unsupported references, missing or
        unknown types, and malformed map/array content
    """
    reference = node.get('href')
    nil_flag = node.get(lxml.etree.QName(_NAMESPACE_XSI, 'nil'))
    node_id = node.get('id')

    if reference is not None:
        # Only document-local references ("#name") can be resolved.
        if not reference.startswith('#'):
            raise WSDLCodingException('Global reference "{0}" not supported!'.format(reference))
        target = reference[1:]
        if target not in ids:
            raise WSDLCodingException('ID "{0}" not yet defined!'.format(target))
        decoded = ids[target]
    elif nil_flag == 'true':
        decoded = None
    else:
        qualified_type = node.get(lxml.etree.QName(_NAMESPACE_XSI, 'type'))
        if qualified_type is None:
            raise WSDLCodingException('Element "{0}" has no "xsi:type" tag!'.format(node))
        type_name, type_ns = _split_text_namespace(node, qualified_type)
        if type_ns is None:
            raise WSDLCodingException('Cannot find namespace for "{0}"!'.format(qualified_type))

        if type_ns == _NAMESPACE_XSD:
            if type_name == 'boolean':
                booleans = {'true': True, 'false': False}
                if node.text not in booleans:
                    raise WSDLCodingException('Invalid value for boolean: "{0}"'.format(node.text))
                decoded = booleans[node.text]
            elif type_name == 'int':
                decoded = int(node.text)
            elif type_name == 'string':
                decoded = node.text
            else:
                raise WSDLCodingException('Unknown XSD type "{0}"!'.format(type_name))
        elif type_ns == _NAMESPACE_XML_SOAP:
            if type_name != 'Map':
                raise WSDLCodingException('Unknown XSD type "{0}"!'.format(type_name))
            mapping = dict()
            # Register the map before decoding its entries.
            if node_id is not None:
                ids[node_id] = mapping
            for entry in node:
                if entry.tag != 'item':
                    raise WSDLCodingException('Invalid child tag "{0}" in map!'.format(entry.tag))
                key_node = entry.find('key')
                if key_node is None:
                    raise WSDLCodingException('Cannot find key for "{0}"!'.format(entry))
                decoded_key = decode_wsdl(key_node, root_ns, ids)
                value_node = entry.find('value')
                if value_node is None:
                    raise WSDLCodingException('Cannot find value for "{0}"!'.format(entry))
                mapping[decoded_key] = decode_wsdl(value_node, root_ns, ids)
            return mapping
        elif type_ns == _NAMESPACE_XML_SOAP_ENCODING:
            if type_name != 'Array':
                raise WSDLCodingException('Unknown XSD type "{0}"!'.format(type_name))
            decoded = []
            if node_id is not None:
                ids[node_id] = decoded
            _decode_wsdl_array(decoded, node, root_ns, ids)
        elif type_ns == root_ns:
            array_type = node.get(lxml.etree.QName(_NAMESPACE_XML_SOAP_ENCODING, 'arrayType'))
            is_array = array_type is not None
            decoded = [] if is_array else dict()
            if node_id is not None:
                ids[node_id] = decoded
            if is_array:
                _decode_wsdl_array(decoded, node, root_ns, ids)
            else:
                for member in node:
                    decoded[member.tag] = decode_wsdl(member, root_ns, ids)
        else:
            raise WSDLCodingException('Unknown type namespace "{0}" (with type "{1}")!'.format(type_ns, type_name))

    if node_id is not None:
        ids[node_id] = decoded
    return decoded
|
|
|
|
|
|
class Parser(object):
    """Decode a SOAP response document.

    On construction the document is searched for ``Fault`` elements of the
    SOAP envelope namespace; the first one found is raised as ``WSDLError``.
    Otherwise the ``return`` values found below the children of every
    ``Header`` and ``Body`` element are decoded and stored by the local name
    of that child.
    """

    def __init__(self, api, root):
        """Parse ``root``.

        :param api: namespace that every header/body child must belong to
        :param root: root element of the response document
        :raises WSDLError: if the document contains a fault
        :raises WSDLCodingException: if the content cannot be decoded
        """
        self._main_ns = _NAMESPACE_ENVELOPE
        self._api = api
        self._root = root
        self._raise_on_fault()
        self._header = dict()
        self._body = dict()
        for header_node in self._root.iter(self._envelope_tag('Header')):
            self._parse(self._header, header_node, 'header')
        for body_node in self._root.iter(self._envelope_tag('Body')):
            self._parse(self._body, body_node, 'body')

    def _envelope_tag(self, name):
        """Return the qualified tag text for ``name`` in the envelope namespace."""
        return lxml.etree.QName(self._main_ns, name).text

    def _raise_on_fault(self):
        """Raise ``WSDLError`` for the first fault element, if there is one."""
        for fault in self._root.iter(self._envelope_tag('Fault')):
            origin = 'server'
            fault_code_val = None
            code_node = fault.find('faultcode')
            string_node = fault.find('faultstring')
            if code_node is not None and code_node.text:
                fault_code_val, code_ns = _split_text_namespace(fault, code_node.text)
                # A fault code from the envelope namespace names the origin.
                if code_ns == self._main_ns:
                    origin = fault_code_val.lower()
            if string_node is not None and string_node.text:
                message = string_node.text
            else:
                # Without a fault string, report the serialized fault element.
                message = lxml.etree.tostring(fault).decode('utf-8')
            raise WSDLError(origin, fault_code_val, message)

    def _parse(self, result, node, where):
        """Decode the ``return`` values below each child of ``node`` into ``result``.

        :param result: dict filled with local name of the child -> decoded value
        :param node: header or body element
        :param where: name of the section, used in the error message
        :raises WSDLCodingException: if a child is not in the API namespace
        """
        for child in node:
            qualified = lxml.etree.QName(child.tag)
            if qualified.namespace != self._api:
                raise WSDLCodingException('Cannot interpret {0} item of type "{1}"!'.format(where, qualified))
            for returned in child.iter('return'):
                result[qualified.localname] = decode_wsdl(returned, self._api, {})

    def get_header(self, header):
        """Return the decoded header value stored under ``header``."""
        return self._header[header]

    def get_result(self, body):
        """Return the decoded body value stored under ``body``."""
        return self._body[body]

    def __str__(self):
        return 'header={0}, body={1}'.format(self._header, self._body)

    def __repr__(self):
        serialized = lxml.etree.tostring(self._root, pretty_print=True).decode('utf-8')
        return '''<?xml version='1.0' encoding='utf-8'?>''' + '\n' + serialized
|
|
|
|
|
|
class Composer(object):
    """Compose a SOAP request and send it with the given HTTP helper.

    The document consists of an ``Envelope`` with an empty ``Header`` and
    ``Body``; ``add_auth`` and ``add_simple_command`` fill them, ``execute``
    sends the request and returns a ``Parser`` for the response.
    """

    @staticmethod
    def _create(tag, namespace=None, **kwarg):
        """Create an element named ``tag``, qualified by ``namespace`` if given."""
        if namespace:
            return lxml.etree.Element(lxml.etree.QName(namespace, tag), **kwarg)
        return lxml.etree.Element(tag, **kwarg)

    def __str__(self):
        return '''<?xml version='1.0' encoding='utf-8'?>''' + '\n' + lxml.etree.tostring(self._root, pretty_print=True).decode('utf-8')

    def _create_envelope(self, tag, **kwarg):
        """Create an element named ``tag`` in the SOAP envelope namespace."""
        return self._create(tag, self._main_ns, **kwarg)

    def __init__(self, http_helper, api, namespaces=None):
        """Create the empty request document.

        :param http_helper: object whose ``fetch_url`` method sends the request
        :param api: API namespace; also used as request URL and in ``SOAPAction``
        :param namespaces: optional dict of prefix -> namespace that extends
            or overrides the default prefix map of the envelope
        """
        self._http_helper = http_helper
        self._main_ns = _NAMESPACE_ENVELOPE
        self._api = api
        # Compose basic document
        all_namespaces = {
            'SOAP-ENV': _NAMESPACE_ENVELOPE,
            'xsd': _NAMESPACE_XSD,
            'xsi': _NAMESPACE_XSI,
            'ns2': 'auth',
            'SOAP-ENC': _NAMESPACE_XML_SOAP_ENCODING,
        }
        if namespaces is not None:
            all_namespaces.update(namespaces)
        self._root = self._create_envelope('Envelope', nsmap=all_namespaces)
        self._root.set(lxml.etree.QName(self._main_ns, 'encodingStyle').text, _NAMESPACE_XML_SOAP_ENCODING)
        self._header = self._create_envelope('Header')
        self._root.append(self._header)
        self._body = self._create_envelope('Body')
        self._root.append(self._body)
        self._command = None

    def add_auth(self, username, password):
        """Append an ``authenticate`` element with the credentials to the header."""
        auth = self._create('authenticate', 'auth')
        user = self._create('UserName')
        user.text = username
        auth.append(user)
        pw = self._create('Password')
        pw.text = password
        auth.append(pw)
        self._header.append(auth)

    def add_simple_command(self, command, **args):
        """Append a command element to the body.

        Every keyword argument becomes a child element of the command whose
        value is encoded with ``encode_wsdl``. The command name is remembered
        for the ``SOAPAction`` header sent by ``execute``.

        :param command: name of the command
        :raises WSDLCodingException: if an argument value cannot be encoded
        """
        self._command = command
        command_node = self._create(command, self._api)
        for name, value in args.items():
            arg_node = self._create(name)
            encode_wsdl(arg_node, value)
            command_node.append(arg_node)
        self._body.append(command_node)

    def execute(self, debug=False):
        """Send the composed request and return a ``Parser`` for the response.

        :param debug: accepted for backward compatibility; currently unused
        :returns: ``Parser`` instance wrapping the response document
        :raises WSDLNetworkError: if the HTTP helper raises ``NetworkError``
        :raises WSDLError: if the response contains a fault, or if the HTTP
            status is outside the 2xx range
        """
        payload = b'''<?xml version='1.0' encoding='utf-8'?>''' + b'\n' + lxml.etree.tostring(self._root) + b'\n'
        headers = {
            'Content-Type': 'text/xml; charset=utf-8',
            'Content-Length': str(len(payload)),
        }
        if self._command:
            headers['SOAPAction'] = '"{0}#{1}"'.format(self._api, self._command)
        try:
            result, info = self._http_helper.fetch_url(self._api, data=payload, method='POST', timeout=300, headers=headers)
            code = info['status']
        except NetworkError as e:
            raise WSDLNetworkError(to_native(e))
        if code < 200 or code >= 300:
            try:
                # Parsing raises a WSDLError with the fault details if the
                # error response carries a fault.
                Parser(self._api, lxml.etree.fromstring(result))
            except lxml.etree.XMLSyntaxError:
                # The error body is not XML; report the HTTP error below
                # instead of leaking the parser exception.
                pass
            # WSDLError takes (origin, error_code, message); the HTTP status is the error code.
            raise WSDLError('server', code, 'Error {0} while executing WSDL command:\n{1}'.format(code, result.decode('utf-8')))
        return Parser(self._api, lxml.etree.fromstring(result))
|