AuthorizationService.py 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. # Copyright (c) 2018 Ultimaker B.V.
  2. # Cura is released under the terms of the LGPLv3 or higher.
  3. import json
  4. import webbrowser
  5. from typing import Optional, TYPE_CHECKING
  6. from urllib.parse import urlencode
  7. import requests.exceptions
  8. from UM.Logger import Logger
  9. from UM.Signal import Signal
  10. from cura.OAuth2.LocalAuthorizationServer import LocalAuthorizationServer
  11. from cura.OAuth2.AuthorizationHelpers import AuthorizationHelpers
  12. from cura.OAuth2.Models import AuthenticationResponse
  13. if TYPE_CHECKING:
  14. from cura.OAuth2.Models import UserProfile, OAuth2Settings
  15. from UM.Preferences import Preferences
  16. class AuthorizationService:
  17. """
  18. The authorization service is responsible for handling the login flow,
  19. storing user credentials and providing account information.
  20. """
  21. # Emit signal when authentication is completed.
  22. onAuthStateChanged = Signal()
  23. # Emit signal when authentication failed.
  24. onAuthenticationError = Signal()
  25. def __init__(self, settings: "OAuth2Settings", preferences: Optional["Preferences"] = None) -> None:
  26. self._settings = settings
  27. self._auth_helpers = AuthorizationHelpers(settings)
  28. self._auth_url = "{}/authorize".format(self._settings.OAUTH_SERVER_URL)
  29. self._auth_data = None # type: Optional[AuthenticationResponse]
  30. self._user_profile = None # type: Optional["UserProfile"]
  31. self._preferences = preferences
  32. self._server = LocalAuthorizationServer(self._auth_helpers, self._onAuthStateChanged, daemon=True)
  33. def initialize(self, preferences: Optional["Preferences"] = None) -> None:
  34. if preferences is not None:
  35. self._preferences = preferences
  36. if self._preferences:
  37. self._preferences.addPreference(self._settings.AUTH_DATA_PREFERENCE_KEY, "{}")
  38. # Get the user profile as obtained from the JWT (JSON Web Token).
  39. # If the JWT is not yet parsed, calling this will take care of that.
  40. # \return UserProfile if a user is logged in, None otherwise.
  41. # \sa _parseJWT
  42. def getUserProfile(self) -> Optional["UserProfile"]:
  43. try:
  44. self._user_profile = self._parseJWT()
  45. except requests.exceptions.ConnectionError:
  46. # Unable to get connection, can't login.
  47. return None
  48. if not self._user_profile and self._auth_data:
  49. # If there is still no user profile from the JWT, we have to log in again.
  50. Logger.log("w", "The user profile could not be loaded. The user must log in again!")
  51. self.deleteAuthData()
  52. return None
  53. return self._user_profile
  54. # Tries to parse the JWT (JSON Web Token) data, which it does if all the needed data is there.
  55. # \return UserProfile if it was able to parse, None otherwise.
  56. def _parseJWT(self) -> Optional["UserProfile"]:
  57. if not self._auth_data or self._auth_data.access_token is None:
  58. # If no auth data exists, we should always log in again.
  59. return None
  60. user_data = self._auth_helpers.parseJWT(self._auth_data.access_token)
  61. if user_data:
  62. # If the profile was found, we return it immediately.
  63. return user_data
  64. # The JWT was expired or invalid and we should request a new one.
  65. if self._auth_data.refresh_token is None:
  66. return None
  67. self._auth_data = self._auth_helpers.getAccessTokenUsingRefreshToken(self._auth_data.refresh_token)
  68. if not self._auth_data or self._auth_data.access_token is None:
  69. # The token could not be refreshed using the refresh token. We should login again.
  70. return None
  71. return self._auth_helpers.parseJWT(self._auth_data.access_token)
  72. # Get the access token as provided by the response data.
  73. def getAccessToken(self) -> Optional[str]:
  74. if not self.getUserProfile():
  75. # We check if we can get the user profile.
  76. # If we can't get it, that means the access token (JWT) was invalid or expired.
  77. # In that case we try to refresh the access token.
  78. self.refreshAccessToken()
  79. if self._auth_data is None:
  80. Logger.log("d", "No auth data to retrieve the access_token from")
  81. return None
  82. return self._auth_data.access_token
  83. # Try to refresh the access token. This should be used when it has expired.
  84. def refreshAccessToken(self) -> None:
  85. if self._auth_data is None or self._auth_data.refresh_token is None:
  86. Logger.log("w", "Unable to refresh access token, since there is no refresh token.")
  87. return
  88. self._storeAuthData(self._auth_helpers.getAccessTokenUsingRefreshToken(self._auth_data.refresh_token))
  89. self.onAuthStateChanged.emit(logged_in=True)
  90. # Delete the authentication data that we have stored locally (eg; logout)
  91. def deleteAuthData(self) -> None:
  92. if self._auth_data is not None:
  93. self._storeAuthData()
  94. self.onAuthStateChanged.emit(logged_in=False)
  95. # Start the flow to become authenticated. This will start a new webbrowser tap, prompting the user to login.
  96. def startAuthorizationFlow(self) -> None:
  97. Logger.log("d", "Starting new OAuth2 flow...")
  98. # Create the tokens needed for the code challenge (PKCE) extension for OAuth2.
  99. # This is needed because the CuraDrivePlugin is a untrusted (open source) client.
  100. # More details can be found at https://tools.ietf.org/html/rfc7636.
  101. verification_code = self._auth_helpers.generateVerificationCode()
  102. challenge_code = self._auth_helpers.generateVerificationCodeChallenge(verification_code)
  103. # Create the query string needed for the OAuth2 flow.
  104. query_string = urlencode({
  105. "client_id": self._settings.CLIENT_ID,
  106. "redirect_uri": self._settings.CALLBACK_URL,
  107. "scope": self._settings.CLIENT_SCOPES,
  108. "response_type": "code",
  109. "state": "(.Y.)",
  110. "code_challenge": challenge_code,
  111. "code_challenge_method": "S512"
  112. })
  113. # Open the authorization page in a new browser window.
  114. webbrowser.open_new("{}?{}".format(self._auth_url, query_string))
  115. # Start a local web server to receive the callback URL on.
  116. self._server.start(verification_code)
  117. # Callback method for the authentication flow.
  118. def _onAuthStateChanged(self, auth_response: AuthenticationResponse) -> None:
  119. if auth_response.success:
  120. self._storeAuthData(auth_response)
  121. self.onAuthStateChanged.emit(logged_in=True)
  122. else:
  123. self.onAuthenticationError.emit(logged_in=False, error_message=auth_response.err_message)
  124. self._server.stop() # Stop the web server at all times.
  125. # Load authentication data from preferences.
  126. def loadAuthDataFromPreferences(self) -> None:
  127. if self._preferences is None:
  128. Logger.log("e", "Unable to load authentication data, since no preference has been set!")
  129. return
  130. try:
  131. preferences_data = json.loads(self._preferences.getValue(self._settings.AUTH_DATA_PREFERENCE_KEY))
  132. if preferences_data:
  133. self._auth_data = AuthenticationResponse(**preferences_data)
  134. self.onAuthStateChanged.emit(logged_in=True)
  135. except ValueError:
  136. Logger.logException("w", "Could not load auth data from preferences")
  137. # Store authentication data in preferences.
  138. def _storeAuthData(self, auth_data: Optional[AuthenticationResponse] = None) -> None:
  139. if self._preferences is None:
  140. Logger.log("e", "Unable to save authentication data, since no preference has been set!")
  141. return
  142. self._auth_data = auth_data
  143. if auth_data:
  144. self._user_profile = self.getUserProfile()
  145. self._preferences.setValue(self._settings.AUTH_DATA_PREFERENCE_KEY, json.dumps(vars(auth_data)))
  146. else:
  147. self._user_profile = None
  148. self._preferences.resetPreference(self._settings.AUTH_DATA_PREFERENCE_KEY)