httputil.py 8.0 KB


  1. import atexit
  2. import http
  3. import logging
  4. import multiprocessing
  5. import os
  6. import sys
  7. import socket
  8. import time
  9. from typing import Dict, Any, Optional
  10. import certifi
  11. import lz4.frame
  12. import urllib3
  13. import zstandard
  14. from urllib3.poolmanager import PoolManager, ProxyManager
  15. from urllib3.response import HTTPResponse
  16. from clickhouse_connect.driver.exceptions import ProgrammingError
  17. from clickhouse_connect import common
  logger = logging.getLogger(__name__)

  # We disable this warning. Verify must be explicitly set to false, so we assume the user knows what they're doing
  urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

  # Increase this number just to be safe when ClickHouse is returning progress headers
  http.client._MAXHEADERS = 10000  # pylint: disable=protected-access

  # Defaults for the TCP_KEEPINTVL / TCP_KEEPCNT / TCP_KEEPIDLE socket options added by get_pool_manager_options
  DEFAULT_KEEP_INTERVAL = 30
  DEFAULT_KEEP_COUNT = 3
  DEFAULT_KEEP_IDLE = 30

  SOCKET_TCP = socket.IPPROTO_TCP

  # Socket options applied to every pooled connection, independent of platform
  core_socket_options = [
      (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),
      (SOCKET_TCP, socket.TCP_NODELAY, 1),
      # NOTE(review): this SO_SNDBUF entry used to be listed twice. Setting the same option twice is a
      # no-op, so the duplicate was dropped; if the second entry was meant to be SO_RCVBUF, confirm
      # and add it explicitly.
      (socket.SOL_SOCKET, socket.SO_SNDBUF, 1024 * 256),
  ]

  logging.getLogger('urllib3').setLevel(logging.WARNING)

  # Managers created by get_proxy_manager, keyed by '<host>__<proxy>'
  _proxy_managers = {}
  # Every manager created by get_pool_manager, mapped to the epoch second it was created or last reset
  all_managers = {}
  @atexit.register
  def close_managers():
      """
      Exit hook that clears every pool manager tracked in all_managers.
      """
      for pool_manager in all_managers:
          pool_manager.clear()
  # pylint: disable=no-member,too-many-arguments,too-many-branches
  def get_pool_manager_options(keep_interval: int = DEFAULT_KEEP_INTERVAL,
                               keep_count: int = DEFAULT_KEEP_COUNT,
                               keep_idle: int = DEFAULT_KEEP_IDLE,
                               ca_cert: Optional[str] = None,
                               verify: bool = True,
                               client_cert: Optional[str] = None,
                               client_cert_key: Optional[str] = None,
                               **options) -> Dict[str, Any]:
      """
      Build the keyword arguments used to construct a urllib3 PoolManager or ProxyManager.

      :param keep_interval: Value for the TCP_KEEPINTVL socket option (when the platform defines it)
      :param keep_count: Value for the TCP_KEEPCNT socket option (when the platform defines it)
      :param keep_idle: Value for the TCP_KEEPIDLE socket option, or for TCP_KEEPALIVE on macOS
      :param ca_cert: Path stored as 'ca_certs'; the special value 'certifi' selects certifi.where()
      :param verify: If True 'cert_reqs' is 'CERT_REQUIRED', otherwise 'CERT_NONE'
      :param client_cert: Stored as 'cert_file' when set
      :param client_cert_key: Stored as 'key_file' when set
      :param options: Additional options passed through; 'maxsize', 'retries' and 'block' default to
        8, 1 and False when not supplied
      :return: The completed options dictionary, including 'socket_options'
      """
      socket_options = core_socket_options.copy()
      # Keepalive tuning constants are platform dependent, so only add the ones this platform defines
      for const_name, value in (('TCP_KEEPINTVL', keep_interval),
                                ('TCP_KEEPCNT', keep_count),
                                ('TCP_KEEPIDLE', keep_idle)):
          const = getattr(socket, const_name, None)
          if const is not None:
              socket_options.append((SOCKET_TCP, const, value))
      if sys.platform == 'darwin':
          # On macOS TCP_KEEPALIVE (0x10) is the idle-time option, i.e. the counterpart of TCP_KEEPIDLE
          # elsewhere (see the Python socket docs), so it must receive keep_idle rather than keep_interval
          socket_options.append((SOCKET_TCP, getattr(socket, 'TCP_KEEPALIVE', 0x10), keep_idle))
      options.setdefault('maxsize', 8)
      options.setdefault('retries', 1)
      if ca_cert == 'certifi':
          ca_cert = certifi.where()
      options['cert_reqs'] = 'CERT_REQUIRED' if verify else 'CERT_NONE'
      if ca_cert:
          options['ca_certs'] = ca_cert
      if client_cert:
          options['cert_file'] = client_cert
      if client_cert_key:
          options['key_file'] = client_cert_key
      options['socket_options'] = socket_options
      options.setdefault('block', False)
      return options
  def get_pool_manager(keep_interval: int = DEFAULT_KEEP_INTERVAL,
                       keep_count: int = DEFAULT_KEEP_COUNT,
                       keep_idle: int = DEFAULT_KEEP_IDLE,
                       ca_cert: str = None,
                       verify: bool = True,
                       client_cert: str = None,
                       client_cert_key: str = None,
                       http_proxy: str = None,
                       https_proxy: str = None,
                       **options):
      """
      Create a urllib3 PoolManager (or a ProxyManager when a proxy is given) and register it in all_managers.

      The keepalive, certificate and extra keyword arguments are forwarded to get_pool_manager_options.
      A proxy value that does not start with 'http' is prefixed with 'http://' (http_proxy) or
      'https://' (https_proxy).

      :raises ProgrammingError: if both http_proxy and https_proxy are specified
      :return: The newly created manager
      """
      options = get_pool_manager_options(keep_interval, keep_count, keep_idle, ca_cert, verify,
                                         client_cert, client_cert_key, **options)
      if http_proxy and https_proxy:
          raise ProgrammingError('Only one of http_proxy or https_proxy should be specified')

      proxy_url = None
      if http_proxy:
          proxy_url = http_proxy if http_proxy.startswith('http') else f'http://{http_proxy}'
      elif https_proxy:
          proxy_url = https_proxy if https_proxy.startswith('http') else f'https://{https_proxy}'

      if proxy_url is None:
          manager = PoolManager(**options)
      else:
          manager = ProxyManager(proxy_url, **options)
      # Record the creation time so check_conn_reset can measure the age of the manager
      all_managers[manager] = int(time.time())
      return manager
  def check_conn_reset(manager: PoolManager):
      """
      Clear the manager's pools when its last recorded reset is older than the 'max_connection_age' setting.

      Does nothing when the setting is unset or falsy. A manager that is not tracked in all_managers is
      treated as last reset at time 0.
      """
      max_age = common.get_setting('max_connection_age')
      if not max_age:
          return
      current = int(time.time())
      if all_managers.get(manager, 0) < current - max_age:
          logger.debug('connection reset')
          manager.clear()
          all_managers[manager] = current
  def get_proxy_manager(host: str, http_proxy):
      """
      Return the cached manager for this host/proxy pair, creating and caching one on first use.
      """
      cache_key = f'{host}__{http_proxy}'
      if cache_key not in _proxy_managers:
          _proxy_managers[cache_key] = get_pool_manager(http_proxy=http_proxy)
      return _proxy_managers[cache_key]
  def get_response_data(response: HTTPResponse) -> bytes:
      """
      Return the complete response body, decompressed according to the 'content-encoding' header.

      :param response: The urllib3 response whose ``data`` is read
      :return: Decompressed bytes for 'zstd' and 'lz4' encodings, otherwise ``response.data`` unchanged.
        If a 'zstd' body fails to decompress, the raw ``response.data`` is returned instead.
      """
      encoding = response.headers.get('content-encoding', None)
      if encoding == 'zstd':
          try:
              zstd_decom = zstandard.ZstdDecompressor()
              return zstd_decom.stream_reader(response.data).read()
          except zstandard.ZstdError:
              # Best effort: the body is not valid zstd, so fall through and return it as received
              pass
      if encoding == 'lz4':
          lz4_decom = lz4.frame.LZ4FrameDecompressor()
          # Do not pass max_length: the second argument caps the size of the *decompressed* output,
          # so limiting it to the compressed length truncated any body that actually compressed
          return lz4_decom.decompress(response.data)
      return response.data
  def check_env_proxy(scheme: str, host: str, port: int) -> Optional[str]:
      """
      Look up the proxy configured through environment variables for the given scheme and host.

      Reads '<scheme>_proxy' (lower case first, then upper case) and honors 'no_proxy' / 'NO_PROXY'.

      :param scheme: URL scheme used to build the environment variable name
      :param host: Target host, compared case-insensitively against the no_proxy entries
      :param port: Target port, used to match 'host:port' no_proxy entries
      :return: The proxy value, or None when no proxy is set or the host is excluded by no_proxy
      """
      var_name = f'{scheme}_proxy'.lower()
      proxy = os.environ.get(var_name) or os.environ.get(var_name.upper())
      if not proxy:
          return None

      no_proxy = os.environ.get('no_proxy') or os.environ.get('NO_PROXY')
      if not no_proxy:
          return proxy
      if no_proxy == '*':
          return None  # Wildcard no proxy means don't actually proxy anything

      host = host.lower()
      exact_matches = (host, f'{host}:{port}')
      for raw_entry in no_proxy.split(','):
          entry = raw_entry.strip()
          if not entry:
              continue
          entry = entry.lstrip('.').lower()
          # Excluded when the entry is the host (or host:port) itself, or a parent domain of the host
          if entry in exact_matches or host.endswith('.' + entry):
              return None
      return proxy
  # Shared manager created once at import time and handed out in the main process
  _default_pool_manager = get_pool_manager()


  def default_pool_manager():
      """
      Return the shared default manager in the main process, or a brand new manager in any other process.
      """
      if multiprocessing.current_process().name != 'MainProcess':
          # PoolManagers don't seem to be safe for some multiprocessing environments, always return a new one
          return get_pool_manager()
      return _default_pool_manager
  class ResponseSource:
      """
      Exposes the body of an HTTP response as ``gen``, an iterator of byte chunks.

      The 'content-encoding' header selects the path: 'zstd' and 'lz4' bodies are read raw in
      chunk_size pieces and decompressed here; anything else is streamed through
      ``response.stream`` with decode_content=True.
      """

      def __init__(self, response: HTTPResponse, chunk_size: int = 1024 * 1024):
          self.response = response
          encoding = response.headers.get('content-encoding')
          if encoding == 'zstd':
              zstd_obj = zstandard.ZstdDecompressor().decompressobj()
              self.gen = self._zstd_chunks(response, zstd_obj, chunk_size)
          elif encoding == 'lz4':
              self.gen = self._lz4_chunks(lz4.frame.LZ4FrameDecompressor(), chunk_size)
          else:
              self.gen = response.stream(amt=chunk_size, decode_content=True)

      @staticmethod
      def _zstd_chunks(response, decompressor, chunk_size):
          # Yield the decompressed form of every raw chunk until the response returns no more data
          raw = response.read(chunk_size, decode_content=False)
          while raw:
              yield decompressor.decompress(raw)
              raw = response.read(chunk_size, decode_content=False)

      def _lz4_chunks(self, decompressor, chunk_size):
          # Feed raw chunks (prefixed with any unused data held by the decompressor) while it asks
          # for more input; only non-empty output is yielded
          while decompressor.needs_input:
              payload = self.response.read(chunk_size, decode_content=False)
              leftover = decompressor.unused_data
              if leftover:
                  payload = leftover + payload
              if not payload:
                  return
              output = decompressor.decompress(payload)
              if output:
                  yield output

      def close(self):
          """Drain any unread data from the connection, then close the response."""
          self.response.drain_conn()
          self.response.close()