# http.py
# Copyright (C) 2010-2013 Sebastian Rahlf and others (see AUTHORS).
#
# This program is released under the MIT license. You can find the full text of
# the license in the LICENSE file.
  5. import enum
  6. import itertools
  7. import json
  8. import sys
  9. import threading
  10. from werkzeug.datastructures import Headers
  11. from werkzeug.serving import make_server
  12. from werkzeug.wrappers import Request
  13. from werkzeug.wrappers import Response
  14. class WSGIServer(threading.Thread):
  15. """
  16. HTTP server running a WSGI application in its own thread.
  17. """
  18. def __init__(self, host="127.0.0.1", port=0, application=None, **kwargs):
  19. self.app = application
  20. self._server = make_server(host, port, self.app, **kwargs)
  21. self.server_address = self._server.server_address
  22. super().__init__(name=self.__class__, target=self._server.serve_forever)
  23. def __del__(self):
  24. self.stop()
  25. def stop(self):
  26. try:
  27. server = self._server
  28. except AttributeError:
  29. pass
  30. else:
  31. server.shutdown()
  32. @property
  33. def url(self):
  34. host, port = self.server_address
  35. proto = "http" if self._server.ssl_context is None else "https"
  36. return "%s://%s:%i" % (proto, host, port)
  37. class Chunked(enum.Enum):
  38. NO = False
  39. YES = True
  40. AUTO = None
  41. def __bool__(self):
  42. return bool(self.value)
  43. def _encode_chunk(chunk, charset):
  44. if isinstance(chunk, str):
  45. chunk = chunk.encode(charset)
  46. return "{:x}".format(len(chunk)).encode(charset) + b"\r\n" + chunk + b"\r\n"
  47. class ContentServer(WSGIServer):
  48. """
  49. Small test server which can be taught which content (i.e. string) to serve
  50. with which response code. Try the following snippet for testing API calls::
  51. server = ContentServer(port=8080)
  52. server.start()
  53. print 'Test server running at http://%s:%i' % server.server_address
  54. # any request to http://localhost:8080 will get a 503 response.
  55. server.content = 'Hello World!'
  56. server.code = 503
  57. # ...
  58. # we're done
  59. server.stop()
  60. """
  61. def __init__(self, host="127.0.0.1", port=0, ssl_context=None):
  62. super().__init__(host, port, self, ssl_context=ssl_context)
  63. self.content, self.code = ("", 204) # HTTP 204: No Content
  64. self.headers = {}
  65. self.show_post_vars = False
  66. self.compress = None
  67. self.requests = []
  68. self.chunked = Chunked.NO
  69. self.store_request_data = False
  70. def __call__(self, environ, start_response):
  71. """
  72. This is the WSGI application.
  73. """
  74. request = Request(environ)
  75. if self.store_request_data:
  76. # need to invoke this method to cache the data
  77. request.get_data(cache=True)
  78. self.requests.append(request)
  79. if (
  80. request.content_type == "application/x-www-form-urlencoded"
  81. and request.method == "POST"
  82. and self.show_post_vars
  83. ):
  84. content = json.dumps(request.form)
  85. else:
  86. content = self.content
  87. if self.chunked == Chunked.YES or (
  88. self.chunked == Chunked.AUTO and "chunked" in self.headers.get("Transfer-encoding", "")
  89. ):
  90. # If the code below ever changes to allow setting the charset of
  91. # the Response object, the charset used here should also be changed
  92. # to match. But until that happens, use UTF-8 since it is Werkzeug's
  93. # default.
  94. charset = "utf-8"
  95. if isinstance(content, (str, bytes)):
  96. content = (_encode_chunk(content, charset), "0\r\n\r\n")
  97. else:
  98. content = itertools.chain((_encode_chunk(item, charset) for item in content), ["0\r\n\r\n"])
  99. response = Response(response=content, status=self.code)
  100. response.headers.clear()
  101. response.headers.extend(self.headers)
  102. # FIXME get compression working!
  103. # if self.compress == 'gzip':
  104. # content = gzip.compress(content.encode('utf-8'))
  105. # response.content_encoding = 'gzip'
  106. return response(environ, start_response)
  107. def serve_content(self, content, code=200, headers=None, chunked=Chunked.NO, store_request_data=True):
  108. """
  109. Serves string content (with specified HTTP error code) as response to
  110. all subsequent request.
  111. :param content: content to be displayed
  112. :param code: HTTP status code
  113. :param headers: HTTP headers to be returned
  114. :param chunked: whether to apply chunked transfer encoding to the content
  115. :param store_request_data: whether to store data sent as request payload.
  116. """
  117. if not isinstance(content, (str, bytes, list, tuple)):
  118. # If content is an iterable which is not known to be a string,
  119. # bytes, or sequence, it might be something that can only be iterated
  120. # through once, in which case we need to cache it so it can be reused
  121. # to handle multiple requests.
  122. try:
  123. content = tuple(iter(content))
  124. except TypeError:
  125. # this probably means that content is not iterable, so just go
  126. # ahead in case it's some type that Response knows how to handle
  127. pass
  128. self.content = content
  129. self.code = code
  130. self.chunked = chunked
  131. self.store_request_data = store_request_data
  132. if headers:
  133. self.headers = Headers(headers)
  134. if __name__ == "__main__": # pragma: no cover
  135. import os.path
  136. import time
  137. app = ContentServer()
  138. server = WSGIServer(application=app)
  139. server.start()
  140. print("HTTP server is running at %s" % server.url)
  141. print("Type <Ctrl-C> to stop")
  142. try:
  143. path = sys.argv[1]
  144. except IndexError:
  145. path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "README.rst")
  146. app.serve_content(open(path).read(), 302)
  147. try:
  148. while True:
  149. time.sleep(1)
  150. except KeyboardInterrupt:
  151. print("\rstopping...")
  152. server.stop()