AuthorizationHelpers.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. # Copyright (c) 2018 Ultimaker B.V.
  2. # Cura is released under the terms of the LGPLv3 or higher.
  3. import json
  4. import random
  5. from _sha512 import sha512
  6. from base64 import b64encode
  7. from typing import Optional
  8. import requests
  9. from UM.Logger import Logger
  10. from cura.OAuth2.Models import AuthenticationResponse, UserProfile, OAuth2Settings
  11. class AuthorizationHelpers:
  12. """Class containing several helpers to deal with the authorization flow."""
  13. def __init__(self, settings: "OAuth2Settings"):
  14. self._settings = settings
  15. self._token_url = "{}/token".format(self._settings.OAUTH_SERVER_URL)
  16. @property
  17. def settings(self) -> "OAuth2Settings":
  18. """Get the OAuth2 settings object."""
  19. return self._settings
  20. def getAccessTokenUsingAuthorizationCode(self, authorization_code: str, verification_code: str)->\
  21. Optional["AuthenticationResponse"]:
  22. """
  23. Request the access token from the authorization server.
  24. :param authorization_code: The authorization code from the 1st step.
  25. :param verification_code: The verification code needed for the PKCE extension.
  26. :return: An AuthenticationResponse object.
  27. """
  28. return self.parseTokenResponse(requests.post(self._token_url, data={
  29. "client_id": self._settings.CLIENT_ID,
  30. "redirect_uri": self._settings.CALLBACK_URL,
  31. "grant_type": "authorization_code",
  32. "code": authorization_code,
  33. "code_verifier": verification_code,
  34. "scope": self._settings.CLIENT_SCOPES
  35. }))
  36. def getAccessTokenUsingRefreshToken(self, refresh_token: str) -> Optional["AuthenticationResponse"]:
  37. """
  38. Request the access token from the authorization server using a refresh token.
  39. :param refresh_token:
  40. :return: An AuthenticationResponse object.
  41. """
  42. return self.parseTokenResponse(requests.post(self._token_url, data={
  43. "client_id": self._settings.CLIENT_ID,
  44. "redirect_uri": self._settings.CALLBACK_URL,
  45. "grant_type": "refresh_token",
  46. "refresh_token": refresh_token,
  47. "scope": self._settings.CLIENT_SCOPES
  48. }))
  49. @staticmethod
  50. def parseTokenResponse(token_response: "requests.request") -> Optional["AuthenticationResponse"]:
  51. """
  52. Parse the token response from the authorization server into an AuthenticationResponse object.
  53. :param token_response: The JSON string data response from the authorization server.
  54. :return: An AuthenticationResponse object.
  55. """
  56. token_data = None
  57. try:
  58. token_data = json.loads(token_response.text)
  59. except ValueError:
  60. Logger.log("w", "Could not parse token response data: %s", token_response.text)
  61. if not token_data:
  62. return AuthenticationResponse(success=False, err_message="Could not read response.")
  63. if token_response.status_code not in (200, 201):
  64. return AuthenticationResponse(success=False, err_message=token_data["error_description"])
  65. return AuthenticationResponse(success=True,
  66. token_type=token_data["token_type"],
  67. access_token=token_data["access_token"],
  68. refresh_token=token_data["refresh_token"],
  69. expires_in=token_data["expires_in"],
  70. scope=token_data["scope"])
  71. def parseJWT(self, access_token: str) -> Optional["UserProfile"]:
  72. """
  73. Calls the authentication API endpoint to get the token data.
  74. :param access_token: The encoded JWT token.
  75. :return: Dict containing some profile data.
  76. """
  77. token_request = requests.get("{}/check-token".format(self._settings.OAUTH_SERVER_URL), headers = {
  78. "Authorization": "Bearer {}".format(access_token)
  79. })
  80. if token_request.status_code not in (200, 201):
  81. Logger.log("w", "Could not retrieve token data from auth server: %s", token_request.text)
  82. return None
  83. user_data = token_request.json().get("data")
  84. if not user_data or not isinstance(user_data, dict):
  85. Logger.log("w", "Could not parse user data from token: %s", user_data)
  86. return None
  87. return UserProfile(
  88. user_id = user_data["user_id"],
  89. username = user_data["username"],
  90. profile_image_url = user_data.get("profile_image_url", "")
  91. )
  92. @staticmethod
  93. def generateVerificationCode(code_length: int = 16) -> str:
  94. """
  95. Generate a 16-character verification code.
  96. :param code_length:
  97. :return:
  98. """
  99. return "".join(random.choice("0123456789ABCDEF") for i in range(code_length))
  100. @staticmethod
  101. def generateVerificationCodeChallenge(verification_code: str) -> str:
  102. """
  103. Generates a base64 encoded sha512 encrypted version of a given string.
  104. :param verification_code:
  105. :return: The encrypted code in base64 format.
  106. """
  107. encoded = sha512(verification_code.encode()).digest()
  108. return b64encode(encoded, altchars = b"_-").decode()