http.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183
  1. # Copyright (C) 2010-2013 Sebastian Rahlf and others (see AUTHORS).
  2. #
  3. # This program is released under the MIT license. You can find the full text of
  4. # the license in the LICENSE file.
  5. import enum
  6. import itertools
  7. import json
  8. import sys
  9. import threading
  10. from werkzeug.datastructures import Headers
  11. from werkzeug.serving import make_server
  12. from werkzeug.wrappers import Request
  13. from werkzeug.wrappers import Response
  class WSGIServer(threading.Thread):
      """
      HTTP server running a WSGI application in its own thread.

      The underlying server is created (and its address stored in
      ``server_address``) when the object is constructed; requests are only
      handled once the thread has been started with :meth:`start`.
      """

      def __init__(self, host="127.0.0.1", port=0, application=None, **kwargs):
          """
          :param host: address to bind the server to
          :param port: port to bind to (the default ``0`` is passed on as is)
          :param application: WSGI application to be served
          :param kwargs: passed on unchanged to werkzeug's ``make_server``
          """
          self.app = application
          self._server = make_server(host, port, self.app, **kwargs)
          self.server_address = self._server.server_address

          # Thread names are strings; use the class name rather than the class
          # object so that the thread shows up readably in logs and debuggers.
          super().__init__(name=self.__class__.__name__, target=self._server.serve_forever)

      def __del__(self):
          self.stop()

      def stop(self):
          """
          Stop serving requests and release the listening socket.

          Safe to call more than once, on a server that was never started, and
          on an instance whose construction failed before the server existed.
          """
          server = getattr(self, "_server", None)
          if server is None:
              # __init__ failed before the server was created (stop() is also
              # reached through __del__), so there is nothing to clean up.
              return

          # socketserver documents that shutdown() must be called while
          # serve_forever() is running in another thread, otherwise it
          # deadlocks. Only ask a running serving thread to shut down.
          if self.is_alive():
              server.shutdown()

          # Close the listening socket instead of leaving it to the garbage
          # collector.
          server.server_close()

      @property
      def url(self):
          """Base URL of the server: ``http`` or ``https`` (when an SSL context
          is set), followed by host and port."""
          host, port = self.server_address
          proto = "http" if self._server.ssl_context is None else "https"
          return "%s://%s:%i" % (proto, host, port)
  class Chunked(enum.Enum):
      """
      Three-way switch for chunked transfer encoding.

      The member values are ``False`` (NO), ``True`` (YES) and ``None`` (AUTO).
      """

      NO = False
      YES = True
      AUTO = None

      def __bool__(self):
          # YES is the only member whose value is truthy; NO (False) and
          # AUTO (None) both evaluate to False.
          return self is type(self).YES
  43. def _encode_chunk(chunk, charset):
  44. if isinstance(chunk, str):
  45. chunk = chunk.encode(charset)
  46. return "{:x}".format(len(chunk)).encode(charset) + b"\r\n" + chunk + b"\r\n"
  class ContentServer(WSGIServer):
      """
      Small test server which can be taught which content (i.e. string) to serve
      with which response code. Try the following snippet for testing API calls::

          server = ContentServer(port=8080)
          server.start()
          print('Test server running at http://%s:%i' % server.server_address)

          # any request to http://localhost:8080 will get a 503 response.
          server.content = 'Hello World!'
          server.code = 503

          # ...

          # we're done
          server.stop()

      """

      def __init__(self, host="127.0.0.1", port=0, ssl_context=None):
          # The instance itself is the WSGI application (see __call__).
          super().__init__(host, port, self, ssl_context=ssl_context)
          self.content, self.code = ("", 204)  # HTTP 204: No Content
          self.headers = {}
          self.show_post_vars = False
          self.compress = None
          self.requests = []
          self.chunked = Chunked.NO

      def __call__(self, environ, start_response):
          """
          This is the WSGI application.

          Records the incoming request in ``self.requests`` and answers with the
          currently configured content, status code and headers. If
          ``show_post_vars`` is set, a form-encoded POST is answered with its
          form fields dumped as JSON instead.
          """
          request = Request(environ)
          self.requests.append(request)
          if (
              request.content_type == "application/x-www-form-urlencoded"
              and request.method == "POST"
              and self.show_post_vars
          ):
              content = json.dumps(request.form)
          else:
              content = self.content

          if self._use_chunked_encoding():
              # If the code below ever changes to allow setting the charset of
              # the Response object, the charset used here should also be changed
              # to match. But until that happens, use UTF-8 since it is Werkzeug's
              # default.
              charset = "utf-8"
              content = self._chunked_body(content, charset)

          response = Response(response=content, status=self.code)
          response.headers.clear()
          response.headers.extend(self.headers)

          # FIXME get compression working!
          # if self.compress == 'gzip':
          #     content = gzip.compress(content.encode('utf-8'))
          #     response.content_encoding = 'gzip'

          return response(environ, start_response)

      def _use_chunked_encoding(self):
          """Decide whether the response body must be framed as chunks."""
          try:
              # Accept plain True/False/None as well as Chunked members.
              mode = Chunked(self.chunked)
          except ValueError:
              # Unknown settings never enabled chunking before; keep it that way.
              return False
          if mode is Chunked.YES:
              return True
          if mode is Chunked.AUTO:
              # Wrap in Headers so the lookup is case-insensitive even while
              # self.headers is still a plain dict.
              return "chunked" in Headers(self.headers).get("Transfer-Encoding", "")
          return False

      @staticmethod
      def _chunked_body(content, charset):
          """Frame ``content`` as HTTP chunks followed by the terminating chunk."""
          terminator = b"0\r\n\r\n"
          # A zero-length chunk *is* the terminator, so empty pieces must be
          # skipped; otherwise the body would end early (or the terminator would
          # be sent twice).
          if isinstance(content, (str, bytes)):
              # Single string: keep returning a tuple, as before.
              if not content:
                  return (terminator,)
              return (_encode_chunk(content, charset), terminator)
          frames = (_encode_chunk(item, charset) for item in content if item)
          return itertools.chain(frames, (terminator,))

      def serve_content(self, content, code=200, headers=None, chunked=Chunked.NO):
          """
          Serves string content (with specified HTTP error code) as response to
          all subsequent requests.

          :param content: content to be displayed
          :param code: HTTP status code
          :param headers: HTTP headers to be returned; when omitted or empty, the
              previously configured headers are left untouched
          :param chunked: whether to apply chunked transfer encoding to the content
          """
          if not isinstance(content, (str, bytes, list, tuple)):
              # If content is an iterable which is not known to be a string,
              # bytes, or sequence, it might be something that can only be iterated
              # through once, in which case we need to cache it so it can be reused
              # to handle multiple requests.
              try:
                  content = tuple(iter(content))
              except TypeError:
                  # this probably means that content is not iterable, so just go
                  # ahead in case it's some type that Response knows how to handle
                  pass
          self.content = content
          self.code = code
          self.chunked = chunked
          if headers:
              self.headers = Headers(headers)
  if __name__ == "__main__":  # pragma: no cover
      # Demo: serve a file (first command line argument, defaulting to the
      # project's README.rst) until interrupted with Ctrl-C.
      import os.path
      import time

      # A ContentServer is already a WSGIServer serving itself, so it is started
      # directly. Wrapping it in a second WSGIServer would bind an extra server
      # that is never started.
      server = ContentServer()
      server.start()

      print("HTTP server is running at %s" % server.url)
      print("Type <Ctrl-C> to stop")

      try:
          path = sys.argv[1]
      except IndexError:
          path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "README.rst")

      try:
          # Close the file as soon as its content has been read.
          with open(path) as source:
              server.serve_content(source.read(), 302)
          while True:
              time.sleep(1)
      except KeyboardInterrupt:
          print("\rstopping...")
      finally:
          # Always stop the server thread, also when reading the file failed;
          # otherwise the thread would keep the process alive.
          server.stop()