_core.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647
  1. import socket
  2. import struct
  3. import threading
  4. import time
  5. from typing import Optional, Union
  6. # websocket modules
  7. from ._abnf import ABNF, STATUS_NORMAL, continuous_frame, frame_buffer
  8. from ._exceptions import WebSocketProtocolException, WebSocketConnectionClosedException
  9. from ._handshake import SUPPORTED_REDIRECT_STATUSES, handshake
  10. from ._http import connect, proxy_info
  11. from ._logging import debug, error, trace, isEnabledForError, isEnabledForTrace
  12. from ._socket import getdefaulttimeout, recv, send, sock_opt
  13. from ._ssl_compat import ssl
  14. from ._utils import NoLock
  15. """
  16. _core.py
  17. websocket - WebSocket client library for Python
  18. Copyright 2024 engn33r
  19. Licensed under the Apache License, Version 2.0 (the "License");
  20. you may not use this file except in compliance with the License.
  21. You may obtain a copy of the License at
  22. http://www.apache.org/licenses/LICENSE-2.0
  23. Unless required by applicable law or agreed to in writing, software
  24. distributed under the License is distributed on an "AS IS" BASIS,
  25. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  26. See the License for the specific language governing permissions and
  27. limitations under the License.
  28. """
# Names exported by ``from <this module> import *``: the WebSocket class and
# the create_connection() convenience factory, both defined below.
__all__ = ["WebSocket", "create_connection"]
  30. class WebSocket:
  31. """
  32. Low level WebSocket interface.
  33. This class is based on the WebSocket protocol `draft-hixie-thewebsocketprotocol-76 <http://tools.ietf.org/html/draft-hixie-thewebsocketprotocol-76>`_
  34. We can connect to the websocket server and send/receive data.
  35. The following example is an echo client.
  36. >>> import websocket
  37. >>> ws = websocket.WebSocket()
  38. >>> ws.connect("ws://echo.websocket.events")
  39. >>> ws.recv()
  40. 'echo.websocket.events sponsored by Lob.com'
  41. >>> ws.send("Hello, Server")
  42. 19
  43. >>> ws.recv()
  44. 'Hello, Server'
  45. >>> ws.close()
  46. Parameters
  47. ----------
  48. get_mask_key: func
  49. A callable function to get new mask keys, see the
  50. WebSocket.set_mask_key's docstring for more information.
  51. sockopt: tuple
  52. Values for socket.setsockopt.
  53. sockopt must be tuple and each element is argument of sock.setsockopt.
  54. sslopt: dict
  55. Optional dict object for ssl socket options. See FAQ for details.
  56. fire_cont_frame: bool
  57. Fire recv event for each cont frame. Default is False.
  58. enable_multithread: bool
  59. If set to True, lock send method.
  60. skip_utf8_validation: bool
  61. Skip utf8 validation.
  62. """
  63. def __init__(
  64. self,
  65. get_mask_key=None,
  66. sockopt=None,
  67. sslopt=None,
  68. fire_cont_frame: bool = False,
  69. enable_multithread: bool = True,
  70. skip_utf8_validation: bool = False,
  71. **_,
  72. ):
  73. """
  74. Initialize WebSocket object.
  75. Parameters
  76. ----------
  77. sslopt: dict
  78. Optional dict object for ssl socket options. See FAQ for details.
  79. """
  80. self.sock_opt = sock_opt(sockopt, sslopt)
  81. self.handshake_response = None
  82. self.sock: Optional[socket.socket] = None
  83. self.connected = False
  84. self.get_mask_key = get_mask_key
  85. # These buffer over the build-up of a single frame.
  86. self.frame_buffer = frame_buffer(self._recv, skip_utf8_validation)
  87. self.cont_frame = continuous_frame(fire_cont_frame, skip_utf8_validation)
  88. if enable_multithread:
  89. self.lock = threading.Lock()
  90. self.readlock = threading.Lock()
  91. else:
  92. self.lock = NoLock()
  93. self.readlock = NoLock()
  94. def __iter__(self):
  95. """
  96. Allow iteration over websocket, implying sequential `recv` executions.
  97. """
  98. while True:
  99. yield self.recv()
  100. def __next__(self):
  101. return self.recv()
  102. def next(self):
  103. return self.__next__()
  104. def fileno(self):
  105. return self.sock.fileno()
  106. def set_mask_key(self, func):
  107. """
  108. Set function to create mask key. You can customize mask key generator.
  109. Mainly, this is for testing purpose.
  110. Parameters
  111. ----------
  112. func: func
  113. callable object. the func takes 1 argument as integer.
  114. The argument means length of mask key.
  115. This func must return string(byte array),
  116. which length is argument specified.
  117. """
  118. self.get_mask_key = func
  119. def gettimeout(self) -> Union[float, int, None]:
  120. """
  121. Get the websocket timeout (in seconds) as an int or float
  122. Returns
  123. ----------
  124. timeout: int or float
  125. returns timeout value (in seconds). This value could be either float/integer.
  126. """
  127. return self.sock_opt.timeout
  128. def settimeout(self, timeout: Union[float, int, None]):
  129. """
  130. Set the timeout to the websocket.
  131. Parameters
  132. ----------
  133. timeout: int or float
  134. timeout time (in seconds). This value could be either float/integer.
  135. """
  136. self.sock_opt.timeout = timeout
  137. if self.sock:
  138. self.sock.settimeout(timeout)
  139. timeout = property(gettimeout, settimeout)
  140. def getsubprotocol(self):
  141. """
  142. Get subprotocol
  143. """
  144. if self.handshake_response:
  145. return self.handshake_response.subprotocol
  146. else:
  147. return None
  148. subprotocol = property(getsubprotocol)
  149. def getstatus(self):
  150. """
  151. Get handshake status
  152. """
  153. if self.handshake_response:
  154. return self.handshake_response.status
  155. else:
  156. return None
  157. status = property(getstatus)
  158. def getheaders(self):
  159. """
  160. Get handshake response header
  161. """
  162. if self.handshake_response:
  163. return self.handshake_response.headers
  164. else:
  165. return None
  166. def is_ssl(self):
  167. try:
  168. return isinstance(self.sock, ssl.SSLSocket)
  169. except:
  170. return False
  171. headers = property(getheaders)
  172. def connect(self, url, **options):
  173. """
  174. Connect to url. url is websocket url scheme.
  175. ie. ws://host:port/resource
  176. You can customize using 'options'.
  177. If you set "header" list object, you can set your own custom header.
  178. >>> ws = WebSocket()
  179. >>> ws.connect("ws://echo.websocket.events",
  180. ... header=["User-Agent: MyProgram",
  181. ... "x-custom: header"])
  182. Parameters
  183. ----------
  184. header: list or dict
  185. Custom http header list or dict.
  186. cookie: str
  187. Cookie value.
  188. origin: str
  189. Custom origin url.
  190. connection: str
  191. Custom connection header value.
  192. Default value "Upgrade" set in _handshake.py
  193. suppress_origin: bool
  194. Suppress outputting origin header.
  195. host: str
  196. Custom host header string.
  197. timeout: int or float
  198. Socket timeout time. This value is an integer or float.
  199. If you set None for this value, it means "use default_timeout value"
  200. http_proxy_host: str
  201. HTTP proxy host name.
  202. http_proxy_port: str or int
  203. HTTP proxy port. Default is 80.
  204. http_no_proxy: list
  205. Whitelisted host names that don't use the proxy.
  206. http_proxy_auth: tuple
  207. HTTP proxy auth information. Tuple of username and password. Default is None.
  208. http_proxy_timeout: int or float
  209. HTTP proxy timeout, default is 60 sec as per python-socks.
  210. redirect_limit: int
  211. Number of redirects to follow.
  212. subprotocols: list
  213. List of available subprotocols. Default is None.
  214. socket: socket
  215. Pre-initialized stream socket.
  216. """
  217. self.sock_opt.timeout = options.get("timeout", self.sock_opt.timeout)
  218. self.sock, addrs = connect(
  219. url, self.sock_opt, proxy_info(**options), options.pop("socket", None)
  220. )
  221. try:
  222. self.handshake_response = handshake(self.sock, url, *addrs, **options)
  223. for _ in range(options.pop("redirect_limit", 3)):
  224. if self.handshake_response.status in SUPPORTED_REDIRECT_STATUSES:
  225. url = self.handshake_response.headers["location"]
  226. self.sock.close()
  227. self.sock, addrs = connect(
  228. url,
  229. self.sock_opt,
  230. proxy_info(**options),
  231. options.pop("socket", None),
  232. )
  233. self.handshake_response = handshake(
  234. self.sock, url, *addrs, **options
  235. )
  236. self.connected = True
  237. except:
  238. if self.sock:
  239. self.sock.close()
  240. self.sock = None
  241. raise
  242. def send(self, payload: Union[bytes, str], opcode: int = ABNF.OPCODE_TEXT) -> int:
  243. """
  244. Send the data as string.
  245. Parameters
  246. ----------
  247. payload: str
  248. Payload must be utf-8 string or unicode,
  249. If the opcode is OPCODE_TEXT.
  250. Otherwise, it must be string(byte array).
  251. opcode: int
  252. Operation code (opcode) to send.
  253. """
  254. frame = ABNF.create_frame(payload, opcode)
  255. return self.send_frame(frame)
  256. def send_text(self, text_data: str) -> int:
  257. """
  258. Sends UTF-8 encoded text.
  259. """
  260. return self.send(text_data, ABNF.OPCODE_TEXT)
  261. def send_bytes(self, data: Union[bytes, bytearray]) -> int:
  262. """
  263. Sends a sequence of bytes.
  264. """
  265. return self.send(data, ABNF.OPCODE_BINARY)
  266. def send_frame(self, frame) -> int:
  267. """
  268. Send the data frame.
  269. >>> ws = create_connection("ws://echo.websocket.events")
  270. >>> frame = ABNF.create_frame("Hello", ABNF.OPCODE_TEXT)
  271. >>> ws.send_frame(frame)
  272. >>> cont_frame = ABNF.create_frame("My name is ", ABNF.OPCODE_CONT, 0)
  273. >>> ws.send_frame(frame)
  274. >>> cont_frame = ABNF.create_frame("Foo Bar", ABNF.OPCODE_CONT, 1)
  275. >>> ws.send_frame(frame)
  276. Parameters
  277. ----------
  278. frame: ABNF frame
  279. frame data created by ABNF.create_frame
  280. """
  281. if self.get_mask_key:
  282. frame.get_mask_key = self.get_mask_key
  283. data = frame.format()
  284. length = len(data)
  285. if isEnabledForTrace():
  286. trace(f"++Sent raw: {repr(data)}")
  287. trace(f"++Sent decoded: {frame.__str__()}")
  288. with self.lock:
  289. while data:
  290. l = self._send(data)
  291. data = data[l:]
  292. return length
  293. def send_binary(self, payload: bytes) -> int:
  294. """
  295. Send a binary message (OPCODE_BINARY).
  296. Parameters
  297. ----------
  298. payload: bytes
  299. payload of message to send.
  300. """
  301. return self.send(payload, ABNF.OPCODE_BINARY)
  302. def ping(self, payload: Union[str, bytes] = ""):
  303. """
  304. Send ping data.
  305. Parameters
  306. ----------
  307. payload: str
  308. data payload to send server.
  309. """
  310. if isinstance(payload, str):
  311. payload = payload.encode("utf-8")
  312. self.send(payload, ABNF.OPCODE_PING)
  313. def pong(self, payload: Union[str, bytes] = ""):
  314. """
  315. Send pong data.
  316. Parameters
  317. ----------
  318. payload: str
  319. data payload to send server.
  320. """
  321. if isinstance(payload, str):
  322. payload = payload.encode("utf-8")
  323. self.send(payload, ABNF.OPCODE_PONG)
  324. def recv(self) -> Union[str, bytes]:
  325. """
  326. Receive string data(byte array) from the server.
  327. Returns
  328. ----------
  329. data: string (byte array) value.
  330. """
  331. with self.readlock:
  332. opcode, data = self.recv_data()
  333. if opcode == ABNF.OPCODE_TEXT:
  334. data_received: Union[bytes, str] = data
  335. if isinstance(data_received, bytes):
  336. return data_received.decode("utf-8")
  337. elif isinstance(data_received, str):
  338. return data_received
  339. elif opcode == ABNF.OPCODE_BINARY:
  340. data_binary: bytes = data
  341. return data_binary
  342. else:
  343. return ""
  344. def recv_data(self, control_frame: bool = False) -> tuple:
  345. """
  346. Receive data with operation code.
  347. Parameters
  348. ----------
  349. control_frame: bool
  350. a boolean flag indicating whether to return control frame
  351. data, defaults to False
  352. Returns
  353. -------
  354. opcode, frame.data: tuple
  355. tuple of operation code and string(byte array) value.
  356. """
  357. opcode, frame = self.recv_data_frame(control_frame)
  358. return opcode, frame.data
  359. def recv_data_frame(self, control_frame: bool = False) -> tuple:
  360. """
  361. Receive data with operation code.
  362. If a valid ping message is received, a pong response is sent.
  363. Parameters
  364. ----------
  365. control_frame: bool
  366. a boolean flag indicating whether to return control frame
  367. data, defaults to False
  368. Returns
  369. -------
  370. frame.opcode, frame: tuple
  371. tuple of operation code and string(byte array) value.
  372. """
  373. while True:
  374. frame = self.recv_frame()
  375. if isEnabledForTrace():
  376. trace(f"++Rcv raw: {repr(frame.format())}")
  377. trace(f"++Rcv decoded: {frame.__str__()}")
  378. if not frame:
  379. # handle error:
  380. # 'NoneType' object has no attribute 'opcode'
  381. raise WebSocketProtocolException(f"Not a valid frame {frame}")
  382. elif frame.opcode in (
  383. ABNF.OPCODE_TEXT,
  384. ABNF.OPCODE_BINARY,
  385. ABNF.OPCODE_CONT,
  386. ):
  387. self.cont_frame.validate(frame)
  388. self.cont_frame.add(frame)
  389. if self.cont_frame.is_fire(frame):
  390. return self.cont_frame.extract(frame)
  391. elif frame.opcode == ABNF.OPCODE_CLOSE:
  392. self.send_close()
  393. return frame.opcode, frame
  394. elif frame.opcode == ABNF.OPCODE_PING:
  395. if len(frame.data) < 126:
  396. self.pong(frame.data)
  397. else:
  398. raise WebSocketProtocolException("Ping message is too long")
  399. if control_frame:
  400. return frame.opcode, frame
  401. elif frame.opcode == ABNF.OPCODE_PONG:
  402. if control_frame:
  403. return frame.opcode, frame
  404. def recv_frame(self):
  405. """
  406. Receive data as frame from server.
  407. Returns
  408. -------
  409. self.frame_buffer.recv_frame(): ABNF frame object
  410. """
  411. return self.frame_buffer.recv_frame()
  412. def send_close(self, status: int = STATUS_NORMAL, reason: bytes = b""):
  413. """
  414. Send close data to the server.
  415. Parameters
  416. ----------
  417. status: int
  418. Status code to send. See STATUS_XXX.
  419. reason: str or bytes
  420. The reason to close. This must be string or UTF-8 bytes.
  421. """
  422. if status < 0 or status >= ABNF.LENGTH_16:
  423. raise ValueError("code is invalid range")
  424. self.connected = False
  425. self.send(struct.pack("!H", status) + reason, ABNF.OPCODE_CLOSE)
  426. def close(self, status: int = STATUS_NORMAL, reason: bytes = b"", timeout: int = 3):
  427. """
  428. Close Websocket object
  429. Parameters
  430. ----------
  431. status: int
  432. Status code to send. See VALID_CLOSE_STATUS in ABNF.
  433. reason: bytes
  434. The reason to close in UTF-8.
  435. timeout: int or float
  436. Timeout until receive a close frame.
  437. If None, it will wait forever until receive a close frame.
  438. """
  439. if not self.connected:
  440. return
  441. if status < 0 or status >= ABNF.LENGTH_16:
  442. raise ValueError("code is invalid range")
  443. try:
  444. self.connected = False
  445. self.send(struct.pack("!H", status) + reason, ABNF.OPCODE_CLOSE)
  446. sock_timeout = self.sock.gettimeout()
  447. self.sock.settimeout(timeout)
  448. start_time = time.time()
  449. while timeout is None or time.time() - start_time < timeout:
  450. try:
  451. frame = self.recv_frame()
  452. if frame.opcode != ABNF.OPCODE_CLOSE:
  453. continue
  454. if isEnabledForError():
  455. recv_status = struct.unpack("!H", frame.data[0:2])[0]
  456. if recv_status >= 3000 and recv_status <= 4999:
  457. debug(f"close status: {repr(recv_status)}")
  458. elif recv_status != STATUS_NORMAL:
  459. error(f"close status: {repr(recv_status)}")
  460. break
  461. except:
  462. break
  463. self.sock.settimeout(sock_timeout)
  464. self.sock.shutdown(socket.SHUT_RDWR)
  465. except:
  466. pass
  467. self.shutdown()
  468. def abort(self):
  469. """
  470. Low-level asynchronous abort, wakes up other threads that are waiting in recv_*
  471. """
  472. if self.connected:
  473. self.sock.shutdown(socket.SHUT_RDWR)
  474. def shutdown(self):
  475. """
  476. close socket, immediately.
  477. """
  478. if self.sock:
  479. self.sock.close()
  480. self.sock = None
  481. self.connected = False
  482. def _send(self, data: Union[str, bytes]):
  483. return send(self.sock, data)
  484. def _recv(self, bufsize):
  485. try:
  486. return recv(self.sock, bufsize)
  487. except WebSocketConnectionClosedException:
  488. if self.sock:
  489. self.sock.close()
  490. self.sock = None
  491. self.connected = False
  492. raise
  493. def create_connection(url: str, timeout=None, class_=WebSocket, **options):
  494. """
  495. Connect to url and return websocket object.
  496. Connect to url and return the WebSocket object.
  497. Passing optional timeout parameter will set the timeout on the socket.
  498. If no timeout is supplied,
  499. the global default timeout setting returned by getdefaulttimeout() is used.
  500. You can customize using 'options'.
  501. If you set "header" list object, you can set your own custom header.
  502. >>> conn = create_connection("ws://echo.websocket.events",
  503. ... header=["User-Agent: MyProgram",
  504. ... "x-custom: header"])
  505. Parameters
  506. ----------
  507. class_: class
  508. class to instantiate when creating the connection. It has to implement
  509. settimeout and connect. It's __init__ should be compatible with
  510. WebSocket.__init__, i.e. accept all of it's kwargs.
  511. header: list or dict
  512. custom http header list or dict.
  513. cookie: str
  514. Cookie value.
  515. origin: str
  516. custom origin url.
  517. suppress_origin: bool
  518. suppress outputting origin header.
  519. host: str
  520. custom host header string.
  521. timeout: int or float
  522. socket timeout time. This value could be either float/integer.
  523. If set to None, it uses the default_timeout value.
  524. http_proxy_host: str
  525. HTTP proxy host name.
  526. http_proxy_port: str or int
  527. HTTP proxy port. If not set, set to 80.
  528. http_no_proxy: list
  529. Whitelisted host names that don't use the proxy.
  530. http_proxy_auth: tuple
  531. HTTP proxy auth information. tuple of username and password. Default is None.
  532. http_proxy_timeout: int or float
  533. HTTP proxy timeout, default is 60 sec as per python-socks.
  534. enable_multithread: bool
  535. Enable lock for multithread.
  536. redirect_limit: int
  537. Number of redirects to follow.
  538. sockopt: tuple
  539. Values for socket.setsockopt.
  540. sockopt must be a tuple and each element is an argument of sock.setsockopt.
  541. sslopt: dict
  542. Optional dict object for ssl socket options. See FAQ for details.
  543. subprotocols: list
  544. List of available subprotocols. Default is None.
  545. skip_utf8_validation: bool
  546. Skip utf8 validation.
  547. socket: socket
  548. Pre-initialized stream socket.
  549. """
  550. sockopt = options.pop("sockopt", [])
  551. sslopt = options.pop("sslopt", {})
  552. fire_cont_frame = options.pop("fire_cont_frame", False)
  553. enable_multithread = options.pop("enable_multithread", True)
  554. skip_utf8_validation = options.pop("skip_utf8_validation", False)
  555. websock = class_(
  556. sockopt=sockopt,
  557. sslopt=sslopt,
  558. fire_cont_frame=fire_cont_frame,
  559. enable_multithread=enable_multithread,
  560. skip_utf8_validation=skip_utf8_validation,
  561. **options,
  562. )
  563. websock.settimeout(timeout if timeout is not None else getdefaulttimeout())
  564. websock.connect(url, **options)
  565. return websock