from datetime import datetime
from unittest.mock import MagicMock, patch

import requests
from PyQt5.QtGui import QDesktopServices

from UM.Preferences import Preferences
from cura.OAuth2.AuthorizationHelpers import AuthorizationHelpers, TOKEN_TIMESTAMP_FORMAT
from cura.OAuth2.AuthorizationService import AuthorizationService, MYCLOUD_LOGOFF_URL
from cura.OAuth2.LocalAuthorizationServer import LocalAuthorizationServer
from cura.OAuth2.Models import OAuth2Settings, AuthenticationResponse, UserProfile

# Shared configuration used by every test in this module.
CALLBACK_PORT = 32118
OAUTH_ROOT = "https://account.ultimaker.com"
CLOUD_API_ROOT = "https://api.ultimaker.com"

OAUTH_SETTINGS = OAuth2Settings(
    OAUTH_SERVER_URL=OAUTH_ROOT,
    CALLBACK_PORT=CALLBACK_PORT,
    CALLBACK_URL="http://localhost:{}/callback".format(CALLBACK_PORT),
    CLIENT_ID="",
    CLIENT_SCOPES="",
    AUTH_DATA_PREFERENCE_KEY="test/auth_data",
    AUTH_SUCCESS_REDIRECT="{}/app/auth-success".format(OAUTH_ROOT),
    AUTH_FAILED_REDIRECT="{}/app/auth-error".format(OAUTH_ROOT)
)

# Canned server responses that the tests feed into the service.
FAILED_AUTH_RESPONSE = AuthenticationResponse(
    success=False,
    err_message="FAILURE!"
)

SUCCESSFUL_AUTH_RESPONSE = AuthenticationResponse(
    access_token="beep",
    refresh_token="beep?",
    received_at=datetime.now().strftime(TOKEN_TIMESTAMP_FORMAT),
    expires_in=300,  # 5 minutes should be more than enough for testing
    success=True
)

# Same as the successful response, but without a refresh token.
NO_REFRESH_AUTH_RESPONSE = AuthenticationResponse(
    access_token="beep",
    received_at=datetime.now().strftime(TOKEN_TIMESTAMP_FORMAT),
    expires_in=300,  # 5 minutes should be more than enough for testing
    success=True
)

# A response constructed without any fields at all.
MALFORMED_AUTH_RESPONSE = AuthenticationResponse()


def _assert_last_emit_carried(emit_mock: MagicMock, expected: bool) -> None:
    """Assert that the most recent call to *emit_mock* passed exactly *expected*.

    ``mock.called_with(...)`` is not an assertion: on older Python versions it
    silently creates a truthy child mock (so ``assert`` can never fail) and on
    Python 3.12+ it raises ``AttributeError``. This helper performs a real check.

    The value is accepted either positionally or as a single keyword argument,
    because the signature the service uses for ``emit`` is not visible from this
    test module.
    """
    assert emit_mock.called, "expected the signal to have been emitted"
    positional, keyword = emit_mock.call_args
    assert list(positional) + list(keyword.values()) == [expected]


def test_cleanAuthService() -> None:
    """A freshly initialized service has neither a user profile nor an access token."""
    # Ensure that when setting up an AuthorizationService, no data is set.
    authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
    authorization_service.initialize()
    assert authorization_service.getUserProfile() is None
    assert authorization_service.getAccessToken() is None


def test_refreshAccessTokenSuccess() -> None:
    """A successful token refresh emits the auth-state signal with ``True``."""
    authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
    authorization_service.initialize()
    # getUserProfile is stubbed only while the initial auth data is stored.
    with patch.object(AuthorizationService, "getUserProfile", return_value=UserProfile()):
        authorization_service._storeAuthData(SUCCESSFUL_AUTH_RESPONSE)
    authorization_service.onAuthStateChanged.emit = MagicMock()

    with patch.object(AuthorizationHelpers, "getAccessTokenUsingRefreshToken", return_value=SUCCESSFUL_AUTH_RESPONSE):
        authorization_service.refreshAccessToken()
        _assert_last_emit_carried(authorization_service.onAuthStateChanged.emit, True)


def test__parseJWTNoRefreshToken() -> None:
    """``_parseJWT`` yields ``None`` when the stored auth data has no refresh token."""
    authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
    with patch.object(AuthorizationService, "getUserProfile", return_value=UserProfile()):
        authorization_service._storeAuthData(NO_REFRESH_AUTH_RESPONSE)
    assert authorization_service._parseJWT() is None


def test__parseJWTFailOnRefresh() -> None:
    """``_parseJWT`` yields ``None`` when the refresh request reports a failure."""
    authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
    with patch.object(AuthorizationService, "getUserProfile", return_value=UserProfile()):
        authorization_service._storeAuthData(SUCCESSFUL_AUTH_RESPONSE)

    with patch.object(AuthorizationHelpers, "getAccessTokenUsingRefreshToken", return_value=FAILED_AUTH_RESPONSE):
        assert authorization_service._parseJWT() is None


def test__parseJWTSucceedOnRefresh() -> None:
    """``_parseJWT`` hands the stored access token ("beep") to the helper's ``parseJWT``."""
    authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
    authorization_service.initialize()
    with patch.object(AuthorizationService, "getUserProfile", return_value=UserProfile()):
        authorization_service._storeAuthData(SUCCESSFUL_AUTH_RESPONSE)

    with patch.object(AuthorizationHelpers, "getAccessTokenUsingRefreshToken", return_value=SUCCESSFUL_AUTH_RESPONSE):
        with patch.object(AuthorizationHelpers, "parseJWT", MagicMock(return_value=None)) as mocked_parseJWT:
            authorization_service._parseJWT()
            mocked_parseJWT.assert_called_with("beep")


def test_initialize() -> None:
    """Preferences passed to ``initialize`` receive the auth-data key; the constructor's do not."""
    original_preference = MagicMock()
    initialize_preferences = MagicMock()
    authorization_service = AuthorizationService(OAUTH_SETTINGS, original_preference)
    authorization_service.initialize(initialize_preferences)
    initialize_preferences.addPreference.assert_called_once_with("test/auth_data", "{}")
    original_preference.addPreference.assert_not_called()


def test_refreshAccessTokenFailed() -> None:
    """A failed token refresh emits the auth-state signal with ``False``."""
    authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
    authorization_service.initialize()
    # getUserProfile is stubbed only while the initial auth data is stored.
    with patch.object(AuthorizationService, "getUserProfile", return_value=UserProfile()):
        authorization_service._storeAuthData(SUCCESSFUL_AUTH_RESPONSE)
    authorization_service.onAuthStateChanged.emit = MagicMock()

    with patch.object(AuthorizationHelpers, "getAccessTokenUsingRefreshToken", return_value=FAILED_AUTH_RESPONSE):
        authorization_service.refreshAccessToken()
        _assert_last_emit_carried(authorization_service.onAuthStateChanged.emit, False)


def test_refreshAccesTokenWithoutData() -> None:
    """Refreshing without any stored auth data does not emit the auth-state signal."""
    authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
    authorization_service.initialize()
    authorization_service.onAuthStateChanged.emit = MagicMock()
    authorization_service.refreshAccessToken()
    authorization_service.onAuthStateChanged.emit.assert_not_called()


def test_userProfileException() -> None:
    """A connection error raised by ``_parseJWT`` results in no user profile."""
    authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
    authorization_service.initialize()
    authorization_service._parseJWT = MagicMock(side_effect=requests.exceptions.ConnectionError)
    assert authorization_service.getUserProfile() is None


def test_failedLogin() -> None:
    """A failed auth response triggers the error signal and leaves the service logged out."""
    authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
    authorization_service.onAuthenticationError.emit = MagicMock()
    authorization_service.onAuthStateChanged.emit = MagicMock()
    authorization_service.initialize()

    # Let the service think there was a failed response
    authorization_service._onAuthStateChanged(FAILED_AUTH_RESPONSE)

    # Check that the error signal was triggered
    assert authorization_service.onAuthenticationError.emit.call_count == 1

    # Since nothing changed, this should still be 0.
    assert authorization_service.onAuthStateChanged.emit.call_count == 0

    # Validate that there is no user profile or token
    assert authorization_service.getUserProfile() is None
    assert authorization_service.getAccessToken() is None


@patch.object(AuthorizationService, "getUserProfile", return_value=UserProfile())
def test_storeAuthData(get_user_profile) -> None:
    """Auth data stored by one service instance can be loaded by a second one."""
    preferences = Preferences()
    authorization_service = AuthorizationService(OAUTH_SETTINGS, preferences)
    authorization_service.initialize()

    # Write stuff to the preferences.
    authorization_service._storeAuthData(SUCCESSFUL_AUTH_RESPONSE)
    preference_value = preferences.getValue(OAUTH_SETTINGS.AUTH_DATA_PREFERENCE_KEY)
    # Check that something was actually put in the preferences
    assert preference_value is not None and preference_value != {}

    # Create a second auth service, so we can load the data.
    second_auth_service = AuthorizationService(OAUTH_SETTINGS, preferences)
    second_auth_service.initialize()
    second_auth_service.loadAuthDataFromPreferences()
    assert second_auth_service.getAccessToken() == SUCCESSFUL_AUTH_RESPONSE.access_token


@patch.object(LocalAuthorizationServer, "stop")
@patch.object(LocalAuthorizationServer, "start")
@patch.object(QDesktopServices, "openUrl")
def test_localAuthServer(QDesktopServices_openUrl, start_auth_server, stop_auth_server) -> None:
    """Starting the flow opens a URL and starts the local server; an auth response stops it."""
    preferences = Preferences()
    authorization_service = AuthorizationService(OAUTH_SETTINGS, preferences)
    authorization_service.startAuthorizationFlow()
    assert QDesktopServices_openUrl.call_count == 1

    # Ensure that the Authorization service tried to start the server.
    assert start_auth_server.call_count == 1
    assert stop_auth_server.call_count == 0
    authorization_service._onAuthStateChanged(FAILED_AUTH_RESPONSE)

    # Ensure that it stopped the server.
    assert stop_auth_server.call_count == 1


def test_loginAndLogout() -> None:
    """A successful login stores the auth data; deleting it logs the user out again."""
    preferences = Preferences()
    authorization_service = AuthorizationService(OAUTH_SETTINGS, preferences)
    authorization_service.onAuthenticationError.emit = MagicMock()
    authorization_service.onAuthStateChanged.emit = MagicMock()
    authorization_service.initialize()

    # Let the service think there was a successful response
    with patch.object(AuthorizationHelpers, "parseJWT", return_value=UserProfile()):
        authorization_service._onAuthStateChanged(SUCCESSFUL_AUTH_RESPONSE)

    # Ensure that the error signal was not triggered
    assert authorization_service.onAuthenticationError.emit.call_count == 0

    # Since we said that it went right this time, validate that we got a signal.
    assert authorization_service.onAuthStateChanged.emit.call_count == 1
    assert authorization_service.getUserProfile() is not None
    assert authorization_service.getAccessToken() == "beep"

    # Check that we stored the authentication data, so next time the user won't have to log in again.
    assert preferences.getValue("test/auth_data") is not None

    # We're logged in now, also check if logging out works
    authorization_service.deleteAuthData()
    assert authorization_service.onAuthStateChanged.emit.call_count == 2
    assert authorization_service.getUserProfile() is None

    # Ensure the data is gone after we logged out.
    assert preferences.getValue("test/auth_data") == "{}"


def test_wrongServerResponses() -> None:
    """A response without any fields does not produce a user profile."""
    authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
    authorization_service.initialize()
    with patch.object(AuthorizationHelpers, "parseJWT", return_value=UserProfile()):
        authorization_service._onAuthStateChanged(MALFORMED_AUTH_RESPONSE)
    assert authorization_service.getUserProfile() is None


def test__generate_auth_url() -> None:
    """The logoff redirect is part of the auth URL only when a browser logout is forced."""
    preferences = Preferences()
    authorization_service = AuthorizationService(OAUTH_SETTINGS, preferences)
    query_parameters_dict = {
        "client_id": "",
        "redirect_uri": OAUTH_SETTINGS.CALLBACK_URL,
        "scope": OAUTH_SETTINGS.CLIENT_SCOPES,
        "response_type": "code"
    }
    auth_url = authorization_service._generate_auth_url(query_parameters_dict, force_browser_logout=False)
    assert MYCLOUD_LOGOFF_URL + "?next=" not in auth_url

    auth_url = authorization_service._generate_auth_url(query_parameters_dict, force_browser_logout=True)
    assert MYCLOUD_LOGOFF_URL + "?next=" in auth_url