_app.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399
  1. """
  2. """
  3. """
  4. websocket - WebSocket client library for Python
  5. Copyright (C) 2010 Hiroki Ohtani(liris)
  6. This library is free software; you can redistribute it and/or
  7. modify it under the terms of the GNU Lesser General Public
  8. License as published by the Free Software Foundation; either
  9. version 2.1 of the License, or (at your option) any later version.
  10. This library is distributed in the hope that it will be useful,
  11. but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  13. Lesser General Public License for more details.
  14. You should have received a copy of the GNU Lesser General Public
  15. License along with this library; if not, write to the Free Software
  16. Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  17. """
  18. import inspect
  19. import select
  20. import sys
  21. import threading
  22. import time
  23. import traceback
  24. import six
  25. from ._abnf import ABNF
  26. from ._core import WebSocket, getdefaulttimeout
  27. from ._exceptions import *
  28. from . import _logging
  29. __all__ = ["WebSocketApp"]
  30. class Dispatcher:
  31. """
  32. Dispatcher
  33. """
  34. def __init__(self, app, ping_timeout):
  35. self.app = app
  36. self.ping_timeout = ping_timeout
  37. def read(self, sock, read_callback, check_callback):
  38. while self.app.keep_running:
  39. r, w, e = select.select(
  40. (self.app.sock.sock, ), (), (), self.ping_timeout)
  41. if r:
  42. if not read_callback():
  43. break
  44. check_callback()
  45. class SSLDispatcher:
  46. """
  47. SSLDispatcher
  48. """
  49. def __init__(self, app, ping_timeout):
  50. self.app = app
  51. self.ping_timeout = ping_timeout
  52. def read(self, sock, read_callback, check_callback):
  53. while self.app.keep_running:
  54. r = self.select()
  55. if r:
  56. if not read_callback():
  57. break
  58. check_callback()
  59. def select(self):
  60. sock = self.app.sock.sock
  61. if sock.pending():
  62. return [sock,]
  63. r, w, e = select.select((sock, ), (), (), self.ping_timeout)
  64. return r
  65. class WebSocketApp(object):
  66. """
  67. Higher level of APIs are provided. The interface is like JavaScript WebSocket object.
  68. """
  69. def __init__(self, url, header=None,
  70. on_open=None, on_message=None, on_error=None,
  71. on_close=None, on_ping=None, on_pong=None,
  72. on_cont_message=None,
  73. keep_running=True, get_mask_key=None, cookie=None,
  74. subprotocols=None,
  75. on_data=None):
  76. """
  77. WebSocketApp initialization
  78. Parameters
  79. ----------
  80. url: <type>
  81. websocket url.
  82. header: list or dict
  83. custom header for websocket handshake.
  84. on_open: <type>
  85. callable object which is called at opening websocket.
  86. this function has one argument. The argument is this class object.
  87. on_message: <type>
  88. callable object which is called when received data.
  89. on_message has 2 arguments.
  90. The 1st argument is this class object.
  91. The 2nd argument is utf-8 string which we get from the server.
  92. on_error: <type>
  93. callable object which is called when we get error.
  94. on_error has 2 arguments.
  95. The 1st argument is this class object.
  96. The 2nd argument is exception object.
  97. on_close: <type>
  98. callable object which is called when closed the connection.
  99. this function has one argument. The argument is this class object.
  100. on_cont_message: <type>
  101. callback object which is called when receive continued
  102. frame data.
  103. on_cont_message has 3 arguments.
  104. The 1st argument is this class object.
  105. The 2nd argument is utf-8 string which we get from the server.
  106. The 3rd argument is continue flag. if 0, the data continue
  107. to next frame data
  108. on_data: <type>
  109. callback object which is called when a message received.
  110. This is called before on_message or on_cont_message,
  111. and then on_message or on_cont_message is called.
  112. on_data has 4 argument.
  113. The 1st argument is this class object.
  114. The 2nd argument is utf-8 string which we get from the server.
  115. The 3rd argument is data type. ABNF.OPCODE_TEXT or ABNF.OPCODE_BINARY will be came.
  116. The 4th argument is continue flag. if 0, the data continue
  117. keep_running: <type>
  118. this parameter is obsolete and ignored.
  119. get_mask_key: func
  120. a callable to produce new mask keys,
  121. see the WebSocket.set_mask_key's docstring for more information
  122. cookie: str
  123. cookie value.
  124. subprotocols: <type>
  125. array of available sub protocols. default is None.
  126. """
  127. self.url = url
  128. self.header = header if header is not None else []
  129. self.cookie = cookie
  130. self.on_open = on_open
  131. self.on_message = on_message
  132. self.on_data = on_data
  133. self.on_error = on_error
  134. self.on_close = on_close
  135. self.on_ping = on_ping
  136. self.on_pong = on_pong
  137. self.on_cont_message = on_cont_message
  138. self.keep_running = False
  139. self.get_mask_key = get_mask_key
  140. self.sock = None
  141. self.last_ping_tm = 0
  142. self.last_pong_tm = 0
  143. self.subprotocols = subprotocols
  144. def send(self, data, opcode=ABNF.OPCODE_TEXT):
  145. """
  146. send message
  147. Parameters
  148. ----------
  149. data: <type>
  150. Message to send. If you set opcode to OPCODE_TEXT,
  151. data must be utf-8 string or unicode.
  152. opcode: <type>
  153. Operation code of data. default is OPCODE_TEXT.
  154. """
  155. if not self.sock or self.sock.send(data, opcode) == 0:
  156. raise WebSocketConnectionClosedException(
  157. "Connection is already closed.")
  158. def close(self, **kwargs):
  159. """
  160. Close websocket connection.
  161. """
  162. self.keep_running = False
  163. if self.sock:
  164. self.sock.close(**kwargs)
  165. self.sock = None
  166. def _send_ping(self, interval, event, payload):
  167. while not event.wait(interval):
  168. self.last_ping_tm = time.time()
  169. if self.sock:
  170. try:
  171. self.sock.ping(payload)
  172. except Exception as ex:
  173. _logging.warning("send_ping routine terminated: {}".format(ex))
  174. break
  175. def run_forever(self, sockopt=None, sslopt=None,
  176. ping_interval=0, ping_timeout=None,
  177. ping_payload="",
  178. http_proxy_host=None, http_proxy_port=None,
  179. http_no_proxy=None, http_proxy_auth=None,
  180. skip_utf8_validation=False,
  181. host=None, origin=None, dispatcher=None,
  182. suppress_origin=False, proxy_type=None):
  183. """
  184. Run event loop for WebSocket framework.
  185. This loop is an infinite loop and is alive while websocket is available.
  186. Parameters
  187. ----------
  188. sockopt: tuple
  189. values for socket.setsockopt.
  190. sockopt must be tuple
  191. and each element is argument of sock.setsockopt.
  192. sslopt: dict
  193. optional dict object for ssl socket option.
  194. ping_interval: int or float
  195. automatically send "ping" command
  196. every specified period (in seconds)
  197. if set to 0, not send automatically.
  198. ping_timeout: int or float
  199. timeout (in seconds) if the pong message is not received.
  200. ping_payload: str
  201. payload message to send with each ping.
  202. http_proxy_host: <type>
  203. http proxy host name.
  204. http_proxy_port: <type>
  205. http proxy port. If not set, set to 80.
  206. http_no_proxy: <type>
  207. host names, which doesn't use proxy.
  208. skip_utf8_validation: bool
  209. skip utf8 validation.
  210. host: str
  211. update host header.
  212. origin: str
  213. update origin header.
  214. dispatcher: <type>
  215. customize reading data from socket.
  216. suppress_origin: bool
  217. suppress outputting origin header.
  218. Returns
  219. -------
  220. teardown: bool
  221. False if caught KeyboardInterrupt, True if other exception was raised during a loop
  222. """
  223. if ping_timeout is not None and ping_timeout <= 0:
  224. ping_timeout = None
  225. if ping_timeout and ping_interval and ping_interval <= ping_timeout:
  226. raise WebSocketException("Ensure ping_interval > ping_timeout")
  227. if not sockopt:
  228. sockopt = []
  229. if not sslopt:
  230. sslopt = {}
  231. if self.sock:
  232. raise WebSocketException("socket is already opened")
  233. thread = None
  234. self.keep_running = True
  235. self.last_ping_tm = 0
  236. self.last_pong_tm = 0
  237. def teardown(close_frame=None):
  238. """
  239. Tears down the connection.
  240. If close_frame is set, we will invoke the on_close handler with the
  241. statusCode and reason from there.
  242. """
  243. if thread and thread.is_alive():
  244. event.set()
  245. thread.join()
  246. self.keep_running = False
  247. if self.sock:
  248. self.sock.close()
  249. close_args = self._get_close_args(
  250. close_frame.data if close_frame else None)
  251. self._callback(self.on_close, *close_args)
  252. self.sock = None
  253. try:
  254. self.sock = WebSocket(
  255. self.get_mask_key, sockopt=sockopt, sslopt=sslopt,
  256. fire_cont_frame=self.on_cont_message is not None,
  257. skip_utf8_validation=skip_utf8_validation,
  258. enable_multithread=True if ping_interval else False)
  259. self.sock.settimeout(getdefaulttimeout())
  260. self.sock.connect(
  261. self.url, header=self.header, cookie=self.cookie,
  262. http_proxy_host=http_proxy_host,
  263. http_proxy_port=http_proxy_port, http_no_proxy=http_no_proxy,
  264. http_proxy_auth=http_proxy_auth, subprotocols=self.subprotocols,
  265. host=host, origin=origin, suppress_origin=suppress_origin,
  266. proxy_type=proxy_type)
  267. if not dispatcher:
  268. dispatcher = self.create_dispatcher(ping_timeout)
  269. self._callback(self.on_open)
  270. if ping_interval:
  271. event = threading.Event()
  272. thread = threading.Thread(
  273. target=self._send_ping, args=(ping_interval, event, ping_payload))
  274. thread.daemon = True
  275. thread.start()
  276. def read():
  277. if not self.keep_running:
  278. return teardown()
  279. op_code, frame = self.sock.recv_data_frame(True)
  280. if op_code == ABNF.OPCODE_CLOSE:
  281. return teardown(frame)
  282. elif op_code == ABNF.OPCODE_PING:
  283. self._callback(self.on_ping, frame.data)
  284. elif op_code == ABNF.OPCODE_PONG:
  285. self.last_pong_tm = time.time()
  286. self._callback(self.on_pong, frame.data)
  287. elif op_code == ABNF.OPCODE_CONT and self.on_cont_message:
  288. self._callback(self.on_data, frame.data,
  289. frame.opcode, frame.fin)
  290. self._callback(self.on_cont_message,
  291. frame.data, frame.fin)
  292. else:
  293. data = frame.data
  294. if six.PY3 and op_code == ABNF.OPCODE_TEXT:
  295. data = data.decode("utf-8")
  296. self._callback(self.on_data, data, frame.opcode, True)
  297. self._callback(self.on_message, data)
  298. return True
  299. def check():
  300. if (ping_timeout):
  301. has_timeout_expired = time.time() - self.last_ping_tm > ping_timeout
  302. has_pong_not_arrived_after_last_ping = self.last_pong_tm - self.last_ping_tm < 0
  303. has_pong_arrived_too_late = self.last_pong_tm - self.last_ping_tm > ping_timeout
  304. if (self.last_ping_tm and
  305. has_timeout_expired and
  306. (has_pong_not_arrived_after_last_ping or has_pong_arrived_too_late)):
  307. raise WebSocketTimeoutException("ping/pong timed out")
  308. return True
  309. dispatcher.read(self.sock.sock, read, check)
  310. except (Exception, KeyboardInterrupt, SystemExit) as e:
  311. self._callback(self.on_error, e)
  312. if isinstance(e, SystemExit):
  313. # propagate SystemExit further
  314. raise
  315. teardown()
  316. return not isinstance(e, KeyboardInterrupt)
  317. def create_dispatcher(self, ping_timeout):
  318. timeout = ping_timeout or 10
  319. if self.sock.is_ssl():
  320. return SSLDispatcher(self, timeout)
  321. return Dispatcher(self, timeout)
  322. def _get_close_args(self, data):
  323. """
  324. _get_close_args extracts the code, reason from the close body
  325. if they exists, and if the self.on_close except three arguments
  326. """
  327. # if the on_close callback is "old", just return empty list
  328. if sys.version_info < (3, 0):
  329. if not self.on_close or len(inspect.getargspec(self.on_close).args) != 3:
  330. return []
  331. else:
  332. if not self.on_close or len(inspect.getfullargspec(self.on_close).args) != 3:
  333. return []
  334. if data and len(data) >= 2:
  335. code = 256 * six.byte2int(data[0:1]) + six.byte2int(data[1:2])
  336. reason = data[2:].decode('utf-8')
  337. return [code, reason]
  338. return [None, None]
  339. def _callback(self, callback, *args):
  340. if callback:
  341. try:
  342. callback(self, *args)
  343. except Exception as e:
  344. _logging.error("error from callback {}: {}".format(callback, e))
  345. if _logging.isEnabledForDebug():
  346. _, _, tb = sys.exc_info()
  347. traceback.print_tb(tb)