_url.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. import os
  2. import socket
  3. import struct
  4. from typing import Optional
  5. from urllib.parse import unquote, urlparse
  6. from ._exceptions import WebSocketProxyException
  7. """
  8. _url.py
  9. websocket - WebSocket client library for Python
  10. Copyright 2024 engn33r
  11. Licensed under the Apache License, Version 2.0 (the "License");
  12. you may not use this file except in compliance with the License.
  13. You may obtain a copy of the License at
  14. http://www.apache.org/licenses/LICENSE-2.0
  15. Unless required by applicable law or agreed to in writing, software
  16. distributed under the License is distributed on an "AS IS" BASIS,
  17. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  18. See the License for the specific language governing permissions and
  19. limitations under the License.
  20. """
# Names exported by ``from <module> import *``; the underscore-prefixed
# helpers defined in this module are deliberately left out.
__all__ = ["parse_url", "get_proxy_info"]
  22. def parse_url(url: str) -> tuple:
  23. """
  24. parse url and the result is tuple of
  25. (hostname, port, resource path and the flag of secure mode)
  26. Parameters
  27. ----------
  28. url: str
  29. url string.
  30. """
  31. if ":" not in url:
  32. raise ValueError("url is invalid")
  33. scheme, url = url.split(":", 1)
  34. parsed = urlparse(url, scheme="http")
  35. if parsed.hostname:
  36. hostname = parsed.hostname
  37. else:
  38. raise ValueError("hostname is invalid")
  39. port = 0
  40. if parsed.port:
  41. port = parsed.port
  42. is_secure = False
  43. if scheme == "ws":
  44. if not port:
  45. port = 80
  46. elif scheme == "wss":
  47. is_secure = True
  48. if not port:
  49. port = 443
  50. else:
  51. raise ValueError("scheme %s is invalid" % scheme)
  52. if parsed.path:
  53. resource = parsed.path
  54. else:
  55. resource = "/"
  56. if parsed.query:
  57. resource += f"?{parsed.query}"
  58. return hostname, port, resource, is_secure
# Hosts that bypass the proxy when neither the caller nor the
# no_proxy / NO_PROXY environment variables provide a list.
DEFAULT_NO_PROXY_HOST = ["localhost", "127.0.0.1"]
  60. def _is_ip_address(addr: str) -> bool:
  61. try:
  62. socket.inet_aton(addr)
  63. except socket.error:
  64. return False
  65. else:
  66. return True
  67. def _is_subnet_address(hostname: str) -> bool:
  68. try:
  69. addr, netmask = hostname.split("/")
  70. return _is_ip_address(addr) and 0 <= int(netmask) < 32
  71. except ValueError:
  72. return False
  73. def _is_address_in_network(ip: str, net: str) -> bool:
  74. ipaddr: int = struct.unpack("!I", socket.inet_aton(ip))[0]
  75. netaddr, netmask = net.split("/")
  76. netaddr: int = struct.unpack("!I", socket.inet_aton(netaddr))[0]
  77. netmask = (0xFFFFFFFF << (32 - int(netmask))) & 0xFFFFFFFF
  78. return ipaddr & netmask == netaddr
  79. def _is_no_proxy_host(hostname: str, no_proxy: Optional[list]) -> bool:
  80. if not no_proxy:
  81. if v := os.environ.get("no_proxy", os.environ.get("NO_PROXY", "")).replace(
  82. " ", ""
  83. ):
  84. no_proxy = v.split(",")
  85. if not no_proxy:
  86. no_proxy = DEFAULT_NO_PROXY_HOST
  87. if "*" in no_proxy:
  88. return True
  89. if hostname in no_proxy:
  90. return True
  91. if _is_ip_address(hostname):
  92. return any(
  93. [
  94. _is_address_in_network(hostname, subnet)
  95. for subnet in no_proxy
  96. if _is_subnet_address(subnet)
  97. ]
  98. )
  99. for domain in [domain for domain in no_proxy if domain.startswith(".")]:
  100. if hostname.endswith(domain):
  101. return True
  102. return False
  103. def get_proxy_info(
  104. hostname: str,
  105. is_secure: bool,
  106. proxy_host: Optional[str] = None,
  107. proxy_port: int = 0,
  108. proxy_auth: Optional[tuple] = None,
  109. no_proxy: Optional[list] = None,
  110. proxy_type: str = "http",
  111. ) -> tuple:
  112. """
  113. Try to retrieve proxy host and port from environment
  114. if not provided in options.
  115. Result is (proxy_host, proxy_port, proxy_auth).
  116. proxy_auth is tuple of username and password
  117. of proxy authentication information.
  118. Parameters
  119. ----------
  120. hostname: str
  121. Websocket server name.
  122. is_secure: bool
  123. Is the connection secure? (wss) looks for "https_proxy" in env
  124. instead of "http_proxy"
  125. proxy_host: str
  126. http proxy host name.
  127. proxy_port: str or int
  128. http proxy port.
  129. no_proxy: list
  130. Whitelisted host names that don't use the proxy.
  131. proxy_auth: tuple
  132. HTTP proxy auth information. Tuple of username and password. Default is None.
  133. proxy_type: str
  134. Specify the proxy protocol (http, socks4, socks4a, socks5, socks5h). Default is "http".
  135. Use socks4a or socks5h if you want to send DNS requests through the proxy.
  136. """
  137. if _is_no_proxy_host(hostname, no_proxy):
  138. return None, 0, None
  139. if proxy_host:
  140. if not proxy_port:
  141. raise WebSocketProxyException("Cannot use port 0 when proxy_host specified")
  142. port = proxy_port
  143. auth = proxy_auth
  144. return proxy_host, port, auth
  145. env_key = "https_proxy" if is_secure else "http_proxy"
  146. value = os.environ.get(env_key, os.environ.get(env_key.upper(), "")).replace(
  147. " ", ""
  148. )
  149. if value:
  150. proxy = urlparse(value)
  151. auth = (
  152. (unquote(proxy.username), unquote(proxy.password))
  153. if proxy.username
  154. else None
  155. )
  156. return proxy.hostname, proxy.port, auth
  157. return None, 0, None