123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677 |
- import inspect
- import selectors
- import socket
- import threading
- import time
- from typing import Any, Callable, Optional, Union
- from . import _logging
- from ._abnf import ABNF
- from ._core import WebSocket, getdefaulttimeout
- from ._exceptions import (
- WebSocketConnectionClosedException,
- WebSocketException,
- WebSocketTimeoutException,
- )
- from ._ssl_compat import SSLEOFError
- from ._url import parse_url
- """
- _app.py
- websocket - WebSocket client library for Python
- Copyright 2024 engn33r
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- """
- __all__ = ["WebSocketApp"]
- RECONNECT = 0
- def setReconnect(reconnectInterval: int) -> None:
- global RECONNECT
- RECONNECT = reconnectInterval
- class DispatcherBase:
- """
- DispatcherBase
- """
- def __init__(self, app: Any, ping_timeout: Union[float, int, None]) -> None:
- self.app = app
- self.ping_timeout = ping_timeout
- def timeout(self, seconds: Union[float, int, None], callback: Callable) -> None:
- time.sleep(seconds)
- callback()
- def reconnect(self, seconds: int, reconnector: Callable) -> None:
- try:
- _logging.info(
- f"reconnect() - retrying in {seconds} seconds [{len(inspect.stack())} frames in stack]"
- )
- time.sleep(seconds)
- reconnector(reconnecting=True)
- except KeyboardInterrupt as e:
- _logging.info(f"User exited {e}")
- raise e
- class Dispatcher(DispatcherBase):
- """
- Dispatcher
- """
- def read(
- self,
- sock: socket.socket,
- read_callback: Callable,
- check_callback: Callable,
- ) -> None:
- sel = selectors.DefaultSelector()
- sel.register(self.app.sock.sock, selectors.EVENT_READ)
- try:
- while self.app.keep_running:
- if sel.select(self.ping_timeout):
- if not read_callback():
- break
- check_callback()
- finally:
- sel.close()
- class SSLDispatcher(DispatcherBase):
- """
- SSLDispatcher
- """
- def read(
- self,
- sock: socket.socket,
- read_callback: Callable,
- check_callback: Callable,
- ) -> None:
- sock = self.app.sock.sock
- sel = selectors.DefaultSelector()
- sel.register(sock, selectors.EVENT_READ)
- try:
- while self.app.keep_running:
- if self.select(sock, sel):
- if not read_callback():
- break
- check_callback()
- finally:
- sel.close()
- def select(self, sock, sel: selectors.DefaultSelector):
- sock = self.app.sock.sock
- if sock.pending():
- return [
- sock,
- ]
- r = sel.select(self.ping_timeout)
- if len(r) > 0:
- return r[0][0]
- class WrappedDispatcher:
- """
- WrappedDispatcher
- """
- def __init__(self, app, ping_timeout: Union[float, int, None], dispatcher) -> None:
- self.app = app
- self.ping_timeout = ping_timeout
- self.dispatcher = dispatcher
- dispatcher.signal(2, dispatcher.abort) # keyboard interrupt
- def read(
- self,
- sock: socket.socket,
- read_callback: Callable,
- check_callback: Callable,
- ) -> None:
- self.dispatcher.read(sock, read_callback)
- self.ping_timeout and self.timeout(self.ping_timeout, check_callback)
- def timeout(self, seconds: float, callback: Callable) -> None:
- self.dispatcher.timeout(seconds, callback)
- def reconnect(self, seconds: int, reconnector: Callable) -> None:
- self.timeout(seconds, reconnector)
- class WebSocketApp:
- """
- Higher level of APIs are provided. The interface is like JavaScript WebSocket object.
- """
- def __init__(
- self,
- url: str,
- header: Union[list, dict, Callable, None] = None,
- on_open: Optional[Callable[[WebSocket], None]] = None,
- on_reconnect: Optional[Callable[[WebSocket], None]] = None,
- on_message: Optional[Callable[[WebSocket, Any], None]] = None,
- on_error: Optional[Callable[[WebSocket, Any], None]] = None,
- on_close: Optional[Callable[[WebSocket, Any, Any], None]] = None,
- on_ping: Optional[Callable] = None,
- on_pong: Optional[Callable] = None,
- on_cont_message: Optional[Callable] = None,
- keep_running: bool = True,
- get_mask_key: Optional[Callable] = None,
- cookie: Optional[str] = None,
- subprotocols: Optional[list] = None,
- on_data: Optional[Callable] = None,
- socket: Optional[socket.socket] = None,
- ) -> None:
- """
- WebSocketApp initialization
- Parameters
- ----------
- url: str
- Websocket url.
- header: list or dict or Callable
- Custom header for websocket handshake.
- If the parameter is a callable object, it is called just before the connection attempt.
- The returned dict or list is used as custom header value.
- This could be useful in order to properly setup timestamp dependent headers.
- on_open: function
- Callback object which is called at opening websocket.
- on_open has one argument.
- The 1st argument is this class object.
- on_reconnect: function
- Callback object which is called at reconnecting websocket.
- on_reconnect has one argument.
- The 1st argument is this class object.
- on_message: function
- Callback object which is called when received data.
- on_message has 2 arguments.
- The 1st argument is this class object.
- The 2nd argument is utf-8 data received from the server.
- on_error: function
- Callback object which is called when we get error.
- on_error has 2 arguments.
- The 1st argument is this class object.
- The 2nd argument is exception object.
- on_close: function
- Callback object which is called when connection is closed.
- on_close has 3 arguments.
- The 1st argument is this class object.
- The 2nd argument is close_status_code.
- The 3rd argument is close_msg.
- on_cont_message: function
- Callback object which is called when a continuation
- frame is received.
- on_cont_message has 3 arguments.
- The 1st argument is this class object.
- The 2nd argument is utf-8 string which we get from the server.
- The 3rd argument is continue flag. if 0, the data continue
- to next frame data
- on_data: function
- Callback object which is called when a message received.
- This is called before on_message or on_cont_message,
- and then on_message or on_cont_message is called.
- on_data has 4 argument.
- The 1st argument is this class object.
- The 2nd argument is utf-8 string which we get from the server.
- The 3rd argument is data type. ABNF.OPCODE_TEXT or ABNF.OPCODE_BINARY will be came.
- The 4th argument is continue flag. If 0, the data continue
- keep_running: bool
- This parameter is obsolete and ignored.
- get_mask_key: function
- A callable function to get new mask keys, see the
- WebSocket.set_mask_key's docstring for more information.
- cookie: str
- Cookie value.
- subprotocols: list
- List of available sub protocols. Default is None.
- socket: socket
- Pre-initialized stream socket.
- """
- self.url = url
- self.header = header if header is not None else []
- self.cookie = cookie
- self.on_open = on_open
- self.on_reconnect = on_reconnect
- self.on_message = on_message
- self.on_data = on_data
- self.on_error = on_error
- self.on_close = on_close
- self.on_ping = on_ping
- self.on_pong = on_pong
- self.on_cont_message = on_cont_message
- self.keep_running = False
- self.get_mask_key = get_mask_key
- self.sock: Optional[WebSocket] = None
- self.last_ping_tm = float(0)
- self.last_pong_tm = float(0)
- self.ping_thread: Optional[threading.Thread] = None
- self.stop_ping: Optional[threading.Event] = None
- self.ping_interval = float(0)
- self.ping_timeout: Union[float, int, None] = None
- self.ping_payload = ""
- self.subprotocols = subprotocols
- self.prepared_socket = socket
- self.has_errored = False
- self.has_done_teardown = False
- self.has_done_teardown_lock = threading.Lock()
- def send(self, data: Union[bytes, str], opcode: int = ABNF.OPCODE_TEXT) -> None:
- """
- send message
- Parameters
- ----------
- data: str
- Message to send. If you set opcode to OPCODE_TEXT,
- data must be utf-8 string or unicode.
- opcode: int
- Operation code of data. Default is OPCODE_TEXT.
- """
- if not self.sock or self.sock.send(data, opcode) == 0:
- raise WebSocketConnectionClosedException("Connection is already closed.")
- def send_text(self, text_data: str) -> None:
- """
- Sends UTF-8 encoded text.
- """
- if not self.sock or self.sock.send(text_data, ABNF.OPCODE_TEXT) == 0:
- raise WebSocketConnectionClosedException("Connection is already closed.")
- def send_bytes(self, data: Union[bytes, bytearray]) -> None:
- """
- Sends a sequence of bytes.
- """
- if not self.sock or self.sock.send(data, ABNF.OPCODE_BINARY) == 0:
- raise WebSocketConnectionClosedException("Connection is already closed.")
- def close(self, **kwargs) -> None:
- """
- Close websocket connection.
- """
- self.keep_running = False
- if self.sock:
- self.sock.close(**kwargs)
- self.sock = None
- def _start_ping_thread(self) -> None:
- self.last_ping_tm = self.last_pong_tm = float(0)
- self.stop_ping = threading.Event()
- self.ping_thread = threading.Thread(target=self._send_ping)
- self.ping_thread.daemon = True
- self.ping_thread.start()
- def _stop_ping_thread(self) -> None:
- if self.stop_ping:
- self.stop_ping.set()
- if self.ping_thread and self.ping_thread.is_alive():
- self.ping_thread.join(3)
- self.last_ping_tm = self.last_pong_tm = float(0)
- def _send_ping(self) -> None:
- if self.stop_ping.wait(self.ping_interval) or self.keep_running is False:
- return
- while not self.stop_ping.wait(self.ping_interval) and self.keep_running is True:
- if self.sock:
- self.last_ping_tm = time.time()
- try:
- _logging.debug("Sending ping")
- self.sock.ping(self.ping_payload)
- except Exception as e:
- _logging.debug(f"Failed to send ping: {e}")
- def run_forever(
- self,
- sockopt: tuple = None,
- sslopt: dict = None,
- ping_interval: Union[float, int] = 0,
- ping_timeout: Union[float, int, None] = None,
- ping_payload: str = "",
- http_proxy_host: str = None,
- http_proxy_port: Union[int, str] = None,
- http_no_proxy: list = None,
- http_proxy_auth: tuple = None,
- http_proxy_timeout: Optional[float] = None,
- skip_utf8_validation: bool = False,
- host: str = None,
- origin: str = None,
- dispatcher=None,
- suppress_origin: bool = False,
- proxy_type: str = None,
- reconnect: int = None,
- ) -> bool:
- """
- Run event loop for WebSocket framework.
- This loop is an infinite loop and is alive while websocket is available.
- Parameters
- ----------
- sockopt: tuple
- Values for socket.setsockopt.
- sockopt must be tuple
- and each element is argument of sock.setsockopt.
- sslopt: dict
- Optional dict object for ssl socket option.
- ping_interval: int or float
- Automatically send "ping" command
- every specified period (in seconds).
- If set to 0, no ping is sent periodically.
- ping_timeout: int or float
- Timeout (in seconds) if the pong message is not received.
- ping_payload: str
- Payload message to send with each ping.
- http_proxy_host: str
- HTTP proxy host name.
- http_proxy_port: int or str
- HTTP proxy port. If not set, set to 80.
- http_no_proxy: list
- Whitelisted host names that don't use the proxy.
- http_proxy_timeout: int or float
- HTTP proxy timeout, default is 60 sec as per python-socks.
- http_proxy_auth: tuple
- HTTP proxy auth information. tuple of username and password. Default is None.
- skip_utf8_validation: bool
- skip utf8 validation.
- host: str
- update host header.
- origin: str
- update origin header.
- dispatcher: Dispatcher object
- customize reading data from socket.
- suppress_origin: bool
- suppress outputting origin header.
- proxy_type: str
- type of proxy from: http, socks4, socks4a, socks5, socks5h
- reconnect: int
- delay interval when reconnecting
- Returns
- -------
- teardown: bool
- False if the `WebSocketApp` is closed or caught KeyboardInterrupt,
- True if any other exception was raised during a loop.
- """
- if reconnect is None:
- reconnect = RECONNECT
- if ping_timeout is not None and ping_timeout <= 0:
- raise WebSocketException("Ensure ping_timeout > 0")
- if ping_interval is not None and ping_interval < 0:
- raise WebSocketException("Ensure ping_interval >= 0")
- if ping_timeout and ping_interval and ping_interval <= ping_timeout:
- raise WebSocketException("Ensure ping_interval > ping_timeout")
- if not sockopt:
- sockopt = ()
- if not sslopt:
- sslopt = {}
- if self.sock:
- raise WebSocketException("socket is already opened")
- self.ping_interval = ping_interval
- self.ping_timeout = ping_timeout
- self.ping_payload = ping_payload
- self.has_done_teardown = False
- self.keep_running = True
- def teardown(close_frame: ABNF = None):
- """
- Tears down the connection.
- Parameters
- ----------
- close_frame: ABNF frame
- If close_frame is set, the on_close handler is invoked
- with the statusCode and reason from the provided frame.
- """
- # teardown() is called in many code paths to ensure resources are cleaned up and on_close is fired.
- # To ensure the work is only done once, we use this bool and lock.
- with self.has_done_teardown_lock:
- if self.has_done_teardown:
- return
- self.has_done_teardown = True
- self._stop_ping_thread()
- self.keep_running = False
- if self.sock:
- self.sock.close()
- close_status_code, close_reason = self._get_close_args(
- close_frame if close_frame else None
- )
- self.sock = None
- # Finally call the callback AFTER all teardown is complete
- self._callback(self.on_close, close_status_code, close_reason)
- def setSock(reconnecting: bool = False) -> None:
- if reconnecting and self.sock:
- self.sock.shutdown()
- self.sock = WebSocket(
- self.get_mask_key,
- sockopt=sockopt,
- sslopt=sslopt,
- fire_cont_frame=self.on_cont_message is not None,
- skip_utf8_validation=skip_utf8_validation,
- enable_multithread=True,
- )
- self.sock.settimeout(getdefaulttimeout())
- try:
- header = self.header() if callable(self.header) else self.header
- self.sock.connect(
- self.url,
- header=header,
- cookie=self.cookie,
- http_proxy_host=http_proxy_host,
- http_proxy_port=http_proxy_port,
- http_no_proxy=http_no_proxy,
- http_proxy_auth=http_proxy_auth,
- http_proxy_timeout=http_proxy_timeout,
- subprotocols=self.subprotocols,
- host=host,
- origin=origin,
- suppress_origin=suppress_origin,
- proxy_type=proxy_type,
- socket=self.prepared_socket,
- )
- _logging.info("Websocket connected")
- if self.ping_interval:
- self._start_ping_thread()
- if reconnecting and self.on_reconnect:
- self._callback(self.on_reconnect)
- else:
- self._callback(self.on_open)
- dispatcher.read(self.sock.sock, read, check)
- except (
- WebSocketConnectionClosedException,
- ConnectionRefusedError,
- KeyboardInterrupt,
- SystemExit,
- Exception,
- ) as e:
- handleDisconnect(e, reconnecting)
- def read() -> bool:
- if not self.keep_running:
- return teardown()
- try:
- op_code, frame = self.sock.recv_data_frame(True)
- except (
- WebSocketConnectionClosedException,
- KeyboardInterrupt,
- SSLEOFError,
- ) as e:
- if custom_dispatcher:
- return handleDisconnect(e, bool(reconnect))
- else:
- raise e
- if op_code == ABNF.OPCODE_CLOSE:
- return teardown(frame)
- elif op_code == ABNF.OPCODE_PING:
- self._callback(self.on_ping, frame.data)
- elif op_code == ABNF.OPCODE_PONG:
- self.last_pong_tm = time.time()
- self._callback(self.on_pong, frame.data)
- elif op_code == ABNF.OPCODE_CONT and self.on_cont_message:
- self._callback(self.on_data, frame.data, frame.opcode, frame.fin)
- self._callback(self.on_cont_message, frame.data, frame.fin)
- else:
- data = frame.data
- if op_code == ABNF.OPCODE_TEXT and not skip_utf8_validation:
- data = data.decode("utf-8")
- self._callback(self.on_data, data, frame.opcode, True)
- self._callback(self.on_message, data)
- return True
- def check() -> bool:
- if self.ping_timeout:
- has_timeout_expired = (
- time.time() - self.last_ping_tm > self.ping_timeout
- )
- has_pong_not_arrived_after_last_ping = (
- self.last_pong_tm - self.last_ping_tm < 0
- )
- has_pong_arrived_too_late = (
- self.last_pong_tm - self.last_ping_tm > self.ping_timeout
- )
- if (
- self.last_ping_tm
- and has_timeout_expired
- and (
- has_pong_not_arrived_after_last_ping
- or has_pong_arrived_too_late
- )
- ):
- raise WebSocketTimeoutException("ping/pong timed out")
- return True
- def handleDisconnect(
- e: Union[
- WebSocketConnectionClosedException,
- ConnectionRefusedError,
- KeyboardInterrupt,
- SystemExit,
- Exception,
- ],
- reconnecting: bool = False,
- ) -> bool:
- self.has_errored = True
- self._stop_ping_thread()
- if not reconnecting:
- self._callback(self.on_error, e)
- if isinstance(e, (KeyboardInterrupt, SystemExit)):
- teardown()
- # Propagate further
- raise
- if reconnect:
- _logging.info(f"{e} - reconnect")
- if custom_dispatcher:
- _logging.debug(
- f"Calling custom dispatcher reconnect [{len(inspect.stack())} frames in stack]"
- )
- dispatcher.reconnect(reconnect, setSock)
- else:
- _logging.error(f"{e} - goodbye")
- teardown()
- custom_dispatcher = bool(dispatcher)
- dispatcher = self.create_dispatcher(
- ping_timeout, dispatcher, parse_url(self.url)[3]
- )
- try:
- setSock()
- if not custom_dispatcher and reconnect:
- while self.keep_running:
- _logging.debug(
- f"Calling dispatcher reconnect [{len(inspect.stack())} frames in stack]"
- )
- dispatcher.reconnect(reconnect, setSock)
- except (KeyboardInterrupt, Exception) as e:
- _logging.info(f"tearing down on exception {e}")
- teardown()
- finally:
- if not custom_dispatcher:
- # Ensure teardown was called before returning from run_forever
- teardown()
- return self.has_errored
- def create_dispatcher(
- self,
- ping_timeout: Union[float, int, None],
- dispatcher: Optional[DispatcherBase] = None,
- is_ssl: bool = False,
- ) -> Union[Dispatcher, SSLDispatcher, WrappedDispatcher]:
- if dispatcher: # If custom dispatcher is set, use WrappedDispatcher
- return WrappedDispatcher(self, ping_timeout, dispatcher)
- timeout = ping_timeout or 10
- if is_ssl:
- return SSLDispatcher(self, timeout)
- return Dispatcher(self, timeout)
- def _get_close_args(self, close_frame: ABNF) -> list:
- """
- _get_close_args extracts the close code and reason from the close body
- if it exists (RFC6455 says WebSocket Connection Close Code is optional)
- """
- # Need to catch the case where close_frame is None
- # Otherwise the following if statement causes an error
- if not self.on_close or not close_frame:
- return [None, None]
- # Extract close frame status code
- if close_frame.data and len(close_frame.data) >= 2:
- close_status_code = 256 * int(close_frame.data[0]) + int(
- close_frame.data[1]
- )
- reason = close_frame.data[2:]
- if isinstance(reason, bytes):
- reason = reason.decode("utf-8")
- return [close_status_code, reason]
- else:
- # Most likely reached this because len(close_frame_data.data) < 2
- return [None, None]
- def _callback(self, callback, *args) -> None:
- if callback:
- try:
- callback(self, *args)
- except Exception as e:
- _logging.error(f"error from callback {callback}: {e}")
- if self.on_error:
- self.on_error(self, e)
|