123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329 |
- # Copyright (c) 2021 Ultimaker B.V.
- # Cura is released under the terms of the LGPLv3 or higher.
- from datetime import datetime
- from unittest.mock import MagicMock, Mock, patch
- from pytest import fixture
- from PyQt6.QtGui import QDesktopServices
- from PyQt6.QtNetwork import QNetworkReply
- from UM.Preferences import Preferences
- from cura.OAuth2.AuthorizationHelpers import AuthorizationHelpers, TOKEN_TIMESTAMP_FORMAT
- from cura.OAuth2.AuthorizationService import AuthorizationService, MYCLOUD_LOGOFF_URL
- from cura.OAuth2.LocalAuthorizationServer import LocalAuthorizationServer
- from cura.OAuth2.Models import OAuth2Settings, AuthenticationResponse, UserProfile
- CALLBACK_PORT = 32118
- OAUTH_ROOT = "https://account.ultimaker.com"
- CLOUD_API_ROOT = "https://api.ultimaker.com"
- OAUTH_SETTINGS = OAuth2Settings(
- OAUTH_SERVER_URL= OAUTH_ROOT,
- CALLBACK_PORT=CALLBACK_PORT,
- CALLBACK_URL="http://localhost:{}/callback".format(CALLBACK_PORT),
- CLIENT_ID="",
- CLIENT_SCOPES="",
- AUTH_DATA_PREFERENCE_KEY="test/auth_data",
- AUTH_SUCCESS_REDIRECT="{}/app/auth-success".format(OAUTH_ROOT),
- AUTH_FAILED_REDIRECT="{}/app/auth-error".format(OAUTH_ROOT)
- )
- FAILED_AUTH_RESPONSE = AuthenticationResponse(
- success = False,
- err_message = "FAILURE!"
- )
- SUCCESSFUL_AUTH_RESPONSE = AuthenticationResponse(
- access_token = "beep",
- refresh_token = "beep?",
- received_at = datetime.now().strftime(TOKEN_TIMESTAMP_FORMAT),
- expires_in = 300, # 5 minutes should be more than enough for testing
- success = True
- )
- EXPIRED_AUTH_RESPONSE = AuthenticationResponse(
- access_token = "expired",
- refresh_token = "beep?",
- received_at = datetime.now().strftime(TOKEN_TIMESTAMP_FORMAT),
- expires_in = 300, # 5 minutes should be more than enough for testing
- success = True
- )
- NO_REFRESH_AUTH_RESPONSE = AuthenticationResponse(
- access_token = "beep",
- received_at = datetime.now().strftime(TOKEN_TIMESTAMP_FORMAT),
- expires_in = 300, # 5 minutes should be more than enough for testing
- success = True
- )
- MALFORMED_AUTH_RESPONSE = AuthenticationResponse(success=False)
- @fixture
- def http_request_manager():
- mock_reply = Mock() # The user profile that the service should respond with.
- mock_reply.error = Mock(return_value=QNetworkReply.NetworkError.NoError)
- http_mock = Mock()
- http_mock.get = lambda url, headers_dict, callback, error_callback, timeout: callback(mock_reply)
- http_mock.readJSON = Mock(return_value={"data": {"user_id": "id_ego_or_superego", "username": "Ghostkeeper"}})
- http_mock.setDelayRequests = Mock()
- return http_mock
- def test_cleanAuthService() -> None:
- """
- Ensure that when setting up an AuthorizationService, no data is set.
- """
- authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
- authorization_service.initialize()
- mock_callback = Mock()
- authorization_service.getUserProfile(mock_callback)
- mock_callback.assert_called_once_with(None)
- assert authorization_service.getAccessToken() is None
- def test_refreshAccessTokenSuccess(http_request_manager):
- authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
- authorization_service.initialize()
- with patch.object(AuthorizationService, "getUserProfile", return_value=UserProfile()):
- authorization_service._storeAuthData(SUCCESSFUL_AUTH_RESPONSE)
- authorization_service.onAuthStateChanged.emit = MagicMock()
- with patch("UM.TaskManagement.HttpRequestManager.HttpRequestManager.getInstance", MagicMock(return_value=http_request_manager)):
- with patch.object(AuthorizationService, "getUserProfile", return_value=UserProfile()):
- with patch.object(AuthorizationHelpers, "getAccessTokenUsingRefreshToken", side_effect=lambda refresh_token, callback: callback(SUCCESSFUL_AUTH_RESPONSE)):
- authorization_service.refreshAccessToken()
- authorization_service.onAuthStateChanged.emit.assert_called_once_with(logged_in = True)
- def test__parseJWTNoRefreshToken(http_request_manager):
- """
- Tests parsing the user profile if there is no refresh token stored, but there is a normal authentication token.
- The request for the user profile using the authentication token should still work normally.
- """
- authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
- with patch.object(AuthorizationService, "getUserProfile", return_value = UserProfile()):
- authorization_service._storeAuthData(NO_REFRESH_AUTH_RESPONSE)
- mock_callback = Mock() # To log the final profile response.
- with patch("UM.TaskManagement.HttpRequestManager.HttpRequestManager.getInstance", MagicMock(return_value = http_request_manager)):
- authorization_service._parseJWT(mock_callback)
- mock_callback.assert_called_once()
- profile_reply = mock_callback.call_args_list[0][0][0]
- assert profile_reply.user_id == "id_ego_or_superego"
- assert profile_reply.username == "Ghostkeeper"
- def test__parseJWTFailOnRefresh():
- """
- Tries to refresh the authentication token using an invalid refresh token. The request should fail.
- """
- authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
- with patch.object(AuthorizationService, "getUserProfile", return_value = UserProfile()):
- authorization_service._storeAuthData(SUCCESSFUL_AUTH_RESPONSE)
- mock_callback = Mock() # To log the final profile response.
- mock_reply = Mock() # The response that the request should give, containing an error about it failing to authenticate.
- mock_reply.error = Mock(return_value = QNetworkReply.NetworkError.AuthenticationRequiredError) # The reply is 403: Authentication required, meaning the server responded with a "Can't do that, Dave".
- http_mock = Mock()
- http_mock.get = lambda url, headers_dict, callback, error_callback, timeout: callback(mock_reply)
- http_mock.post = lambda url, data, headers_dict, callback, error_callback, urgent, timeout: callback(mock_reply)
- with patch("UM.TaskManagement.HttpRequestManager.HttpRequestManager.readJSON", Mock(return_value = {"error_description": "Mock a failed request!"})):
- with patch("UM.TaskManagement.HttpRequestManager.HttpRequestManager.getInstance", MagicMock(return_value = http_mock)):
- authorization_service._parseJWT(mock_callback)
- mock_callback.assert_called_once_with(None)
- def test__parseJWTSucceedOnRefresh():
- """
- Tries to refresh the authentication token using a valid refresh token. The request should succeed.
- """
- authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
- authorization_service.initialize()
- with patch.object(AuthorizationService, "getUserProfile", return_value = UserProfile()):
- authorization_service._storeAuthData(EXPIRED_AUTH_RESPONSE)
- mock_callback = Mock() # To log the final profile response.
- mock_reply_success = Mock() # The reply should be a failure when using the expired access token, but succeed when using the refresh token.
- mock_reply_success.error = Mock(return_value = QNetworkReply.NetworkError.NoError)
- mock_reply_failure = Mock()
- mock_reply_failure.error = Mock(return_value = QNetworkReply.NetworkError.AuthenticationRequiredError)
- http_mock = Mock()
- def mock_get(url, headers_dict, callback, error_callback, timeout):
- if(headers_dict == {"Authorization": "Bearer beep"}):
- callback(mock_reply_success)
- else:
- callback(mock_reply_failure)
- http_mock.get = mock_get
- http_mock.readJSON = Mock(return_value = {"data": {"user_id": "user_idea", "username": "Ghostkeeper"}})
- def mock_refresh(self, refresh_token, callback): # Refreshing gives a valid token.
- callback(SUCCESSFUL_AUTH_RESPONSE)
- with patch("cura.OAuth2.AuthorizationHelpers.AuthorizationHelpers.getAccessTokenUsingRefreshToken", mock_refresh):
- with patch("UM.TaskManagement.HttpRequestManager.HttpRequestManager.getInstance", MagicMock(return_value = http_mock)):
- authorization_service._parseJWT(mock_callback)
- mock_callback.assert_called_once()
- profile_reply = mock_callback.call_args_list[0][0][0]
- assert profile_reply.user_id == "user_idea"
- assert profile_reply.username == "Ghostkeeper"
- def test_initialize():
- original_preference = MagicMock()
- initialize_preferences = MagicMock()
- authorization_service = AuthorizationService(OAUTH_SETTINGS, original_preference)
- authorization_service.initialize(initialize_preferences)
- initialize_preferences.addPreference.assert_called_once_with("test/auth_data", "{}")
- original_preference.addPreference.assert_not_called()
- def test_refreshAccessTokenFailed():
- """
- Test if the authentication is reset once the refresh token fails to refresh access.
- """
- authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
- authorization_service.initialize()
- with patch.object(AuthorizationService, "getUserProfile", return_value=UserProfile()):
- authorization_service._storeAuthData(SUCCESSFUL_AUTH_RESPONSE)
- authorization_service.onAuthStateChanged.emit = MagicMock()
- mock_reply = Mock() # The response that the request should give, containing an error about it failing to authenticate.
- mock_reply.error = Mock(return_value = QNetworkReply.NetworkError.AuthenticationRequiredError) # The reply is 403: Authentication required, meaning the server responded with a "Can't do that, Dave".
- http_mock = Mock()
- http_mock.get = lambda url, headers_dict, callback, error_callback, timeout: callback(mock_reply)
- http_mock.post = lambda url, data, headers_dict, callback, error_callback, urgent, timeout: callback(mock_reply)
- with patch("UM.TaskManagement.HttpRequestManager.HttpRequestManager.readJSON", Mock(return_value = {"error_description": "Mock a failed request!"})):
- with patch("UM.TaskManagement.HttpRequestManager.HttpRequestManager.getInstance", MagicMock(return_value = http_mock)):
- authorization_service._storeAuthData(SUCCESSFUL_AUTH_RESPONSE)
- with patch("cura.OAuth2.AuthorizationHelpers.AuthorizationHelpers.getAccessTokenUsingRefreshToken", side_effect=lambda refresh_token, callback: callback(FAILED_AUTH_RESPONSE)):
- authorization_service.refreshAccessToken()
- authorization_service.onAuthStateChanged.emit.assert_called_with(logged_in = False)
- def test_refreshAccesTokenWithoutData():
- authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
- authorization_service.initialize()
- authorization_service.onAuthStateChanged.emit = MagicMock()
- authorization_service.refreshAccessToken()
- authorization_service.onAuthStateChanged.emit.assert_not_called()
- def test_failedLogin() -> None:
- authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
- authorization_service.onAuthenticationError.emit = MagicMock()
- authorization_service.onAuthStateChanged.emit = MagicMock()
- authorization_service.initialize()
- # Let the service think there was a failed response
- authorization_service._onAuthStateChanged(FAILED_AUTH_RESPONSE)
- # Check that the error signal was triggered
- assert authorization_service.onAuthenticationError.emit.call_count == 1
- # Since nothing changed, this should still be 0.
- assert authorization_service.onAuthStateChanged.emit.call_count == 0
- # Validate that there is no user profile or token
- assert authorization_service.getUserProfile() is None
- assert authorization_service.getAccessToken() is None
- @patch.object(AuthorizationService, "getUserProfile")
- def test_storeAuthData(get_user_profile) -> None:
- preferences = Preferences()
- authorization_service = AuthorizationService(OAUTH_SETTINGS, preferences)
- authorization_service.initialize()
- # Write stuff to the preferences.
- authorization_service._storeAuthData(SUCCESSFUL_AUTH_RESPONSE)
- preference_value = preferences.getValue(OAUTH_SETTINGS.AUTH_DATA_PREFERENCE_KEY)
- # Check that something was actually put in the preferences
- assert preference_value is not None and preference_value != {}
- # Create a second auth service, so we can load the data.
- second_auth_service = AuthorizationService(OAUTH_SETTINGS, preferences)
- second_auth_service.initialize()
- second_auth_service.loadAuthDataFromPreferences()
- assert second_auth_service.getAccessToken() == SUCCESSFUL_AUTH_RESPONSE.access_token
- @patch.object(LocalAuthorizationServer, "stop")
- @patch.object(LocalAuthorizationServer, "start")
- @patch.object(QDesktopServices, "openUrl")
- def test_localAuthServer(QDesktopServices_openUrl, start_auth_server, stop_auth_server) -> None:
- preferences = Preferences()
- authorization_service = AuthorizationService(OAUTH_SETTINGS, preferences)
- authorization_service.startAuthorizationFlow()
- assert QDesktopServices_openUrl.call_count == 1
- # Ensure that the Authorization service tried to start the server.
- assert start_auth_server.call_count == 1
- assert stop_auth_server.call_count == 0
- authorization_service._onAuthStateChanged(FAILED_AUTH_RESPONSE)
- # Ensure that it stopped the server.
- assert stop_auth_server.call_count == 1
- def test_loginAndLogout() -> None:
- preferences = Preferences()
- authorization_service = AuthorizationService(OAUTH_SETTINGS, preferences)
- authorization_service.onAuthenticationError.emit = MagicMock()
- authorization_service.onAuthStateChanged.emit = MagicMock()
- authorization_service.initialize()
- mock_reply = Mock() # The user profile that the service should respond with.
- mock_reply.error = Mock(return_value = QNetworkReply.NetworkError.NoError)
- http_mock = Mock()
- http_mock.get = lambda url, headers_dict, callback, error_callback, timeout: callback(mock_reply)
- http_mock.readJSON = Mock(return_value = {"data": {"user_id": "di_resu", "username": "Emanresu"}})
- # Let the service think there was a successful response
- with patch("UM.TaskManagement.HttpRequestManager.HttpRequestManager.getInstance", MagicMock(return_value = http_mock)):
- authorization_service._onAuthStateChanged(SUCCESSFUL_AUTH_RESPONSE)
- # Ensure that the error signal was not triggered
- assert authorization_service.onAuthenticationError.emit.call_count == 0
- # Since we said that it went right this time, validate that we got a signal.
- assert authorization_service.onAuthStateChanged.emit.call_count == 1
- with patch("UM.TaskManagement.HttpRequestManager.HttpRequestManager.getInstance", MagicMock(return_value = http_mock)):
- def callback(profile):
- assert profile is not None
- authorization_service.getUserProfile(callback)
- assert authorization_service.getAccessToken() == "beep"
- # Check that we stored the authentication data, so next time the user won't have to log in again.
- assert preferences.getValue("test/auth_data") is not None
- # We're logged in now, also check if logging out works
- authorization_service.deleteAuthData()
- assert authorization_service.onAuthStateChanged.emit.call_count == 2
- with patch("UM.TaskManagement.HttpRequestManager.HttpRequestManager.getInstance", MagicMock(return_value = http_mock)):
- def callback(profile):
- assert profile is None
- authorization_service.getUserProfile(callback)
- # Ensure the data is gone after we logged out.
- assert preferences.getValue("test/auth_data") == "{}"
- def test_wrongServerResponses() -> None:
- authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
- authorization_service.initialize()
- authorization_service._onAuthStateChanged(MALFORMED_AUTH_RESPONSE)
- def callback(profile):
- assert profile is None
- authorization_service.getUserProfile(callback)
- def test__generate_auth_url() -> None:
- preferences = Preferences()
- authorization_service = AuthorizationService(OAUTH_SETTINGS, preferences)
- query_parameters_dict = {
- "client_id": "",
- "redirect_uri": OAUTH_SETTINGS.CALLBACK_URL,
- "scope": OAUTH_SETTINGS.CLIENT_SCOPES,
- "response_type": "code"
- }
- auth_url = authorization_service._generate_auth_url(query_parameters_dict, force_browser_logout = False)
- assert MYCLOUD_LOGOFF_URL + "&next=" not in auth_url
- auth_url = authorization_service._generate_auth_url(query_parameters_dict, force_browser_logout = True)
- assert MYCLOUD_LOGOFF_URL + "&next=" in auth_url
|