AuthorizationHelpers.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. # Copyright (c) 2019 Ultimaker B.V.
  2. # Cura is released under the terms of the LGPLv3 or higher.
  3. import json
  4. import random
  5. from hashlib import sha512
  6. from base64 import b64encode
  7. from typing import Dict, Optional
  8. import requests
  9. from UM.Logger import Logger
  10. from cura.OAuth2.Models import AuthenticationResponse, UserProfile, OAuth2Settings
  11. ## Class containing several helpers to deal with the authorization flow.
  12. class AuthorizationHelpers:
  13. def __init__(self, settings: "OAuth2Settings") -> None:
  14. self._settings = settings
  15. self._token_url = "{}/token".format(self._settings.OAUTH_SERVER_URL)
  16. @property
  17. ## The OAuth2 settings object.
  18. def settings(self) -> "OAuth2Settings":
  19. return self._settings
  20. ## Request the access token from the authorization server.
  21. # \param authorization_code: The authorization code from the 1st step.
  22. # \param verification_code: The verification code needed for the PKCE
  23. # extension.
  24. # \return An AuthenticationResponse object.
  25. def getAccessTokenUsingAuthorizationCode(self, authorization_code: str, verification_code: str) -> "AuthenticationResponse":
  26. data = {
  27. "client_id": self._settings.CLIENT_ID if self._settings.CLIENT_ID is not None else "",
  28. "redirect_uri": self._settings.CALLBACK_URL if self._settings.CALLBACK_URL is not None else "",
  29. "grant_type": "authorization_code",
  30. "code": authorization_code,
  31. "code_verifier": verification_code,
  32. "scope": self._settings.CLIENT_SCOPES if self._settings.CLIENT_SCOPES is not None else "",
  33. }
  34. return self.parseTokenResponse(requests.post(self._token_url, data = data)) # type: ignore
  35. ## Request the access token from the authorization server using a refresh token.
  36. # \param refresh_token:
  37. # \return An AuthenticationResponse object.
  38. def getAccessTokenUsingRefreshToken(self, refresh_token: str) -> "AuthenticationResponse":
  39. data = {
  40. "client_id": self._settings.CLIENT_ID if self._settings.CLIENT_ID is not None else "",
  41. "redirect_uri": self._settings.CALLBACK_URL if self._settings.CALLBACK_URL is not None else "",
  42. "grant_type": "refresh_token",
  43. "refresh_token": refresh_token,
  44. "scope": self._settings.CLIENT_SCOPES if self._settings.CLIENT_SCOPES is not None else "",
  45. }
  46. return self.parseTokenResponse(requests.post(self._token_url, data = data)) # type: ignore
  47. @staticmethod
  48. ## Parse the token response from the authorization server into an AuthenticationResponse object.
  49. # \param token_response: The JSON string data response from the authorization server.
  50. # \return An AuthenticationResponse object.
  51. def parseTokenResponse(token_response: requests.models.Response) -> "AuthenticationResponse":
  52. token_data = None
  53. try:
  54. token_data = json.loads(token_response.text)
  55. except ValueError:
  56. Logger.log("w", "Could not parse token response data: %s", token_response.text)
  57. if not token_data:
  58. return AuthenticationResponse(success = False, err_message = "Could not read response.")
  59. if token_response.status_code not in (200, 201):
  60. return AuthenticationResponse(success = False, err_message = token_data["error_description"])
  61. return AuthenticationResponse(success = True,
  62. token_type = token_data["token_type"],
  63. access_token = token_data["access_token"],
  64. refresh_token = token_data["refresh_token"],
  65. expires_in = token_data["expires_in"],
  66. scope = token_data["scope"])
  67. ## Calls the authentication API endpoint to get the token data.
  68. # \param access_token: The encoded JWT token.
  69. # \return Dict containing some profile data.
  70. def parseJWT(self, access_token: str) -> Optional["UserProfile"]:
  71. try:
  72. token_request = requests.get("{}/check-token".format(self._settings.OAUTH_SERVER_URL), headers = {
  73. "Authorization": "Bearer {}".format(access_token)
  74. })
  75. except ConnectionError:
  76. # Connection was suddenly dropped. Nothing we can do about that.
  77. Logger.logException("e", "Something failed while attempting to parse the JWT token")
  78. return None
  79. if token_request.status_code not in (200, 201):
  80. Logger.log("w", "Could not retrieve token data from auth server: %s", token_request.text)
  81. return None
  82. user_data = token_request.json().get("data")
  83. if not user_data or not isinstance(user_data, dict):
  84. Logger.log("w", "Could not parse user data from token: %s", user_data)
  85. return None
  86. return UserProfile(
  87. user_id = user_data["user_id"],
  88. username = user_data["username"],
  89. profile_image_url = user_data.get("profile_image_url", "")
  90. )
  91. @staticmethod
  92. ## Generate a 16-character verification code.
  93. # \param code_length: How long should the code be?
  94. def generateVerificationCode(code_length: int = 16) -> str:
  95. return "".join(random.choice("0123456789ABCDEF") for i in range(code_length))
  96. @staticmethod
  97. ## Generates a base64 encoded sha512 encrypted version of a given string.
  98. # \param verification_code:
  99. # \return The encrypted code in base64 format.
  100. def generateVerificationCodeChallenge(verification_code: str) -> str:
  101. encoded = sha512(verification_code.encode()).digest()
  102. return b64encode(encoded, altchars = b"_-").decode()