TestOAuth2.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
  1. from datetime import datetime
  2. from unittest.mock import MagicMock, patch
  3. import requests
  4. from PyQt5.QtGui import QDesktopServices
  5. from UM.Preferences import Preferences
  6. from cura.OAuth2.AuthorizationHelpers import AuthorizationHelpers, TOKEN_TIMESTAMP_FORMAT
  7. from cura.OAuth2.AuthorizationService import AuthorizationService, MYCLOUD_LOGOFF_URL
  8. from cura.OAuth2.LocalAuthorizationServer import LocalAuthorizationServer
  9. from cura.OAuth2.Models import OAuth2Settings, AuthenticationResponse, UserProfile
  10. CALLBACK_PORT = 32118
  11. OAUTH_ROOT = "https://account.ultimaker.com"
  12. CLOUD_API_ROOT = "https://api.ultimaker.com"
  13. OAUTH_SETTINGS = OAuth2Settings(
  14. OAUTH_SERVER_URL= OAUTH_ROOT,
  15. CALLBACK_PORT=CALLBACK_PORT,
  16. CALLBACK_URL="http://localhost:{}/callback".format(CALLBACK_PORT),
  17. CLIENT_ID="",
  18. CLIENT_SCOPES="",
  19. AUTH_DATA_PREFERENCE_KEY="test/auth_data",
  20. AUTH_SUCCESS_REDIRECT="{}/app/auth-success".format(OAUTH_ROOT),
  21. AUTH_FAILED_REDIRECT="{}/app/auth-error".format(OAUTH_ROOT)
  22. )
  23. FAILED_AUTH_RESPONSE = AuthenticationResponse(
  24. success = False,
  25. err_message = "FAILURE!"
  26. )
  27. SUCCESSFUL_AUTH_RESPONSE = AuthenticationResponse(
  28. access_token = "beep",
  29. refresh_token = "beep?",
  30. received_at = datetime.now().strftime(TOKEN_TIMESTAMP_FORMAT),
  31. expires_in = 300, # 5 minutes should be more than enough for testing
  32. success = True
  33. )
  34. NO_REFRESH_AUTH_RESPONSE = AuthenticationResponse(
  35. access_token = "beep",
  36. received_at = datetime.now().strftime(TOKEN_TIMESTAMP_FORMAT),
  37. expires_in = 300, # 5 minutes should be more than enough for testing
  38. success = True
  39. )
  40. MALFORMED_AUTH_RESPONSE = AuthenticationResponse()
  41. def test_cleanAuthService() -> None:
  42. # Ensure that when setting up an AuthorizationService, no data is set.
  43. authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
  44. authorization_service.initialize()
  45. assert authorization_service.getUserProfile() is None
  46. assert authorization_service.getAccessToken() is None
  47. def test_refreshAccessTokenSuccess():
  48. authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
  49. authorization_service.initialize()
  50. with patch.object(AuthorizationService, "getUserProfile", return_value=UserProfile()):
  51. authorization_service._storeAuthData(SUCCESSFUL_AUTH_RESPONSE)
  52. authorization_service.onAuthStateChanged.emit = MagicMock()
  53. with patch.object(AuthorizationHelpers, "getAccessTokenUsingRefreshToken", return_value=SUCCESSFUL_AUTH_RESPONSE):
  54. authorization_service.refreshAccessToken()
  55. assert authorization_service.onAuthStateChanged.emit.called_with(True)
  56. def test__parseJWTNoRefreshToken():
  57. authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
  58. with patch.object(AuthorizationService, "getUserProfile", return_value=UserProfile()):
  59. authorization_service._storeAuthData(NO_REFRESH_AUTH_RESPONSE)
  60. assert authorization_service._parseJWT() is None
  61. def test__parseJWTFailOnRefresh():
  62. authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
  63. with patch.object(AuthorizationService, "getUserProfile", return_value=UserProfile()):
  64. authorization_service._storeAuthData(SUCCESSFUL_AUTH_RESPONSE)
  65. with patch.object(AuthorizationHelpers, "getAccessTokenUsingRefreshToken", return_value=FAILED_AUTH_RESPONSE):
  66. assert authorization_service._parseJWT() is None
  67. def test__parseJWTSucceedOnRefresh():
  68. authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
  69. authorization_service.initialize()
  70. with patch.object(AuthorizationService, "getUserProfile", return_value=UserProfile()):
  71. authorization_service._storeAuthData(SUCCESSFUL_AUTH_RESPONSE)
  72. with patch.object(AuthorizationHelpers, "getAccessTokenUsingRefreshToken", return_value=SUCCESSFUL_AUTH_RESPONSE):
  73. with patch.object(AuthorizationHelpers, "parseJWT", MagicMock(return_value = None)) as mocked_parseJWT:
  74. authorization_service._parseJWT()
  75. mocked_parseJWT.assert_called_with("beep")
  76. def test_initialize():
  77. original_preference = MagicMock()
  78. initialize_preferences = MagicMock()
  79. authorization_service = AuthorizationService(OAUTH_SETTINGS, original_preference)
  80. authorization_service.initialize(initialize_preferences)
  81. initialize_preferences.addPreference.assert_called_once_with("test/auth_data", "{}")
  82. original_preference.addPreference.assert_not_called()
  83. def test_refreshAccessTokenFailed():
  84. authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
  85. authorization_service.initialize()
  86. with patch.object(AuthorizationService, "getUserProfile", return_value=UserProfile()):
  87. authorization_service._storeAuthData(SUCCESSFUL_AUTH_RESPONSE)
  88. authorization_service.onAuthStateChanged.emit = MagicMock()
  89. with patch.object(AuthorizationHelpers, "getAccessTokenUsingRefreshToken", return_value=FAILED_AUTH_RESPONSE):
  90. authorization_service.refreshAccessToken()
  91. assert authorization_service.onAuthStateChanged.emit.called_with(False)
  92. def test_refreshAccesTokenWithoutData():
  93. authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
  94. authorization_service.initialize()
  95. authorization_service.onAuthStateChanged.emit = MagicMock()
  96. authorization_service.refreshAccessToken()
  97. authorization_service.onAuthStateChanged.emit.assert_not_called()
  98. def test_userProfileException():
  99. authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
  100. authorization_service.initialize()
  101. authorization_service._parseJWT = MagicMock(side_effect=requests.exceptions.ConnectionError)
  102. assert authorization_service.getUserProfile() is None
  103. def test_failedLogin() -> None:
  104. authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
  105. authorization_service.onAuthenticationError.emit = MagicMock()
  106. authorization_service.onAuthStateChanged.emit = MagicMock()
  107. authorization_service.initialize()
  108. # Let the service think there was a failed response
  109. authorization_service._onAuthStateChanged(FAILED_AUTH_RESPONSE)
  110. # Check that the error signal was triggered
  111. assert authorization_service.onAuthenticationError.emit.call_count == 1
  112. # Since nothing changed, this should still be 0.
  113. assert authorization_service.onAuthStateChanged.emit.call_count == 0
  114. # Validate that there is no user profile or token
  115. assert authorization_service.getUserProfile() is None
  116. assert authorization_service.getAccessToken() is None
  117. @patch.object(AuthorizationService, "getUserProfile", return_value=UserProfile())
  118. def test_storeAuthData(get_user_profile) -> None:
  119. preferences = Preferences()
  120. authorization_service = AuthorizationService(OAUTH_SETTINGS, preferences)
  121. authorization_service.initialize()
  122. # Write stuff to the preferences.
  123. authorization_service._storeAuthData(SUCCESSFUL_AUTH_RESPONSE)
  124. preference_value = preferences.getValue(OAUTH_SETTINGS.AUTH_DATA_PREFERENCE_KEY)
  125. # Check that something was actually put in the preferences
  126. assert preference_value is not None and preference_value != {}
  127. # Create a second auth service, so we can load the data.
  128. second_auth_service = AuthorizationService(OAUTH_SETTINGS, preferences)
  129. second_auth_service.initialize()
  130. second_auth_service.loadAuthDataFromPreferences()
  131. assert second_auth_service.getAccessToken() == SUCCESSFUL_AUTH_RESPONSE.access_token
  132. @patch.object(LocalAuthorizationServer, "stop")
  133. @patch.object(LocalAuthorizationServer, "start")
  134. @patch.object(QDesktopServices, "openUrl")
  135. def test_localAuthServer(QDesktopServices_openUrl, start_auth_server, stop_auth_server) -> None:
  136. preferences = Preferences()
  137. authorization_service = AuthorizationService(OAUTH_SETTINGS, preferences)
  138. authorization_service.startAuthorizationFlow()
  139. assert QDesktopServices_openUrl.call_count == 1
  140. # Ensure that the Authorization service tried to start the server.
  141. assert start_auth_server.call_count == 1
  142. assert stop_auth_server.call_count == 0
  143. authorization_service._onAuthStateChanged(FAILED_AUTH_RESPONSE)
  144. # Ensure that it stopped the server.
  145. assert stop_auth_server.call_count == 1
  146. def test_loginAndLogout() -> None:
  147. preferences = Preferences()
  148. authorization_service = AuthorizationService(OAUTH_SETTINGS, preferences)
  149. authorization_service.onAuthenticationError.emit = MagicMock()
  150. authorization_service.onAuthStateChanged.emit = MagicMock()
  151. authorization_service.initialize()
  152. # Let the service think there was a successful response
  153. with patch.object(AuthorizationHelpers, "parseJWT", return_value=UserProfile()):
  154. authorization_service._onAuthStateChanged(SUCCESSFUL_AUTH_RESPONSE)
  155. # Ensure that the error signal was not triggered
  156. assert authorization_service.onAuthenticationError.emit.call_count == 0
  157. # Since we said that it went right this time, validate that we got a signal.
  158. assert authorization_service.onAuthStateChanged.emit.call_count == 1
  159. assert authorization_service.getUserProfile() is not None
  160. assert authorization_service.getAccessToken() == "beep"
  161. # Check that we stored the authentication data, so next time the user won't have to log in again.
  162. assert preferences.getValue("test/auth_data") is not None
  163. # We're logged in now, also check if logging out works
  164. authorization_service.deleteAuthData()
  165. assert authorization_service.onAuthStateChanged.emit.call_count == 2
  166. assert authorization_service.getUserProfile() is None
  167. # Ensure the data is gone after we logged out.
  168. assert preferences.getValue("test/auth_data") == "{}"
  169. def test_wrongServerResponses() -> None:
  170. authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
  171. authorization_service.initialize()
  172. with patch.object(AuthorizationHelpers, "parseJWT", return_value=UserProfile()):
  173. authorization_service._onAuthStateChanged(MALFORMED_AUTH_RESPONSE)
  174. assert authorization_service.getUserProfile() is None
  175. def test__generate_auth_url() -> None:
  176. preferences = Preferences()
  177. authorization_service = AuthorizationService(OAUTH_SETTINGS, preferences)
  178. query_parameters_dict = {
  179. "client_id": "",
  180. "redirect_uri": OAUTH_SETTINGS.CALLBACK_URL,
  181. "scope": OAUTH_SETTINGS.CLIENT_SCOPES,
  182. "response_type": "code"
  183. }
  184. auth_url = authorization_service._generate_auth_url(query_parameters_dict, force_browser_logout = False)
  185. assert MYCLOUD_LOGOFF_URL + "?next=" not in auth_url
  186. auth_url = authorization_service._generate_auth_url(query_parameters_dict, force_browser_logout = True)
  187. assert MYCLOUD_LOGOFF_URL + "?next=" in auth_url