123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189 |
- # Copyright (C) 2010-2013 Sebastian Rahlf and others (see AUTHORS).
- #
- # This program is release under the MIT license. You can find the full text of
- # the license in the LICENSE file.
- import enum
- import itertools
- import json
- import sys
- import threading
- from werkzeug.datastructures import Headers
- from werkzeug.serving import make_server
- from werkzeug.wrappers import Request
- from werkzeug.wrappers import Response
- class WSGIServer(threading.Thread):
- """
- HTTP server running a WSGI application in its own thread.
- """
- def __init__(self, host="127.0.0.1", port=0, application=None, **kwargs):
- self.app = application
- self._server = make_server(host, port, self.app, **kwargs)
- self.server_address = self._server.server_address
- super().__init__(name=self.__class__, target=self._server.serve_forever)
- def __del__(self):
- self.stop()
- def stop(self):
- try:
- server = self._server
- except AttributeError:
- pass
- else:
- server.shutdown()
- @property
- def url(self):
- host, port = self.server_address
- proto = "http" if self._server.ssl_context is None else "https"
- return "%s://%s:%i" % (proto, host, port)
- class Chunked(enum.Enum):
- NO = False
- YES = True
- AUTO = None
- def __bool__(self):
- return bool(self.value)
- def _encode_chunk(chunk, charset):
- if isinstance(chunk, str):
- chunk = chunk.encode(charset)
- return "{:x}".format(len(chunk)).encode(charset) + b"\r\n" + chunk + b"\r\n"
- class ContentServer(WSGIServer):
- """
- Small test server which can be taught which content (i.e. string) to serve
- with which response code. Try the following snippet for testing API calls::
- server = ContentServer(port=8080)
- server.start()
- print 'Test server running at http://%s:%i' % server.server_address
- # any request to http://localhost:8080 will get a 503 response.
- server.content = 'Hello World!'
- server.code = 503
- # ...
- # we're done
- server.stop()
- """
- def __init__(self, host="127.0.0.1", port=0, ssl_context=None):
- super().__init__(host, port, self, ssl_context=ssl_context)
- self.content, self.code = ("", 204) # HTTP 204: No Content
- self.headers = {}
- self.show_post_vars = False
- self.compress = None
- self.requests = []
- self.chunked = Chunked.NO
- self.store_request_data = False
- def __call__(self, environ, start_response):
- """
- This is the WSGI application.
- """
- request = Request(environ)
- if self.store_request_data:
- # need to invoke this method to cache the data
- request.get_data(cache=True)
- self.requests.append(request)
- if (
- request.content_type == "application/x-www-form-urlencoded"
- and request.method == "POST"
- and self.show_post_vars
- ):
- content = json.dumps(request.form)
- else:
- content = self.content
- if self.chunked == Chunked.YES or (
- self.chunked == Chunked.AUTO and "chunked" in self.headers.get("Transfer-encoding", "")
- ):
- # If the code below ever changes to allow setting the charset of
- # the Response object, the charset used here should also be changed
- # to match. But until that happens, use UTF-8 since it is Werkzeug's
- # default.
- charset = "utf-8"
- if isinstance(content, (str, bytes)):
- content = (_encode_chunk(content, charset), "0\r\n\r\n")
- else:
- content = itertools.chain((_encode_chunk(item, charset) for item in content), ["0\r\n\r\n"])
- response = Response(response=content, status=self.code)
- response.headers.clear()
- response.headers.extend(self.headers)
- # FIXME get compression working!
- # if self.compress == 'gzip':
- # content = gzip.compress(content.encode('utf-8'))
- # response.content_encoding = 'gzip'
- return response(environ, start_response)
- def serve_content(self, content, code=200, headers=None, chunked=Chunked.NO, store_request_data=True):
- """
- Serves string content (with specified HTTP error code) as response to
- all subsequent request.
- :param content: content to be displayed
- :param code: HTTP status code
- :param headers: HTTP headers to be returned
- :param chunked: whether to apply chunked transfer encoding to the content
- :param store_request_data: whether to store data sent as request payload.
- """
- if not isinstance(content, (str, bytes, list, tuple)):
- # If content is an iterable which is not known to be a string,
- # bytes, or sequence, it might be something that can only be iterated
- # through once, in which case we need to cache it so it can be reused
- # to handle multiple requests.
- try:
- content = tuple(iter(content))
- except TypeError:
- # this probably means that content is not iterable, so just go
- # ahead in case it's some type that Response knows how to handle
- pass
- self.content = content
- self.code = code
- self.chunked = chunked
- self.store_request_data = store_request_data
- if headers:
- self.headers = Headers(headers)
- if __name__ == "__main__": # pragma: no cover
- import os.path
- import time
- app = ContentServer()
- server = WSGIServer(application=app)
- server.start()
- print("HTTP server is running at %s" % server.url)
- print("Type <Ctrl-C> to stop")
- try:
- path = sys.argv[1]
- except IndexError:
- path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "README.rst")
- app.serve_content(open(path).read(), 302)
- try:
- while True:
- time.sleep(1)
- except KeyboardInterrupt:
- print("\rstopping...")
- server.stop()
|