TestOAuth2.py 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227
  1. import webbrowser
  2. from datetime import datetime
  3. from unittest.mock import MagicMock, patch
  4. import requests
  5. from UM.Preferences import Preferences
  6. from cura.OAuth2.AuthorizationHelpers import AuthorizationHelpers, TOKEN_TIMESTAMP_FORMAT
  7. from cura.OAuth2.AuthorizationService import AuthorizationService
  8. from cura.OAuth2.LocalAuthorizationServer import LocalAuthorizationServer
  9. from cura.OAuth2.Models import OAuth2Settings, AuthenticationResponse, UserProfile
  10. CALLBACK_PORT = 32118
  11. OAUTH_ROOT = "https://account.ultimaker.com"
  12. CLOUD_API_ROOT = "https://api.ultimaker.com"
  13. OAUTH_SETTINGS = OAuth2Settings(
  14. OAUTH_SERVER_URL= OAUTH_ROOT,
  15. CALLBACK_PORT=CALLBACK_PORT,
  16. CALLBACK_URL="http://localhost:{}/callback".format(CALLBACK_PORT),
  17. CLIENT_ID="",
  18. CLIENT_SCOPES="",
  19. AUTH_DATA_PREFERENCE_KEY="test/auth_data",
  20. AUTH_SUCCESS_REDIRECT="{}/app/auth-success".format(OAUTH_ROOT),
  21. AUTH_FAILED_REDIRECT="{}/app/auth-error".format(OAUTH_ROOT)
  22. )
  23. FAILED_AUTH_RESPONSE = AuthenticationResponse(
  24. success = False,
  25. err_message = "FAILURE!"
  26. )
  27. SUCCESSFUL_AUTH_RESPONSE = AuthenticationResponse(
  28. access_token = "beep",
  29. refresh_token = "beep?",
  30. received_at = datetime.now().strftime(TOKEN_TIMESTAMP_FORMAT),
  31. expires_in = 300, # 5 minutes should be more than enough for testing
  32. success = True
  33. )
  34. NO_REFRESH_AUTH_RESPONSE = AuthenticationResponse(
  35. access_token = "beep",
  36. received_at = datetime.now().strftime(TOKEN_TIMESTAMP_FORMAT),
  37. expires_in = 300, # 5 minutes should be more than enough for testing
  38. success = True
  39. )
  40. MALFORMED_AUTH_RESPONSE = AuthenticationResponse()
  41. def test_cleanAuthService() -> None:
  42. # Ensure that when setting up an AuthorizationService, no data is set.
  43. authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
  44. authorization_service.initialize()
  45. assert authorization_service.getUserProfile() is None
  46. assert authorization_service.getAccessToken() is None
  47. def test_refreshAccessTokenSuccess():
  48. authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
  49. authorization_service.initialize()
  50. with patch.object(AuthorizationService, "getUserProfile", return_value=UserProfile()):
  51. authorization_service._storeAuthData(SUCCESSFUL_AUTH_RESPONSE)
  52. authorization_service.onAuthStateChanged.emit = MagicMock()
  53. with patch.object(AuthorizationHelpers, "getAccessTokenUsingRefreshToken", return_value=SUCCESSFUL_AUTH_RESPONSE):
  54. authorization_service.refreshAccessToken()
  55. assert authorization_service.onAuthStateChanged.emit.called_with(True)
  56. def test__parseJWTNoRefreshToken():
  57. authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
  58. with patch.object(AuthorizationService, "getUserProfile", return_value=UserProfile()):
  59. authorization_service._storeAuthData(NO_REFRESH_AUTH_RESPONSE)
  60. assert authorization_service._parseJWT() is None
  61. def test__parseJWTFailOnRefresh():
  62. authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
  63. with patch.object(AuthorizationService, "getUserProfile", return_value=UserProfile()):
  64. authorization_service._storeAuthData(SUCCESSFUL_AUTH_RESPONSE)
  65. with patch.object(AuthorizationHelpers, "getAccessTokenUsingRefreshToken", return_value=FAILED_AUTH_RESPONSE):
  66. assert authorization_service._parseJWT() is None
  67. def test__parseJWTSucceedOnRefresh():
  68. authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
  69. authorization_service.initialize()
  70. with patch.object(AuthorizationService, "getUserProfile", return_value=UserProfile()):
  71. authorization_service._storeAuthData(SUCCESSFUL_AUTH_RESPONSE)
  72. with patch.object(AuthorizationHelpers, "getAccessTokenUsingRefreshToken", return_value=SUCCESSFUL_AUTH_RESPONSE):
  73. with patch.object(AuthorizationHelpers, "parseJWT", MagicMock(return_value = None)) as mocked_parseJWT:
  74. authorization_service._parseJWT()
  75. mocked_parseJWT.assert_called_with("beep")
  76. def test_initialize():
  77. original_preference = MagicMock()
  78. initialize_preferences = MagicMock()
  79. authorization_service = AuthorizationService(OAUTH_SETTINGS, original_preference)
  80. authorization_service.initialize(initialize_preferences)
  81. initialize_preferences.addPreference.assert_called_once_with("test/auth_data", "{}")
  82. original_preference.addPreference.assert_not_called()
  83. def test_refreshAccessTokenFailed():
  84. authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
  85. authorization_service.initialize()
  86. with patch.object(AuthorizationService, "getUserProfile", return_value=UserProfile()):
  87. authorization_service._storeAuthData(SUCCESSFUL_AUTH_RESPONSE)
  88. authorization_service.onAuthStateChanged.emit = MagicMock()
  89. with patch.object(AuthorizationHelpers, "getAccessTokenUsingRefreshToken", return_value=FAILED_AUTH_RESPONSE):
  90. authorization_service.refreshAccessToken()
  91. assert authorization_service.onAuthStateChanged.emit.called_with(False)
  92. def test_refreshAccesTokenWithoutData():
  93. authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
  94. authorization_service.initialize()
  95. authorization_service.onAuthStateChanged.emit = MagicMock()
  96. authorization_service.refreshAccessToken()
  97. authorization_service.onAuthStateChanged.emit.assert_not_called()
  98. def test_userProfileException():
  99. authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
  100. authorization_service.initialize()
  101. authorization_service._parseJWT = MagicMock(side_effect=requests.exceptions.ConnectionError)
  102. assert authorization_service.getUserProfile() is None
  103. def test_failedLogin() -> None:
  104. authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
  105. authorization_service.onAuthenticationError.emit = MagicMock()
  106. authorization_service.onAuthStateChanged.emit = MagicMock()
  107. authorization_service.initialize()
  108. # Let the service think there was a failed response
  109. authorization_service._onAuthStateChanged(FAILED_AUTH_RESPONSE)
  110. # Check that the error signal was triggered
  111. assert authorization_service.onAuthenticationError.emit.call_count == 1
  112. # Since nothing changed, this should still be 0.
  113. assert authorization_service.onAuthStateChanged.emit.call_count == 0
  114. # Validate that there is no user profile or token
  115. assert authorization_service.getUserProfile() is None
  116. assert authorization_service.getAccessToken() is None
  117. @patch.object(AuthorizationService, "getUserProfile", return_value=UserProfile())
  118. def test_storeAuthData(get_user_profile) -> None:
  119. preferences = Preferences()
  120. authorization_service = AuthorizationService(OAUTH_SETTINGS, preferences)
  121. authorization_service.initialize()
  122. # Write stuff to the preferences.
  123. authorization_service._storeAuthData(SUCCESSFUL_AUTH_RESPONSE)
  124. preference_value = preferences.getValue(OAUTH_SETTINGS.AUTH_DATA_PREFERENCE_KEY)
  125. # Check that something was actually put in the preferences
  126. assert preference_value is not None and preference_value != {}
  127. # Create a second auth service, so we can load the data.
  128. second_auth_service = AuthorizationService(OAUTH_SETTINGS, preferences)
  129. second_auth_service.initialize()
  130. second_auth_service.loadAuthDataFromPreferences()
  131. assert second_auth_service.getAccessToken() == SUCCESSFUL_AUTH_RESPONSE.access_token
  132. @patch.object(LocalAuthorizationServer, "stop")
  133. @patch.object(LocalAuthorizationServer, "start")
  134. @patch.object(webbrowser, "open_new")
  135. def test_localAuthServer(webbrowser_open, start_auth_server, stop_auth_server) -> None:
  136. preferences = Preferences()
  137. authorization_service = AuthorizationService(OAUTH_SETTINGS, preferences)
  138. authorization_service.startAuthorizationFlow()
  139. assert webbrowser_open.call_count == 1
  140. # Ensure that the Authorization service tried to start the server.
  141. assert start_auth_server.call_count == 1
  142. assert stop_auth_server.call_count == 0
  143. authorization_service._onAuthStateChanged(FAILED_AUTH_RESPONSE)
  144. # Ensure that it stopped the server.
  145. assert stop_auth_server.call_count == 1
  146. def test_loginAndLogout() -> None:
  147. preferences = Preferences()
  148. authorization_service = AuthorizationService(OAUTH_SETTINGS, preferences)
  149. authorization_service.onAuthenticationError.emit = MagicMock()
  150. authorization_service.onAuthStateChanged.emit = MagicMock()
  151. authorization_service.initialize()
  152. # Let the service think there was a successful response
  153. with patch.object(AuthorizationHelpers, "parseJWT", return_value=UserProfile()):
  154. authorization_service._onAuthStateChanged(SUCCESSFUL_AUTH_RESPONSE)
  155. # Ensure that the error signal was not triggered
  156. assert authorization_service.onAuthenticationError.emit.call_count == 0
  157. # Since we said that it went right this time, validate that we got a signal.
  158. assert authorization_service.onAuthStateChanged.emit.call_count == 1
  159. assert authorization_service.getUserProfile() is not None
  160. assert authorization_service.getAccessToken() == "beep"
  161. # Check that we stored the authentication data, so next time the user won't have to log in again.
  162. assert preferences.getValue("test/auth_data") is not None
  163. # We're logged in now, also check if logging out works
  164. authorization_service.deleteAuthData()
  165. assert authorization_service.onAuthStateChanged.emit.call_count == 2
  166. assert authorization_service.getUserProfile() is None
  167. # Ensure the data is gone after we logged out.
  168. assert preferences.getValue("test/auth_data") == "{}"
  169. def test_wrongServerResponses() -> None:
  170. authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
  171. authorization_service.initialize()
  172. with patch.object(AuthorizationHelpers, "parseJWT", return_value=UserProfile()):
  173. authorization_service._onAuthStateChanged(MALFORMED_AUTH_RESPONSE)
  174. assert authorization_service.getUserProfile() is None