|
@@ -25,9 +25,11 @@ if TYPE_CHECKING:
|
|
|
from UM.Preferences import Preferences
|
|
|
|
|
|
|
|
|
-## The authorization service is responsible for handling the login flow,
|
|
|
-# storing user credentials and providing account information.
|
|
|
class AuthorizationService:
|
|
|
+ """The authorization service is responsible for handling the login flow, storing user credentials and providing
|
|
|
+ account information.
|
|
|
+ """
|
|
|
+
|
|
|
# Emit signal when authentication is completed.
|
|
|
onAuthStateChanged = Signal()
|
|
|
|
|
@@ -59,11 +61,16 @@ class AuthorizationService:
|
|
|
if self._preferences:
|
|
|
self._preferences.addPreference(self._settings.AUTH_DATA_PREFERENCE_KEY, "{}")
|
|
|
|
|
|
- ## Get the user profile as obtained from the JWT (JSON Web Token).
|
|
|
- # If the JWT is not yet parsed, calling this will take care of that.
|
|
|
- # \return UserProfile if a user is logged in, None otherwise.
|
|
|
- # \sa _parseJWT
|
|
|
def getUserProfile(self) -> Optional["UserProfile"]:
|
|
|
+ """Get the user profile as obtained from the JWT (JSON Web Token).
|
|
|
+
|
|
|
+ If the JWT is not yet parsed, calling this will take care of that.
|
|
|
+
|
|
|
+ :return: UserProfile if a user is logged in, None otherwise.
|
|
|
+
|
|
|
+ See also: :py:method:`cura.OAuth2.AuthorizationService.AuthorizationService._parseJWT`
|
|
|
+ """
|
|
|
+
|
|
|
if not self._user_profile:
|
|
|
# If no user profile was stored locally, we try to get it from JWT.
|
|
|
try:
|
|
@@ -81,9 +88,12 @@ class AuthorizationService:
|
|
|
|
|
|
return self._user_profile
|
|
|
|
|
|
- ## Tries to parse the JWT (JSON Web Token) data, which it does if all the needed data is there.
|
|
|
- # \return UserProfile if it was able to parse, None otherwise.
|
|
|
def _parseJWT(self) -> Optional["UserProfile"]:
|
|
|
+ """Tries to parse the JWT (JSON Web Token) data, which it does if all the needed data is there.
|
|
|
+
|
|
|
+ :return: UserProfile if it was able to parse, None otherwise.
|
|
|
+ """
|
|
|
+
|
|
|
if not self._auth_data or self._auth_data.access_token is None:
|
|
|
# If no auth data exists, we should always log in again.
|
|
|
Logger.log("d", "There was no auth data or access token")
|
|
@@ -106,8 +116,9 @@ class AuthorizationService:
|
|
|
self._storeAuthData(self._auth_data)
|
|
|
return self._auth_helpers.parseJWT(self._auth_data.access_token)
|
|
|
|
|
|
- ## Get the access token as provided by the repsonse data.
|
|
|
def getAccessToken(self) -> Optional[str]:
|
|
|
+ """Get the access token as provided by the repsonse data."""
|
|
|
+
|
|
|
if self._auth_data is None:
|
|
|
Logger.log("d", "No auth data to retrieve the access_token from")
|
|
|
return None
|
|
@@ -122,8 +133,9 @@ class AuthorizationService:
|
|
|
|
|
|
return self._auth_data.access_token if self._auth_data else None
|
|
|
|
|
|
- ## Try to refresh the access token. This should be used when it has expired.
|
|
|
def refreshAccessToken(self) -> None:
|
|
|
+ """Try to refresh the access token. This should be used when it has expired."""
|
|
|
+
|
|
|
if self._auth_data is None or self._auth_data.refresh_token is None:
|
|
|
Logger.log("w", "Unable to refresh access token, since there is no refresh token.")
|
|
|
return
|
|
@@ -135,14 +147,16 @@ class AuthorizationService:
|
|
|
Logger.log("w", "Failed to get a new access token from the server.")
|
|
|
self.onAuthStateChanged.emit(logged_in = False)
|
|
|
|
|
|
- ## Delete the authentication data that we have stored locally (eg; logout)
|
|
|
def deleteAuthData(self) -> None:
|
|
|
+ """Delete the authentication data that we have stored locally (eg; logout)"""
|
|
|
+
|
|
|
if self._auth_data is not None:
|
|
|
self._storeAuthData()
|
|
|
self.onAuthStateChanged.emit(logged_in = False)
|
|
|
|
|
|
- ## Start the flow to become authenticated. This will start a new webbrowser tap, prompting the user to login.
|
|
|
def startAuthorizationFlow(self) -> None:
|
|
|
+ """Start the flow to become authenticated. This will start a new webbrowser tap, prompting the user to login."""
|
|
|
+
|
|
|
Logger.log("d", "Starting new OAuth2 flow...")
|
|
|
|
|
|
# Create the tokens needed for the code challenge (PKCE) extension for OAuth2.
|
|
@@ -177,8 +191,9 @@ class AuthorizationService:
|
|
|
QDesktopServices.openUrl(QUrl("{}?{}".format(self._auth_url, query_string)))
|
|
|
|
|
|
|
|
|
- ## Callback method for the authentication flow.
|
|
|
def _onAuthStateChanged(self, auth_response: AuthenticationResponse) -> None:
|
|
|
+ """Callback method for the authentication flow."""
|
|
|
+
|
|
|
if auth_response.success:
|
|
|
self._storeAuthData(auth_response)
|
|
|
self.onAuthStateChanged.emit(logged_in = True)
|
|
@@ -186,8 +201,9 @@ class AuthorizationService:
|
|
|
self.onAuthenticationError.emit(logged_in = False, error_message = auth_response.err_message)
|
|
|
self._server.stop() # Stop the web server at all times.
|
|
|
|
|
|
- ## Load authentication data from preferences.
|
|
|
def loadAuthDataFromPreferences(self) -> None:
|
|
|
+ """Load authentication data from preferences."""
|
|
|
+
|
|
|
if self._preferences is None:
|
|
|
Logger.log("e", "Unable to load authentication data, since no preference has been set!")
|
|
|
return
|
|
@@ -208,8 +224,9 @@ class AuthorizationService:
|
|
|
except ValueError:
|
|
|
Logger.logException("w", "Could not load auth data from preferences")
|
|
|
|
|
|
- ## Store authentication data in preferences.
|
|
|
def _storeAuthData(self, auth_data: Optional[AuthenticationResponse] = None) -> None:
|
|
|
+ """Store authentication data in preferences."""
|
|
|
+
|
|
|
Logger.log("d", "Attempting to store the auth data")
|
|
|
if self._preferences is None:
|
|
|
Logger.log("e", "Unable to save authentication data, since no preference has been set!")
|