AuthorizationService.py 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. # Copyright (c) 2018 Ultimaker B.V.
  2. # Cura is released under the terms of the LGPLv3 or higher.
  3. import json
  4. import webbrowser
  5. from typing import Optional
  6. from urllib.parse import urlencode
  7. # As this module is specific for Cura plugins, we can rely on these imports.
  8. from UM.Logger import Logger
  9. from UM.Signal import Signal
  10. # Cura-internal OAuth2 modules; these are imported by their absolute package path.
  11. from cura.OAuth2.LocalAuthorizationServer import LocalAuthorizationServer
  12. from cura.OAuth2.AuthorizationHelpers import AuthorizationHelpers
  13. from cura.OAuth2.Models import OAuth2Settings, AuthenticationResponse, UserProfile
  14. class AuthorizationService:
  15. """
  16. The authorization service is responsible for handling the login flow,
  17. storing user credentials and providing account information.
  18. """
  19. # Emit signal when authentication is completed.
  20. onAuthStateChanged = Signal()
  21. # Emit signal when authentication failed.
  22. onAuthenticationError = Signal()
  23. def __init__(self, preferences, settings: "OAuth2Settings"):
  24. self._settings = settings
  25. self._auth_helpers = AuthorizationHelpers(settings)
  26. self._auth_url = "{}/authorize".format(self._settings.OAUTH_SERVER_URL)
  27. self._auth_data = None # type: Optional[AuthenticationResponse]
  28. self._user_profile = None # type: Optional[UserProfile]
  29. self._cura_preferences = preferences
  30. self._server = LocalAuthorizationServer(self._auth_helpers, self._onAuthStateChanged, daemon=True)
  31. self._loadAuthData()
  32. def getUserProfile(self) -> Optional["UserProfile"]:
  33. """
  34. Get the user data that is stored in the JWT token.
  35. :return: Dict containing some user data.
  36. """
  37. if not self._user_profile:
  38. # If no user profile was stored locally, we try to get it from JWT.
  39. self._user_profile = self._parseJWT()
  40. if not self._user_profile:
  41. # If there is still no user profile from the JWT, we have to log in again.
  42. return None
  43. return self._user_profile
  44. def _parseJWT(self) -> Optional["UserProfile"]:
  45. """
  46. Tries to parse the JWT if all the needed data exists.
  47. :return: UserProfile if found, otherwise None.
  48. """
  49. if not self._auth_data:
  50. # If no auth data exists, we should always log in again.
  51. return None
  52. user_data = self._auth_helpers.parseJWT(self._auth_data.access_token)
  53. if user_data:
  54. # If the profile was found, we return it immediately.
  55. return user_data
  56. # The JWT was expired or invalid and we should request a new one.
  57. self._auth_data = self._auth_helpers.getAccessTokenUsingRefreshToken(self._auth_data.refresh_token)
  58. if not self._auth_data:
  59. # The token could not be refreshed using the refresh token. We should login again.
  60. return None
  61. return self._auth_helpers.parseJWT(self._auth_data.access_token)
  62. def getAccessToken(self) -> Optional[str]:
  63. """
  64. Get the access token response data.
  65. :return: Dict containing token data.
  66. """
  67. if not self.getUserProfile():
  68. # We check if we can get the user profile.
  69. # If we can't get it, that means the access token (JWT) was invalid or expired.
  70. return None
  71. return self._auth_data.access_token
  72. def refreshAccessToken(self) -> None:
  73. """
  74. Refresh the access token when it expired.
  75. """
  76. self._storeAuthData(self._auth_helpers.getAccessTokenUsingRefreshToken(self._auth_data.refresh_token))
  77. self.onAuthStateChanged.emit(logged_in=True)
  78. def deleteAuthData(self):
  79. """Delete authentication data from preferences and locally."""
  80. self._storeAuthData()
  81. self.onAuthStateChanged.emit(logged_in=False)
  82. def startAuthorizationFlow(self) -> None:
  83. """Start a new OAuth2 authorization flow."""
  84. Logger.log("d", "Starting new OAuth2 flow...")
  85. # Create the tokens needed for the code challenge (PKCE) extension for OAuth2.
  86. # This is needed because the CuraDrivePlugin is a untrusted (open source) client.
  87. # More details can be found at https://tools.ietf.org/html/rfc7636.
  88. verification_code = self._auth_helpers.generateVerificationCode()
  89. challenge_code = self._auth_helpers.generateVerificationCodeChallenge(verification_code)
  90. # Create the query string needed for the OAuth2 flow.
  91. query_string = urlencode({
  92. "client_id": self._settings.CLIENT_ID,
  93. "redirect_uri": self._settings.CALLBACK_URL,
  94. "scope": self._settings.CLIENT_SCOPES,
  95. "response_type": "code",
  96. "state": "CuraDriveIsAwesome",
  97. "code_challenge": challenge_code,
  98. "code_challenge_method": "S512"
  99. })
  100. # Open the authorization page in a new browser window.
  101. webbrowser.open_new("{}?{}".format(self._auth_url, query_string))
  102. # Start a local web server to receive the callback URL on.
  103. self._server.start(verification_code)
  104. def _onAuthStateChanged(self, auth_response: "AuthenticationResponse") -> None:
  105. """Callback method for an authentication flow."""
  106. if auth_response.success:
  107. self._storeAuthData(auth_response)
  108. self.onAuthStateChanged.emit(logged_in=True)
  109. else:
  110. self.onAuthenticationError.emit(logged_in=False, error_message=auth_response.err_message)
  111. self._server.stop() # Stop the web server at all times.
  112. def _loadAuthData(self) -> None:
  113. """Load authentication data from preferences if available."""
  114. self._cura_preferences.addPreference(self._settings.AUTH_DATA_PREFERENCE_KEY, "{}")
  115. try:
  116. preferences_data = json.loads(self._cura_preferences.getValue(self._settings.AUTH_DATA_PREFERENCE_KEY))
  117. if preferences_data:
  118. self._auth_data = AuthenticationResponse(**preferences_data)
  119. self.onAuthStateChanged.emit(logged_in=True)
  120. except ValueError as err:
  121. Logger.log("w", "Could not load auth data from preferences: %s", err)
  122. def _storeAuthData(self, auth_data: Optional["AuthenticationResponse"] = None) -> None:
  123. """Store authentication data in preferences and locally."""
  124. self._auth_data = auth_data
  125. if auth_data:
  126. self._user_profile = self.getUserProfile()
  127. self._cura_preferences.setValue(self._settings.AUTH_DATA_PREFERENCE_KEY, json.dumps(vars(auth_data)))
  128. else:
  129. self._user_profile = None
  130. self._cura_preferences.resetPreference(self._settings.AUTH_DATA_PREFERENCE_KEY)