AuthorizationRequestHandler.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. # Copyright (c) 2019 Ultimaker B.V.
  2. # Cura is released under the terms of the LGPLv3 or higher.
  3. from http.server import BaseHTTPRequestHandler
  4. from threading import Lock # To turn an asynchronous call synchronous.
  5. from typing import Optional, Callable, Tuple, Dict, Any, List, TYPE_CHECKING
  6. from urllib.parse import parse_qs, urlparse
  7. from UM.Logger import Logger
  8. from cura.OAuth2.Models import AuthenticationResponse, ResponseData, HTTP_STATUS
  9. from UM.i18n import i18nCatalog
  10. if TYPE_CHECKING:
  11. from cura.OAuth2.Models import ResponseStatus
  12. from cura.OAuth2.AuthorizationHelpers import AuthorizationHelpers
  13. catalog = i18nCatalog("cura")
  14. class AuthorizationRequestHandler(BaseHTTPRequestHandler):
  15. """This handler handles all HTTP requests on the local web server.
  16. It also requests the access token for the 2nd stage of the OAuth flow.
  17. """
  18. def __init__(self, request, client_address, server) -> None:
  19. super().__init__(request, client_address, server)
  20. # These values will be injected by the HTTPServer that this handler belongs to.
  21. self.authorization_helpers: Optional[AuthorizationHelpers] = None
  22. self.authorization_callback: Optional[Callable[[AuthenticationResponse], None]] = None
  23. self.verification_code: Optional[str] = None
  24. self.state: Optional[str] = None
  25. # CURA-6609: Some browser seems to issue a HEAD instead of GET request as the callback.
  26. def do_HEAD(self) -> None:
  27. self.do_GET()
  28. def do_GET(self) -> None:
  29. # Extract values from the query string.
  30. parsed_url = urlparse(self.path)
  31. query = parse_qs(parsed_url.query)
  32. # Handle the possible requests
  33. if parsed_url.path == "/callback":
  34. server_response, token_response = self._handleCallback(query)
  35. else:
  36. server_response = self._handleNotFound()
  37. token_response = None
  38. # Send the data to the browser.
  39. self._sendHeaders(server_response.status, server_response.content_type, server_response.redirect_uri)
  40. if server_response.data_stream:
  41. # If there is data in the response, we send it.
  42. self._sendData(server_response.data_stream)
  43. if token_response and self.authorization_callback is not None:
  44. # Trigger the callback if we got a response.
  45. # This will cause the server to shut down, so we do it at the very end of the request handling.
  46. self.authorization_callback(token_response)
  47. def _handleCallback(self, query: Dict[Any, List]) -> Tuple[ResponseData, Optional[AuthenticationResponse]]:
  48. """Handler for the callback URL redirect.
  49. :param query: Dict containing the HTTP query parameters.
  50. :return: HTTP ResponseData containing a success page to show to the user.
  51. """
  52. code = self._queryGet(query, "code")
  53. state = self._queryGet(query, "state")
  54. if state != self.state:
  55. Logger.log("w", f"The provided state was not correct. Got {state} and expected {self.state}")
  56. token_response = AuthenticationResponse(
  57. success = False,
  58. err_message = catalog.i18nc("@message", "The provided state is not correct.")
  59. )
  60. elif code and self.authorization_helpers is not None and self.verification_code is not None:
  61. Logger.log("d", "Timeout when authenticating with the account server.")
  62. token_response = AuthenticationResponse(
  63. success = False,
  64. err_message = catalog.i18nc("@message", "Timeout when authenticating with the account server.")
  65. )
  66. # If the code was returned we get the access token.
  67. lock = Lock()
  68. lock.acquire()
  69. def callback(response: AuthenticationResponse) -> None:
  70. nonlocal token_response
  71. token_response = response
  72. lock.release()
  73. self.authorization_helpers.getAccessTokenUsingAuthorizationCode(code, self.verification_code, callback)
  74. lock.acquire(timeout = 60) # Block thread until request is completed (which releases the lock). If not acquired, the timeout message stays.
  75. elif self._queryGet(query, "error_code") == "user_denied":
  76. # Otherwise we show an error message (probably the user clicked "Deny" in the auth dialog).
  77. Logger.log("d", "User did not give the required permission when authorizing this application")
  78. token_response = AuthenticationResponse(
  79. success = False,
  80. err_message = catalog.i18nc("@message", "Please give the required permissions when authorizing this application.")
  81. )
  82. else:
  83. # We don't know what went wrong here, so instruct the user to check the logs.
  84. Logger.log("w", f"Unexpected error when logging in. Error_code: {self._queryGet(query, 'error_code')}, State: {state}")
  85. token_response = AuthenticationResponse(
  86. success = False,
  87. error_message = catalog.i18nc("@message", "Something unexpected happened when trying to log in, please try again.")
  88. )
  89. if self.authorization_helpers is None:
  90. return ResponseData(), token_response
  91. return ResponseData(
  92. status = HTTP_STATUS["REDIRECT"],
  93. data_stream = b"Redirecting...",
  94. redirect_uri = self.authorization_helpers.settings.AUTH_SUCCESS_REDIRECT if token_response.success else
  95. self.authorization_helpers.settings.AUTH_FAILED_REDIRECT
  96. ), token_response
  97. @staticmethod
  98. def _handleNotFound() -> ResponseData:
  99. """Handle all other non-existing server calls."""
  100. return ResponseData(status = HTTP_STATUS["NOT_FOUND"], content_type = "text/html", data_stream = b"Not found.")
  101. def _sendHeaders(self, status: "ResponseStatus", content_type: str, redirect_uri: str = None) -> None:
  102. self.send_response(status.code, status.message)
  103. self.send_header("Content-type", content_type)
  104. if redirect_uri:
  105. self.send_header("Location", redirect_uri)
  106. self.end_headers()
  107. def _sendData(self, data: bytes) -> None:
  108. self.wfile.write(data)
  109. @staticmethod
  110. def _queryGet(query_data: Dict[Any, List], key: str, default: Optional[str] = None) -> Optional[str]:
  111. """Convenience helper for getting values from a pre-parsed query string"""
  112. return query_data.get(key, [default])[0]