TestOAuth2.py 11 KB


  1. from datetime import datetime
  2. from unittest.mock import MagicMock, patch
  3. import requests
  4. from PyQt5.QtGui import QDesktopServices
  5. from UM.Preferences import Preferences
  6. from cura.OAuth2.AuthorizationHelpers import AuthorizationHelpers, TOKEN_TIMESTAMP_FORMAT
  7. from cura.OAuth2.AuthorizationService import AuthorizationService, MYCLOUD_LOGOFF_URL
  8. from cura.OAuth2.LocalAuthorizationServer import LocalAuthorizationServer
  9. from cura.OAuth2.Models import OAuth2Settings, AuthenticationResponse, UserProfile
  10. CALLBACK_PORT = 32118
  11. OAUTH_ROOT = "https://account.ultimaker.com"
  12. CLOUD_API_ROOT = "https://api.ultimaker.com"
  13. OAUTH_SETTINGS = OAuth2Settings(
  14. OAUTH_SERVER_URL= OAUTH_ROOT,
  15. CALLBACK_PORT=CALLBACK_PORT,
  16. CALLBACK_URL="http://localhost:{}/callback".format(CALLBACK_PORT),
  17. CLIENT_ID="",
  18. CLIENT_SCOPES="",
  19. AUTH_DATA_PREFERENCE_KEY="test/auth_data",
  20. AUTH_SUCCESS_REDIRECT="{}/app/auth-success".format(OAUTH_ROOT),
  21. AUTH_FAILED_REDIRECT="{}/app/auth-error".format(OAUTH_ROOT)
  22. )
  23. FAILED_AUTH_RESPONSE = AuthenticationResponse(
  24. success = False,
  25. err_message = "FAILURE!"
  26. )
  27. SUCCESSFUL_AUTH_RESPONSE = AuthenticationResponse(
  28. access_token = "beep",
  29. refresh_token = "beep?",
  30. received_at = datetime.now().strftime(TOKEN_TIMESTAMP_FORMAT),
  31. expires_in = 300, # 5 minutes should be more than enough for testing
  32. success = True
  33. )
  34. NO_REFRESH_AUTH_RESPONSE = AuthenticationResponse(
  35. access_token = "beep",
  36. received_at = datetime.now().strftime(TOKEN_TIMESTAMP_FORMAT),
  37. expires_in = 300, # 5 minutes should be more than enough for testing
  38. success = True
  39. )
  40. MALFORMED_AUTH_RESPONSE = AuthenticationResponse()
  41. def test_cleanAuthService() -> None:
  42. # Ensure that when setting up an AuthorizationService, no data is set.
  43. authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
  44. authorization_service.initialize()
  45. assert authorization_service.getUserProfile() is None
  46. assert authorization_service.getAccessToken() is None
  47. def test_refreshAccessTokenSuccess():
  48. authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
  49. authorization_service.initialize()
  50. with patch.object(AuthorizationService, "getUserProfile", return_value=UserProfile()):
  51. authorization_service._storeAuthData(SUCCESSFUL_AUTH_RESPONSE)
  52. authorization_service.onAuthStateChanged.emit = MagicMock()
  53. with patch.object(AuthorizationHelpers, "getAccessTokenUsingRefreshToken", return_value=SUCCESSFUL_AUTH_RESPONSE):
  54. authorization_service.refreshAccessToken()
  55. assert authorization_service.onAuthStateChanged.emit.called_with(True)
  56. def test__parseJWTNoRefreshToken():
  57. authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
  58. with patch.object(AuthorizationService, "getUserProfile", return_value=UserProfile()):
  59. authorization_service._storeAuthData(NO_REFRESH_AUTH_RESPONSE)
  60. assert authorization_service._parseJWT() is None
  61. def test__parseJWTFailOnRefresh():
  62. authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
  63. with patch.object(AuthorizationService, "getUserProfile", return_value=UserProfile()):
  64. authorization_service._storeAuthData(SUCCESSFUL_AUTH_RESPONSE)
  65. with patch.object(AuthorizationHelpers, "getAccessTokenUsingRefreshToken", return_value=FAILED_AUTH_RESPONSE):
  66. assert authorization_service._parseJWT() is None
  67. def test__parseJWTSucceedOnRefresh():
  68. authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
  69. authorization_service.initialize()
  70. with patch.object(AuthorizationService, "getUserProfile", return_value=UserProfile()):
  71. authorization_service._storeAuthData(SUCCESSFUL_AUTH_RESPONSE)
  72. with patch.object(AuthorizationHelpers, "getAccessTokenUsingRefreshToken", return_value=SUCCESSFUL_AUTH_RESPONSE):
  73. with patch.object(AuthorizationHelpers, "parseJWT", MagicMock(return_value = None)) as mocked_parseJWT:
  74. authorization_service._parseJWT()
  75. mocked_parseJWT.assert_called_with("beep")
  76. def test_initialize():
  77. original_preference = MagicMock()
  78. initialize_preferences = MagicMock()
  79. authorization_service = AuthorizationService(OAUTH_SETTINGS, original_preference)
  80. authorization_service.initialize(initialize_preferences)
  81. initialize_preferences.addPreference.assert_called_once_with("test/auth_data", "{}")
  82. original_preference.addPreference.assert_not_called()
  83. def test_refreshAccessTokenFailed():
  84. authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
  85. authorization_service.initialize()
  86. with patch.object(AuthorizationService, "getUserProfile", return_value=UserProfile()):
  87. authorization_service._storeAuthData(SUCCESSFUL_AUTH_RESPONSE)
  88. authorization_service.onAuthStateChanged.emit = MagicMock()
  89. with patch.object(AuthorizationHelpers, "getAccessTokenUsingRefreshToken", return_value=FAILED_AUTH_RESPONSE):
  90. authorization_service.refreshAccessToken()
  91. assert authorization_service.onAuthStateChanged.emit.called_with(False)
  92. def test_refreshAccesTokenWithoutData():
  93. authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
  94. authorization_service.initialize()
  95. authorization_service.onAuthStateChanged.emit = MagicMock()
  96. authorization_service.refreshAccessToken()
  97. authorization_service.onAuthStateChanged.emit.assert_not_called()
  98. def test_userProfileException():
  99. authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
  100. authorization_service.initialize()
  101. authorization_service._parseJWT = MagicMock(side_effect=requests.exceptions.ConnectionError)
  102. assert authorization_service.getUserProfile() is None
  103. def test_failedLogin() -> None:
  104. authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
  105. authorization_service.onAuthenticationError.emit = MagicMock()
  106. authorization_service.onAuthStateChanged.emit = MagicMock()
  107. authorization_service.initialize()
  108. # Let the service think there was a failed response
  109. authorization_service._onAuthStateChanged(FAILED_AUTH_RESPONSE)
  110. # Check that the error signal was triggered
  111. assert authorization_service.onAuthenticationError.emit.call_count == 1
  112. # Since nothing changed, this should still be 0.
  113. assert authorization_service.onAuthStateChanged.emit.call_count == 0
  114. # Validate that there is no user profile or token
  115. assert authorization_service.getUserProfile() is None
  116. assert authorization_service.getAccessToken() is None
  117. @patch.object(AuthorizationService, "getUserProfile", return_value=UserProfile())
  118. def test_storeAuthData(get_user_profile) -> None:
  119. preferences = Preferences()
  120. authorization_service = AuthorizationService(OAUTH_SETTINGS, preferences)
  121. authorization_service.initialize()
  122. # Write stuff to the preferences.
  123. authorization_service._storeAuthData(SUCCESSFUL_AUTH_RESPONSE)
  124. preference_value = preferences.getValue(OAUTH_SETTINGS.AUTH_DATA_PREFERENCE_KEY)
  125. # Check that something was actually put in the preferences
  126. assert preference_value is not None and preference_value != {}
  127. # Create a second auth service, so we can load the data.
  128. second_auth_service = AuthorizationService(OAUTH_SETTINGS, preferences)
  129. second_auth_service.initialize()
  130. second_auth_service.loadAuthDataFromPreferences()
  131. assert second_auth_service.getAccessToken() == SUCCESSFUL_AUTH_RESPONSE.access_token
  132. @patch.object(LocalAuthorizationServer, "stop")
  133. @patch.object(LocalAuthorizationServer, "start")
  134. @patch.object(QDesktopServices, "openUrl")
  135. def test_localAuthServer(QDesktopServices_openUrl, start_auth_server, stop_auth_server) -> None:
  136. preferences = Preferences()
  137. authorization_service = AuthorizationService(OAUTH_SETTINGS, preferences)
  138. authorization_service.startAuthorizationFlow()
  139. assert QDesktopServices_openUrl.call_count == 1
  140. # Ensure that the Authorization service tried to start the server.
  141. assert start_auth_server.call_count == 1
  142. assert stop_auth_server.call_count == 0
  143. authorization_service._onAuthStateChanged(FAILED_AUTH_RESPONSE)
  144. # Ensure that it stopped the server.
  145. assert stop_auth_server.call_count == 1
  146. def test_loginAndLogout() -> None:
  147. preferences = Preferences()
  148. authorization_service = AuthorizationService(OAUTH_SETTINGS, preferences)
  149. authorization_service.onAuthenticationError.emit = MagicMock()
  150. authorization_service.onAuthStateChanged.emit = MagicMock()
  151. authorization_service.initialize()
  152. # Let the service think there was a successful response
  153. with patch.object(AuthorizationHelpers, "parseJWT", return_value=UserProfile()):
  154. authorization_service._onAuthStateChanged(SUCCESSFUL_AUTH_RESPONSE)
  155. # Ensure that the error signal was not triggered
  156. assert authorization_service.onAuthenticationError.emit.call_count == 0
  157. # Since we said that it went right this time, validate that we got a signal.
  158. assert authorization_service.onAuthStateChanged.emit.call_count == 1
  159. assert authorization_service.getUserProfile() is not None
  160. assert authorization_service.getAccessToken() == "beep"
  161. # Check that we stored the authentication data, so next time the user won't have to log in again.
  162. assert preferences.getValue("test/auth_data") is not None
  163. # We're logged in now, also check if logging out works
  164. authorization_service.deleteAuthData()
  165. assert authorization_service.onAuthStateChanged.emit.call_count == 2
  166. assert authorization_service.getUserProfile() is None
  167. # Ensure the data is gone after we logged out.
  168. assert preferences.getValue("test/auth_data") == "{}"
  169. def test_wrongServerResponses() -> None:
  170. authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
  171. authorization_service.initialize()
  172. with patch.object(AuthorizationHelpers, "parseJWT", return_value=UserProfile()):
  173. authorization_service._onAuthStateChanged(MALFORMED_AUTH_RESPONSE)
  174. assert authorization_service.getUserProfile() is None
  175. def test__generate_auth_url() -> None:
  176. preferences = Preferences()
  177. authorization_service = AuthorizationService(OAUTH_SETTINGS, preferences)
  178. query_parameters_dict = {
  179. "client_id": "",
  180. "redirect_uri": OAUTH_SETTINGS.CALLBACK_URL,
  181. "scope": OAUTH_SETTINGS.CLIENT_SCOPES,
  182. "response_type": "code"
  183. }
  184. auth_url = authorization_service._generate_auth_url(query_parameters_dict, force_browser_logout = False)
  185. assert MYCLOUD_LOGOFF_URL + "?next=" not in auth_url
  186. auth_url = authorization_service._generate_auth_url(query_parameters_dict, force_browser_logout = True)
  187. assert MYCLOUD_LOGOFF_URL + "?next=" in auth_url