# TestOAuth2.py
  1. import webbrowser
  2. from unittest.mock import MagicMock, patch
  3. from UM.Preferences import Preferences
  4. from cura.OAuth2.AuthorizationHelpers import AuthorizationHelpers
  5. from cura.OAuth2.AuthorizationService import AuthorizationService
  6. from cura.OAuth2.LocalAuthorizationServer import LocalAuthorizationServer
  7. from cura.OAuth2.Models import OAuth2Settings, AuthenticationResponse, UserProfile
  8. CALLBACK_PORT = 32118
  9. OAUTH_ROOT = "https://account.ultimaker.com"
  10. CLOUD_API_ROOT = "https://api.ultimaker.com"
  11. OAUTH_SETTINGS = OAuth2Settings(
  12. OAUTH_SERVER_URL= OAUTH_ROOT,
  13. CALLBACK_PORT=CALLBACK_PORT,
  14. CALLBACK_URL="http://localhost:{}/callback".format(CALLBACK_PORT),
  15. CLIENT_ID="",
  16. CLIENT_SCOPES="",
  17. AUTH_DATA_PREFERENCE_KEY="test/auth_data",
  18. AUTH_SUCCESS_REDIRECT="{}/app/auth-success".format(OAUTH_ROOT),
  19. AUTH_FAILED_REDIRECT="{}/app/auth-error".format(OAUTH_ROOT)
  20. )
  21. FAILED_AUTH_RESPONSE = AuthenticationResponse(success = False, err_message = "FAILURE!")
  22. SUCCESFULL_AUTH_RESPONSE = AuthenticationResponse(access_token = "beep", refresh_token = "beep?")
  23. MALFORMED_AUTH_RESPONSE = AuthenticationResponse()
  24. def test_cleanAuthService() -> None:
  25. # Ensure that when setting up an AuthorizationService, no data is set.
  26. authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
  27. authorization_service.initialize()
  28. assert authorization_service.getUserProfile() is None
  29. assert authorization_service.getAccessToken() is None
  30. def test_failedLogin() -> None:
  31. authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
  32. authorization_service.onAuthenticationError.emit = MagicMock()
  33. authorization_service.onAuthStateChanged.emit = MagicMock()
  34. authorization_service.initialize()
  35. # Let the service think there was a failed response
  36. authorization_service._onAuthStateChanged(FAILED_AUTH_RESPONSE)
  37. # Check that the error signal was triggered
  38. assert authorization_service.onAuthenticationError.emit.call_count == 1
  39. # Since nothing changed, this should still be 0.
  40. assert authorization_service.onAuthStateChanged.emit.call_count == 0
  41. # Validate that there is no user profile or token
  42. assert authorization_service.getUserProfile() is None
  43. assert authorization_service.getAccessToken() is None
  44. @patch.object(AuthorizationService, "getUserProfile", return_value=UserProfile())
  45. def test_storeAuthData(get_user_profile) -> None:
  46. preferences = Preferences()
  47. authorization_service = AuthorizationService(OAUTH_SETTINGS, preferences)
  48. authorization_service.initialize()
  49. # Write stuff to the preferences.
  50. authorization_service._storeAuthData(SUCCESFULL_AUTH_RESPONSE)
  51. preference_value = preferences.getValue(OAUTH_SETTINGS.AUTH_DATA_PREFERENCE_KEY)
  52. # Check that something was actually put in the preferences
  53. assert preference_value is not None and preference_value != {}
  54. # Create a second auth service, so we can load the data.
  55. second_auth_service = AuthorizationService(OAUTH_SETTINGS, preferences)
  56. second_auth_service.initialize()
  57. second_auth_service.loadAuthDataFromPreferences()
  58. assert second_auth_service.getAccessToken() == SUCCESFULL_AUTH_RESPONSE.access_token
  59. @patch.object(LocalAuthorizationServer, "stop")
  60. @patch.object(LocalAuthorizationServer, "start")
  61. @patch.object(webbrowser, "open_new")
  62. def test_localAuthServer(webbrowser_open, start_auth_server, stop_auth_server) -> None:
  63. preferences = Preferences()
  64. authorization_service = AuthorizationService(OAUTH_SETTINGS, preferences)
  65. authorization_service.startAuthorizationFlow()
  66. assert webbrowser_open.call_count == 1
  67. # Ensure that the Authorization service tried to start the server.
  68. assert start_auth_server.call_count == 1
  69. assert stop_auth_server.call_count == 0
  70. authorization_service._onAuthStateChanged(FAILED_AUTH_RESPONSE)
  71. # Ensure that it stopped the server.
  72. assert stop_auth_server.call_count == 1
  73. def test_loginAndLogout() -> None:
  74. preferences = Preferences()
  75. authorization_service = AuthorizationService(OAUTH_SETTINGS, preferences)
  76. authorization_service.onAuthenticationError.emit = MagicMock()
  77. authorization_service.onAuthStateChanged.emit = MagicMock()
  78. authorization_service.initialize()
  79. # Let the service think there was a succesfull response
  80. with patch.object(AuthorizationHelpers, "parseJWT", return_value=UserProfile()):
  81. authorization_service._onAuthStateChanged(SUCCESFULL_AUTH_RESPONSE)
  82. # Ensure that the error signal was not triggered
  83. assert authorization_service.onAuthenticationError.emit.call_count == 0
  84. # Since we said that it went right this time, validate that we got a signal.
  85. assert authorization_service.onAuthStateChanged.emit.call_count == 1
  86. assert authorization_service.getUserProfile() is not None
  87. assert authorization_service.getAccessToken() == "beep"
  88. # Check that we stored the authentication data, so next time the user won't have to log in again.
  89. assert preferences.getValue("test/auth_data") is not None
  90. # We're logged in now, also check if logging out works
  91. authorization_service.deleteAuthData()
  92. assert authorization_service.onAuthStateChanged.emit.call_count == 2
  93. assert authorization_service.getUserProfile() is None
  94. # Ensure the data is gone after we logged out.
  95. assert preferences.getValue("test/auth_data") == "{}"
  96. def test_wrongServerResponses() -> None:
  97. authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
  98. authorization_service.initialize()
  99. with patch.object(AuthorizationHelpers, "parseJWT", return_value=UserProfile()):
  100. authorization_service._onAuthStateChanged(MALFORMED_AUTH_RESPONSE)
  101. assert authorization_service.getUserProfile() is None