AuthorizationService.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247
  1. # Copyright (c) 2019 Ultimaker B.V.
  2. # Cura is released under the terms of the LGPLv3 or higher.
  3. import json
  4. from datetime import datetime, timedelta
  5. from typing import Optional, TYPE_CHECKING, Dict
  6. from urllib.parse import urlencode, quote_plus
  7. import requests.exceptions
  8. from PyQt5.QtCore import QUrl
  9. from PyQt5.QtGui import QDesktopServices
  10. from UM.Logger import Logger
  11. from UM.Message import Message
  12. from UM.Signal import Signal
  13. from UM.i18n import i18nCatalog
  14. from cura.OAuth2.AuthorizationHelpers import AuthorizationHelpers, TOKEN_TIMESTAMP_FORMAT
  15. from cura.OAuth2.LocalAuthorizationServer import LocalAuthorizationServer
  16. from cura.OAuth2.Models import AuthenticationResponse
  17. i18n_catalog = i18nCatalog("cura")
  18. if TYPE_CHECKING:
  19. from cura.OAuth2.Models import UserProfile, OAuth2Settings
  20. from UM.Preferences import Preferences
  21. MYCLOUD_LOGOFF_URL = "https://mycloud.ultimaker.com/logoff"
  22. ## The authorization service is responsible for handling the login flow,
  23. # storing user credentials and providing account information.
  24. class AuthorizationService:
  25. # Emit signal when authentication is completed.
  26. onAuthStateChanged = Signal()
  27. # Emit signal when authentication failed.
  28. onAuthenticationError = Signal()
  29. accessTokenChanged = Signal()
  30. def __init__(self, settings: "OAuth2Settings", preferences: Optional["Preferences"] = None) -> None:
  31. self._settings = settings
  32. self._auth_helpers = AuthorizationHelpers(settings)
  33. self._auth_url = "{}/authorize".format(self._settings.OAUTH_SERVER_URL)
  34. self._auth_data = None # type: Optional[AuthenticationResponse]
  35. self._user_profile = None # type: Optional["UserProfile"]
  36. self._preferences = preferences
  37. self._server = LocalAuthorizationServer(self._auth_helpers, self._onAuthStateChanged, daemon=True)
  38. self._unable_to_get_data_message = None # type: Optional[Message]
  39. self.onAuthStateChanged.connect(self._authChanged)
  40. def _authChanged(self, logged_in):
  41. if logged_in and self._unable_to_get_data_message is not None:
  42. self._unable_to_get_data_message.hide()
  43. def initialize(self, preferences: Optional["Preferences"] = None) -> None:
  44. if preferences is not None:
  45. self._preferences = preferences
  46. if self._preferences:
  47. self._preferences.addPreference(self._settings.AUTH_DATA_PREFERENCE_KEY, "{}")
  48. ## Get the user profile as obtained from the JWT (JSON Web Token).
  49. # If the JWT is not yet parsed, calling this will take care of that.
  50. # \return UserProfile if a user is logged in, None otherwise.
  51. # \sa _parseJWT
  52. def getUserProfile(self) -> Optional["UserProfile"]:
  53. if not self._user_profile:
  54. # If no user profile was stored locally, we try to get it from JWT.
  55. try:
  56. self._user_profile = self._parseJWT()
  57. except requests.exceptions.ConnectionError:
  58. # Unable to get connection, can't login.
  59. Logger.logException("w", "Unable to validate user data with the remote server.")
  60. return None
  61. if not self._user_profile and self._auth_data:
  62. # If there is still no user profile from the JWT, we have to log in again.
  63. Logger.log("w", "The user profile could not be loaded. The user must log in again!")
  64. self.deleteAuthData()
  65. return None
  66. return self._user_profile
  67. ## Tries to parse the JWT (JSON Web Token) data, which it does if all the needed data is there.
  68. # \return UserProfile if it was able to parse, None otherwise.
  69. def _parseJWT(self) -> Optional["UserProfile"]:
  70. if not self._auth_data or self._auth_data.access_token is None:
  71. # If no auth data exists, we should always log in again.
  72. Logger.log("d", "There was no auth data or access token")
  73. return None
  74. user_data = self._auth_helpers.parseJWT(self._auth_data.access_token)
  75. if user_data:
  76. # If the profile was found, we return it immediately.
  77. return user_data
  78. # The JWT was expired or invalid and we should request a new one.
  79. if self._auth_data.refresh_token is None:
  80. Logger.log("w", "There was no refresh token in the auth data.")
  81. return None
  82. self._auth_data = self._auth_helpers.getAccessTokenUsingRefreshToken(self._auth_data.refresh_token)
  83. if not self._auth_data or self._auth_data.access_token is None:
  84. Logger.log("w", "Unable to use the refresh token to get a new access token.")
  85. # The token could not be refreshed using the refresh token. We should login again.
  86. return None
  87. # Ensure it gets stored as otherwise we only have it in memory. The stored refresh token has been deleted
  88. # from the server already.
  89. self._storeAuthData(self._auth_data)
  90. return self._auth_helpers.parseJWT(self._auth_data.access_token)
  91. ## Get the access token as provided by the repsonse data.
  92. def getAccessToken(self) -> Optional[str]:
  93. if self._auth_data is None:
  94. Logger.log("d", "No auth data to retrieve the access_token from")
  95. return None
  96. # Check if the current access token is expired and refresh it if that is the case.
  97. # We have a fallback on a date far in the past for currently stored auth data in cura.cfg.
  98. received_at = datetime.strptime(self._auth_data.received_at, TOKEN_TIMESTAMP_FORMAT) \
  99. if self._auth_data.received_at else datetime(2000, 1, 1)
  100. expiry_date = received_at + timedelta(seconds = float(self._auth_data.expires_in or 0) - 60)
  101. if datetime.now() > expiry_date:
  102. self.refreshAccessToken()
  103. return self._auth_data.access_token if self._auth_data else None
  104. ## Try to refresh the access token. This should be used when it has expired.
  105. def refreshAccessToken(self) -> None:
  106. if self._auth_data is None or self._auth_data.refresh_token is None:
  107. Logger.log("w", "Unable to refresh access token, since there is no refresh token.")
  108. return
  109. response = self._auth_helpers.getAccessTokenUsingRefreshToken(self._auth_data.refresh_token)
  110. if response.success:
  111. self._storeAuthData(response)
  112. self.onAuthStateChanged.emit(logged_in = True)
  113. else:
  114. Logger.log("w", "Failed to get a new access token from the server.")
  115. self.onAuthStateChanged.emit(logged_in = False)
  116. ## Delete the authentication data that we have stored locally (eg; logout)
  117. def deleteAuthData(self) -> None:
  118. if self._auth_data is not None:
  119. self._storeAuthData()
  120. self.onAuthStateChanged.emit(logged_in = False)
  121. ## Start the flow to become authenticated. This will start a new webbrowser tap, prompting the user to login.
  122. def startAuthorizationFlow(self, force_browser_logout: bool = False) -> None:
  123. Logger.log("d", "Starting new OAuth2 flow...")
  124. # Create the tokens needed for the code challenge (PKCE) extension for OAuth2.
  125. # This is needed because the CuraDrivePlugin is a untrusted (open source) client.
  126. # More details can be found at https://tools.ietf.org/html/rfc7636.
  127. verification_code = self._auth_helpers.generateVerificationCode()
  128. challenge_code = self._auth_helpers.generateVerificationCodeChallenge(verification_code)
  129. state = AuthorizationHelpers.generateVerificationCode()
  130. # Create the query dict needed for the OAuth2 flow.
  131. query_parameters_dict = {
  132. "client_id": self._settings.CLIENT_ID,
  133. "redirect_uri": self._settings.CALLBACK_URL,
  134. "scope": self._settings.CLIENT_SCOPES,
  135. "response_type": "code",
  136. "state": state, # Forever in our Hearts, RIP "(.Y.)" (2018-2020)
  137. "code_challenge": challenge_code,
  138. "code_challenge_method": "S512"
  139. }
  140. # Start a local web server to receive the callback URL on.
  141. try:
  142. self._server.start(verification_code, state)
  143. except OSError:
  144. Logger.logException("w", "Unable to create authorization request server")
  145. Message(i18n_catalog.i18nc("@info", "Unable to start a new sign in process. Check if another sign in attempt is still active."),
  146. title=i18n_catalog.i18nc("@info:title", "Warning")).show()
  147. return
  148. auth_url = self._generate_auth_url(query_parameters_dict, force_browser_logout)
  149. # Open the authorization page in a new browser window.
  150. QDesktopServices.openUrl(QUrl(auth_url))
  151. def _generate_auth_url(self, query_parameters_dict: Dict[str, Optional[str]], force_browser_logout: bool) -> str:
  152. """
  153. Generates the authentications url based on the original auth_url and the query_parameters_dict to be included.
  154. If there is a request to force logging out of mycloud in the browser, the link to logoff from mycloud is
  155. prepended in order to force the browser to logoff from mycloud and then redirect to the authentication url to
  156. login again. This case is used to sync the accounts between Cura and the browser.
  157. :param query_parameters_dict: A dictionary with the query parameters to be url encoded and added to the
  158. authentication link
  159. :param force_browser_logout: If True, Cura will prepend the MYCLOUD_LOGOFF_URL link before the authentication
  160. link to force the a browser logout from mycloud.ultimaker.com
  161. :return: The authentication URL, properly formatted and encoded
  162. """
  163. auth_url = "{}?{}".format(self._auth_url, urlencode(query_parameters_dict))
  164. if force_browser_logout:
  165. # The url after '?next=' should be urlencoded
  166. auth_url = "{}?next={}".format(MYCLOUD_LOGOFF_URL, quote_plus(auth_url))
  167. return auth_url
  168. ## Callback method for the authentication flow.
  169. def _onAuthStateChanged(self, auth_response: AuthenticationResponse) -> None:
  170. if auth_response.success:
  171. self._storeAuthData(auth_response)
  172. self.onAuthStateChanged.emit(logged_in = True)
  173. else:
  174. self.onAuthenticationError.emit(logged_in = False, error_message = auth_response.err_message)
  175. self._server.stop() # Stop the web server at all times.
  176. ## Load authentication data from preferences.
  177. def loadAuthDataFromPreferences(self) -> None:
  178. if self._preferences is None:
  179. Logger.log("e", "Unable to load authentication data, since no preference has been set!")
  180. return
  181. try:
  182. preferences_data = json.loads(self._preferences.getValue(self._settings.AUTH_DATA_PREFERENCE_KEY))
  183. if preferences_data:
  184. self._auth_data = AuthenticationResponse(**preferences_data)
  185. # Also check if we can actually get the user profile information.
  186. user_profile = self.getUserProfile()
  187. if user_profile is not None:
  188. self.onAuthStateChanged.emit(logged_in = True)
  189. else:
  190. if self._unable_to_get_data_message is not None:
  191. self._unable_to_get_data_message.hide()
  192. self._unable_to_get_data_message = Message(i18n_catalog.i18nc("@info", "Unable to reach the Ultimaker account server."), title = i18n_catalog.i18nc("@info:title", "Warning"))
  193. self._unable_to_get_data_message.show()
  194. except ValueError:
  195. Logger.logException("w", "Could not load auth data from preferences")
  196. ## Store authentication data in preferences.
  197. def _storeAuthData(self, auth_data: Optional[AuthenticationResponse] = None) -> None:
  198. Logger.log("d", "Attempting to store the auth data")
  199. if self._preferences is None:
  200. Logger.log("e", "Unable to save authentication data, since no preference has been set!")
  201. return
  202. self._auth_data = auth_data
  203. if auth_data:
  204. self._user_profile = self.getUserProfile()
  205. self._preferences.setValue(self._settings.AUTH_DATA_PREFERENCE_KEY, json.dumps(vars(auth_data)))
  206. else:
  207. self._user_profile = None
  208. self._preferences.resetPreference(self._settings.AUTH_DATA_PREFERENCE_KEY)
  209. self.accessTokenChanged.emit()