AuthorizationRequestHandler.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. # Copyright (c) 2018 Ultimaker B.V.
  2. # Cura is released under the terms of the LGPLv3 or higher.
  3. from typing import Optional, Callable, Tuple, Dict, Any, List, TYPE_CHECKING
  4. from http.server import BaseHTTPRequestHandler
  5. from urllib.parse import parse_qs, urlparse
  6. from cura.OAuth2.AuthorizationHelpers import AuthorizationHelpers
  7. from cura.OAuth2.Models import AuthenticationResponse, ResponseData, HTTP_STATUS
  8. if TYPE_CHECKING:
  9. from cura.OAuth2.Models import ResponseStatus
  10. class AuthorizationRequestHandler(BaseHTTPRequestHandler):
  11. """
  12. This handler handles all HTTP requests on the local web server.
  13. It also requests the access token for the 2nd stage of the OAuth flow.
  14. """
  15. def __init__(self, request, client_address, server) -> None:
  16. super().__init__(request, client_address, server)
  17. # These values will be injected by the HTTPServer that this handler belongs to.
  18. self.authorization_helpers = None # type: Optional[AuthorizationHelpers]
  19. self.authorization_callback = None # type: Optional[Callable[[AuthenticationResponse], None]]
  20. self.verification_code = None # type: Optional[str]
  21. def do_GET(self) -> None:
  22. """Entry point for GET requests"""
  23. # Extract values from the query string.
  24. parsed_url = urlparse(self.path)
  25. query = parse_qs(parsed_url.query)
  26. # Handle the possible requests
  27. if parsed_url.path == "/callback":
  28. server_response, token_response = self._handleCallback(query)
  29. else:
  30. server_response = self._handleNotFound()
  31. token_response = None
  32. # Send the data to the browser.
  33. self._sendHeaders(server_response.status, server_response.content_type, server_response.redirect_uri)
  34. if server_response.data_stream:
  35. # If there is data in the response, we send it.
  36. self._sendData(server_response.data_stream)
  37. if token_response and self.authorization_callback is not None:
  38. # Trigger the callback if we got a response.
  39. # This will cause the server to shut down, so we do it at the very end of the request handling.
  40. self.authorization_callback(token_response)
  41. def _handleCallback(self, query: Dict[Any, List]) -> Tuple["ResponseData", Optional["AuthenticationResponse"]]:
  42. """
  43. Handler for the callback URL redirect.
  44. :param query: Dict containing the HTTP query parameters.
  45. :return: HTTP ResponseData containing a success page to show to the user.
  46. """
  47. code = self._queryGet(query, "code")
  48. if code and self.authorization_helpers is not None and self.verification_code is not None:
  49. # If the code was returned we get the access token.
  50. token_response = self.authorization_helpers.getAccessTokenUsingAuthorizationCode(
  51. code, self.verification_code)
  52. elif self._queryGet(query, "error_code") == "user_denied":
  53. # Otherwise we show an error message (probably the user clicked "Deny" in the auth dialog).
  54. token_response = AuthenticationResponse(
  55. success=False,
  56. err_message="Please give the required permissions when authorizing this application."
  57. )
  58. else:
  59. # We don't know what went wrong here, so instruct the user to check the logs.
  60. token_response = AuthenticationResponse(
  61. success=False,
  62. error_message="Something unexpected happened when trying to log in, please try again."
  63. )
  64. if self.authorization_helpers is None:
  65. return ResponseData(), token_response
  66. return ResponseData(
  67. status=HTTP_STATUS["REDIRECT"],
  68. data_stream=b"Redirecting...",
  69. redirect_uri=self.authorization_helpers.settings.AUTH_SUCCESS_REDIRECT if token_response.success else
  70. self.authorization_helpers.settings.AUTH_FAILED_REDIRECT
  71. ), token_response
  72. @staticmethod
  73. def _handleNotFound() -> ResponseData:
  74. """Handle all other non-existing server calls."""
  75. return ResponseData(status=HTTP_STATUS["NOT_FOUND"], content_type="text/html", data_stream=b"Not found.")
  76. def _sendHeaders(self, status: "ResponseStatus", content_type: str, redirect_uri: str = None) -> None:
  77. """Send out the headers"""
  78. self.send_response(status.code, status.message)
  79. self.send_header("Content-type", content_type)
  80. if redirect_uri:
  81. self.send_header("Location", redirect_uri)
  82. self.end_headers()
  83. def _sendData(self, data: bytes) -> None:
  84. """Send out the data"""
  85. self.wfile.write(data)
  86. @staticmethod
  87. def _queryGet(query_data: Dict[Any, List], key: str, default: Optional[str]=None) -> Optional[str]:
  88. """Helper for getting values from a pre-parsed query string"""
  89. return query_data.get(key, [default])[0]