http.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. # Copyright (C) 2010-2013 Sebastian Rahlf and others (see AUTHORS).
  2. #
  3. # This program is release under the MIT license. You can find the full text of
  4. # the license in the LICENSE file.
  5. import json
  6. import sys
  7. import threading
  8. from werkzeug.serving import make_server
  9. from werkzeug.wrappers import Response, Request
  10. class WSGIServer(threading.Thread):
  11. """
  12. HTTP server running a WSGI application in its own thread.
  13. """
  14. def __init__(self, host='127.0.0.1', port=0, application=None, **kwargs):
  15. self.app = application
  16. self._server = make_server(host, port, self.app, **kwargs)
  17. self.server_address = self._server.server_address
  18. super(WSGIServer, self).__init__(
  19. name=self.__class__,
  20. target=self._server.serve_forever)
  21. def __del__(self):
  22. self.stop()
  23. def stop(self):
  24. self._server.shutdown()
  25. @property
  26. def url(self):
  27. host, port = self.server_address
  28. proto = 'http' if self._server.ssl_context is None else 'https'
  29. return '%s://%s:%i' % (proto, host, port)
  30. class ContentServer(WSGIServer):
  31. """
  32. Small test server which can be taught which content (i.e. string) to serve
  33. with which response code. Try the following snippet for testing API calls::
  34. server = ContentServer(port=8080)
  35. server.start()
  36. print 'Test server running at http://%s:%i' % server.server_address
  37. # any request to http://localhost:8080 will get a 503 response.
  38. server.content = 'Hello World!'
  39. server.code = 503
  40. # ...
  41. # we're done
  42. server.stop()
  43. """
  44. def __init__(self, host='127.0.0.1', port=0, ssl_context=None):
  45. super(ContentServer, self).__init__(host, port, self, ssl_context=ssl_context)
  46. self.content, self.code = ('', 204) # HTTP 204: No Content
  47. self.headers = {}
  48. self.show_post_vars = False
  49. self.compress = None
  50. self.requests = []
  51. def __call__(self, environ, start_response):
  52. """
  53. This is the WSGI application.
  54. """
  55. request = Request(environ)
  56. self.requests.append(request)
  57. if (request.content_type == 'application/x-www-form-urlencoded'
  58. and request.method == 'POST' and self.show_post_vars):
  59. content = json.dumps(request.form)
  60. else:
  61. content = self.content
  62. response = Response(status=self.code)
  63. response.headers.clear()
  64. response.headers.extend(self.headers)
  65. # FIXME get compression working!
  66. # if self.compress == 'gzip':
  67. # content = gzip.compress(content.encode('utf-8'))
  68. # response.content_encoding = 'gzip'
  69. response.data = content
  70. return response(environ, start_response)
  71. def serve_content(self, content, code=200, headers=None):
  72. """
  73. Serves string content (with specified HTTP error code) as response to
  74. all subsequent request.
  75. :param content: content to be displayed
  76. :param code: HTTP status code
  77. :param headers: HTTP headers to be returned
  78. """
  79. self.content, self.code = (content, code)
  80. if headers:
  81. self.headers = headers
  82. if __name__ == '__main__': # pragma: no cover
  83. import os.path
  84. import time
  85. app = ContentServer()
  86. server = WSGIServer(application=app)
  87. server.start()
  88. print('HTTP server is running at %s' % server.url)
  89. print('Type <Ctrl-C> to stop')
  90. try:
  91. path = sys.argv[1]
  92. except IndexError:
  93. path = os.path.join(
  94. os.path.dirname(os.path.abspath(__file__)), '..', 'README.rst')
  95. app.serve_content(open(path).read(), 302)
  96. try:
  97. while True:
  98. time.sleep(1)
  99. except KeyboardInterrupt:
  100. print('\rstopping...')
  101. server.stop()