#!/usr/bin/env python3

"""
wsdump.py
websocket - WebSocket client library for Python

Copyright 2024 engn33r

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import argparse
import code
import gzip
import ssl
import sys
import threading
import time
import zlib
from urllib.parse import urlparse

import websocket

try:
    # Optional: when available, loading readline gives input() line editing
    # and history. The name itself is never referenced.
    import readline  # noqa: F401
except ImportError:
    pass
  30. def get_encoding() -> str:
  31. encoding = getattr(sys.stdin, "encoding", "")
  32. if not encoding:
  33. return "utf-8"
  34. else:
  35. return encoding.lower()
  36. OPCODE_DATA = (websocket.ABNF.OPCODE_TEXT, websocket.ABNF.OPCODE_BINARY)
  37. ENCODING = get_encoding()
  38. class VAction(argparse.Action):
  39. def __call__(
  40. self,
  41. parser: argparse.Namespace,
  42. args: tuple,
  43. values: str,
  44. option_string: str = None,
  45. ) -> None:
  46. if values is None:
  47. values = "1"
  48. try:
  49. values = int(values)
  50. except ValueError:
  51. values = values.count("v") + 1
  52. setattr(args, self.dest, values)
  53. def parse_args() -> argparse.Namespace:
  54. parser = argparse.ArgumentParser(description="WebSocket Simple Dump Tool")
  55. parser.add_argument(
  56. "url", metavar="ws_url", help="websocket url. ex. ws://echo.websocket.events/"
  57. )
  58. parser.add_argument("-p", "--proxy", help="proxy url. ex. http://127.0.0.1:8080")
  59. parser.add_argument(
  60. "-v",
  61. "--verbose",
  62. default=0,
  63. nargs="?",
  64. action=VAction,
  65. dest="verbose",
  66. help="set verbose mode. If set to 1, show opcode. "
  67. "If set to 2, enable to trace websocket module",
  68. )
  69. parser.add_argument(
  70. "-n", "--nocert", action="store_true", help="Ignore invalid SSL cert"
  71. )
  72. parser.add_argument("-r", "--raw", action="store_true", help="raw output")
  73. parser.add_argument("-s", "--subprotocols", nargs="*", help="Set subprotocols")
  74. parser.add_argument("-o", "--origin", help="Set origin")
  75. parser.add_argument(
  76. "--eof-wait",
  77. default=0,
  78. type=int,
  79. help="wait time(second) after 'EOF' received.",
  80. )
  81. parser.add_argument("-t", "--text", help="Send initial text")
  82. parser.add_argument(
  83. "--timings", action="store_true", help="Print timings in seconds"
  84. )
  85. parser.add_argument("--headers", help="Set custom headers. Use ',' as separator")
  86. return parser.parse_args()
  87. class RawInput:
  88. def raw_input(self, prompt: str = "") -> str:
  89. line = input(prompt)
  90. if ENCODING and ENCODING != "utf-8" and not isinstance(line, str):
  91. line = line.decode(ENCODING).encode("utf-8")
  92. elif isinstance(line, str):
  93. line = line.encode("utf-8")
  94. return line
  95. class InteractiveConsole(RawInput, code.InteractiveConsole):
  96. def write(self, data: str) -> None:
  97. sys.stdout.write("\033[2K\033[E")
  98. # sys.stdout.write("\n")
  99. sys.stdout.write("\033[34m< " + data + "\033[39m")
  100. sys.stdout.write("\n> ")
  101. sys.stdout.flush()
  102. def read(self) -> str:
  103. return self.raw_input("> ")
  104. class NonInteractive(RawInput):
  105. def write(self, data: str) -> None:
  106. sys.stdout.write(data)
  107. sys.stdout.write("\n")
  108. sys.stdout.flush()
  109. def read(self) -> str:
  110. return self.raw_input("")
  111. def main() -> None:
  112. start_time = time.time()
  113. args = parse_args()
  114. if args.verbose > 1:
  115. websocket.enableTrace(True)
  116. options = {}
  117. if args.proxy:
  118. p = urlparse(args.proxy)
  119. options["http_proxy_host"] = p.hostname
  120. options["http_proxy_port"] = p.port
  121. if args.origin:
  122. options["origin"] = args.origin
  123. if args.subprotocols:
  124. options["subprotocols"] = args.subprotocols
  125. opts = {}
  126. if args.nocert:
  127. opts = {"cert_reqs": ssl.CERT_NONE, "check_hostname": False}
  128. if args.headers:
  129. options["header"] = list(map(str.strip, args.headers.split(",")))
  130. ws = websocket.create_connection(args.url, sslopt=opts, **options)
  131. if args.raw:
  132. console = NonInteractive()
  133. else:
  134. console = InteractiveConsole()
  135. print("Press Ctrl+C to quit")
  136. def recv() -> tuple:
  137. try:
  138. frame = ws.recv_frame()
  139. except websocket.WebSocketException:
  140. return websocket.ABNF.OPCODE_CLOSE, ""
  141. if not frame:
  142. raise websocket.WebSocketException(f"Not a valid frame {frame}")
  143. elif frame.opcode in OPCODE_DATA:
  144. return frame.opcode, frame.data
  145. elif frame.opcode == websocket.ABNF.OPCODE_CLOSE:
  146. ws.send_close()
  147. return frame.opcode, ""
  148. elif frame.opcode == websocket.ABNF.OPCODE_PING:
  149. ws.pong(frame.data)
  150. return frame.opcode, frame.data
  151. return frame.opcode, frame.data
  152. def recv_ws() -> None:
  153. while True:
  154. opcode, data = recv()
  155. msg = None
  156. if opcode == websocket.ABNF.OPCODE_TEXT and isinstance(data, bytes):
  157. data = str(data, "utf-8")
  158. if (
  159. isinstance(data, bytes) and len(data) > 2 and data[:2] == b"\037\213"
  160. ): # gzip magick
  161. try:
  162. data = "[gzip] " + str(gzip.decompress(data), "utf-8")
  163. except:
  164. pass
  165. elif isinstance(data, bytes):
  166. try:
  167. data = "[zlib] " + str(
  168. zlib.decompress(data, -zlib.MAX_WBITS), "utf-8"
  169. )
  170. except:
  171. pass
  172. if isinstance(data, bytes):
  173. data = repr(data)
  174. if args.verbose:
  175. msg = f"{websocket.ABNF.OPCODE_MAP.get(opcode)}: {data}"
  176. else:
  177. msg = data
  178. if msg is not None:
  179. if args.timings:
  180. console.write(f"{time.time() - start_time}: {msg}")
  181. else:
  182. console.write(msg)
  183. if opcode == websocket.ABNF.OPCODE_CLOSE:
  184. break
  185. thread = threading.Thread(target=recv_ws)
  186. thread.daemon = True
  187. thread.start()
  188. if args.text:
  189. ws.send(args.text)
  190. while True:
  191. try:
  192. message = console.read()
  193. ws.send(message)
  194. except KeyboardInterrupt:
  195. return
  196. except EOFError:
  197. time.sleep(args.eof_wait)
  198. return
  199. if __name__ == "__main__":
  200. try:
  201. main()
  202. except Exception as e:
  203. print(e)