123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188 |
- # -*- coding: utf-8 -*-
- # Description:
- # Author: Pawel Krupa (paulfantom)
- # Author: Ilya Mashchenko (ilyam8)
- # SPDX-License-Identifier: GPL-3.0-or-later
- import urllib3
- from bases.FrameworkServices.SimpleService import SimpleService
- try:
- urllib3.disable_warnings()
- except AttributeError:
- pass
- URLLIB3_VERSION = urllib3.__version__
- URLLIB3 = 'urllib3'
- class UrlService(SimpleService):
- def __init__(self, configuration=None, name=None):
- SimpleService.__init__(self, configuration=configuration, name=name)
- self.debug("{0} version: {1}".format(URLLIB3, URLLIB3_VERSION))
- self.url = self.configuration.get('url')
- self.user = self.configuration.get('user')
- self.password = self.configuration.get('pass')
- self.proxy_user = self.configuration.get('proxy_user')
- self.proxy_password = self.configuration.get('proxy_pass')
- self.proxy_url = self.configuration.get('proxy_url')
- self.method = self.configuration.get('method', 'GET')
- self.header = self.configuration.get('header')
- self.body = self.configuration.get('body')
- self.request_timeout = self.configuration.get('timeout', 1)
- self.respect_retry_after_header = self.configuration.get('respect_retry_after_header')
- self.tls_verify = self.configuration.get('tls_verify')
- self.tls_ca_file = self.configuration.get('tls_ca_file')
- self.tls_key_file = self.configuration.get('tls_key_file')
- self.tls_cert_file = self.configuration.get('tls_cert_file')
- self._manager = None
- def __make_headers(self, **header_kw):
- user = header_kw.get('user') or self.user
- password = header_kw.get('pass') or self.password
- proxy_user = header_kw.get('proxy_user') or self.proxy_user
- proxy_password = header_kw.get('proxy_pass') or self.proxy_password
- custom_header = header_kw.get('header') or self.header
- header_params = dict(keep_alive=True)
- proxy_header_params = dict()
- if user and password:
- header_params['basic_auth'] = '{user}:{password}'.format(user=user,
- password=password)
- if proxy_user and proxy_password:
- proxy_header_params['proxy_basic_auth'] = '{user}:{password}'.format(user=proxy_user,
- password=proxy_password)
- try:
- header, proxy_header = urllib3.make_headers(**header_params), urllib3.make_headers(**proxy_header_params)
- except TypeError as error:
- self.error('build_header() error: {error}'.format(error=error))
- return None, None
- else:
- header.update(custom_header or dict())
- return header, proxy_header
- def _build_manager(self, **header_kw):
- header, proxy_header = self.__make_headers(**header_kw)
- if header is None or proxy_header is None:
- return None
- proxy_url = header_kw.get('proxy_url') or self.proxy_url
- if proxy_url:
- manager = urllib3.ProxyManager
- params = dict(proxy_url=proxy_url, headers=header, proxy_headers=proxy_header)
- else:
- manager = urllib3.PoolManager
- params = dict(headers=header)
- tls_cert_file = self.tls_cert_file
- if tls_cert_file:
- params['cert_file'] = tls_cert_file
- # NOTE: key_file is useless without cert_file, but
- # cert_file may include the key as well.
- tls_key_file = self.tls_key_file
- if tls_key_file:
- params['key_file'] = tls_key_file
- tls_ca_file = self.tls_ca_file
- if tls_ca_file:
- params['ca_certs'] = tls_ca_file
- try:
- url = header_kw.get('url') or self.url
- is_https = url.startswith('https')
- if skip_tls_verify(is_https, self.tls_verify, tls_ca_file):
- params['ca_certs'] = None
- params['cert_reqs'] = 'CERT_NONE'
- if is_https:
- params['assert_hostname'] = False
- return manager(**params)
- except (urllib3.exceptions.ProxySchemeUnknown, TypeError) as error:
- self.error('build_manager() error:', str(error))
- return None
- def _get_raw_data(self, url=None, manager=None, **kwargs):
- """
- Get raw data from http request
- :return: str
- """
- try:
- response = self._do_request(url, manager, **kwargs)
- except Exception as error:
- self.error('Url: {url}. Error: {error}'.format(url=url or self.url, error=error))
- return None
- if response.status == 200:
- if isinstance(response.data, str):
- return response.data
- return response.data.decode(errors='ignore')
- else:
- self.debug('Url: {url}. Http response status code: {code}'.format(url=url or self.url, code=response.status))
- return None
- def _get_raw_data_with_status(self, url=None, manager=None, retries=1, redirect=True, **kwargs):
- """
- Get status and response body content from http request. Does not catch exceptions
- :return: int, str
- """
- response = self._do_request(url, manager, retries, redirect, **kwargs)
- if isinstance(response.data, str):
- return response.status, response.data
- return response.status, response.data.decode(errors='ignore')
- def _do_request(self, url=None, manager=None, retries=1, redirect=True, **kwargs):
- """
- Get response from http request. Does not catch exceptions
- :return: HTTPResponse
- """
- url = url or self.url
- manager = manager or self._manager
- retry = urllib3.Retry(retries)
- if hasattr(retry, 'respect_retry_after_header'):
- retry.respect_retry_after_header = bool(self.respect_retry_after_header)
- if self.body:
- kwargs['body'] = self.body
- response = manager.request(
- method=self.method,
- url=url,
- timeout=self.request_timeout,
- retries=retry,
- headers=manager.headers,
- redirect=redirect,
- **kwargs
- )
- return response
- def check(self):
- """
- Format configuration data and try to connect to server
- :return: boolean
- """
- if not (self.url and isinstance(self.url, str)):
- self.error('URL is not defined or type is not <str>')
- return False
- self._manager = self._build_manager()
- if not self._manager:
- return False
- try:
- data = self._get_data()
- except Exception as error:
- self.error('_get_data() failed. Url: {url}. Error: {error}'.format(url=self.url, error=error))
- return False
- if isinstance(data, dict) and data:
- return True
- self.error('_get_data() returned no data or type is not <dict>')
- return False
- def skip_tls_verify(is_https, tls_verify, tls_ca_file):
- # default 'tls_verify' value is None
- # logic is:
- # - never skip if there is 'tls_ca_file' file
- # - skip by default for https
- # - do not skip by default for http
- if tls_ca_file:
- return False
- if is_https and not tls_verify:
- return True
- return tls_verify is False
|