TestOAuth2.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. import webbrowser
  2. from datetime import datetime
  3. from unittest.mock import MagicMock, patch
  4. from UM.Preferences import Preferences
  5. from cura.OAuth2.AuthorizationHelpers import AuthorizationHelpers, TOKEN_TIMESTAMP_FORMAT
  6. from cura.OAuth2.AuthorizationService import AuthorizationService
  7. from cura.OAuth2.LocalAuthorizationServer import LocalAuthorizationServer
  8. from cura.OAuth2.Models import OAuth2Settings, AuthenticationResponse, UserProfile
  9. CALLBACK_PORT = 32118
  10. OAUTH_ROOT = "https://account.ultimaker.com"
  11. CLOUD_API_ROOT = "https://api.ultimaker.com"
  12. OAUTH_SETTINGS = OAuth2Settings(
  13. OAUTH_SERVER_URL= OAUTH_ROOT,
  14. CALLBACK_PORT=CALLBACK_PORT,
  15. CALLBACK_URL="http://localhost:{}/callback".format(CALLBACK_PORT),
  16. CLIENT_ID="",
  17. CLIENT_SCOPES="",
  18. AUTH_DATA_PREFERENCE_KEY="test/auth_data",
  19. AUTH_SUCCESS_REDIRECT="{}/app/auth-success".format(OAUTH_ROOT),
  20. AUTH_FAILED_REDIRECT="{}/app/auth-error".format(OAUTH_ROOT)
  21. )
  22. FAILED_AUTH_RESPONSE = AuthenticationResponse(
  23. success = False,
  24. err_message = "FAILURE!"
  25. )
  26. SUCCESSFUL_AUTH_RESPONSE = AuthenticationResponse(
  27. access_token = "beep",
  28. refresh_token = "beep?",
  29. received_at = datetime.now().strftime(TOKEN_TIMESTAMP_FORMAT),
  30. expires_in = 300 # 5 minutes should be more than enough for testing
  31. )
  32. MALFORMED_AUTH_RESPONSE = AuthenticationResponse()
  33. def test_cleanAuthService() -> None:
  34. # Ensure that when setting up an AuthorizationService, no data is set.
  35. authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
  36. authorization_service.initialize()
  37. assert authorization_service.getUserProfile() is None
  38. assert authorization_service.getAccessToken() is None
  39. def test_failedLogin() -> None:
  40. authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
  41. authorization_service.onAuthenticationError.emit = MagicMock()
  42. authorization_service.onAuthStateChanged.emit = MagicMock()
  43. authorization_service.initialize()
  44. # Let the service think there was a failed response
  45. authorization_service._onAuthStateChanged(FAILED_AUTH_RESPONSE)
  46. # Check that the error signal was triggered
  47. assert authorization_service.onAuthenticationError.emit.call_count == 1
  48. # Since nothing changed, this should still be 0.
  49. assert authorization_service.onAuthStateChanged.emit.call_count == 0
  50. # Validate that there is no user profile or token
  51. assert authorization_service.getUserProfile() is None
  52. assert authorization_service.getAccessToken() is None
  53. @patch.object(AuthorizationService, "getUserProfile", return_value=UserProfile())
  54. def test_storeAuthData(get_user_profile) -> None:
  55. preferences = Preferences()
  56. authorization_service = AuthorizationService(OAUTH_SETTINGS, preferences)
  57. authorization_service.initialize()
  58. # Write stuff to the preferences.
  59. authorization_service._storeAuthData(SUCCESSFUL_AUTH_RESPONSE)
  60. preference_value = preferences.getValue(OAUTH_SETTINGS.AUTH_DATA_PREFERENCE_KEY)
  61. # Check that something was actually put in the preferences
  62. assert preference_value is not None and preference_value != {}
  63. # Create a second auth service, so we can load the data.
  64. second_auth_service = AuthorizationService(OAUTH_SETTINGS, preferences)
  65. second_auth_service.initialize()
  66. second_auth_service.loadAuthDataFromPreferences()
  67. assert second_auth_service.getAccessToken() == SUCCESSFUL_AUTH_RESPONSE.access_token
  68. @patch.object(LocalAuthorizationServer, "stop")
  69. @patch.object(LocalAuthorizationServer, "start")
  70. @patch.object(webbrowser, "open_new")
  71. def test_localAuthServer(webbrowser_open, start_auth_server, stop_auth_server) -> None:
  72. preferences = Preferences()
  73. authorization_service = AuthorizationService(OAUTH_SETTINGS, preferences)
  74. authorization_service.startAuthorizationFlow()
  75. assert webbrowser_open.call_count == 1
  76. # Ensure that the Authorization service tried to start the server.
  77. assert start_auth_server.call_count == 1
  78. assert stop_auth_server.call_count == 0
  79. authorization_service._onAuthStateChanged(FAILED_AUTH_RESPONSE)
  80. # Ensure that it stopped the server.
  81. assert stop_auth_server.call_count == 1
  82. def test_loginAndLogout() -> None:
  83. preferences = Preferences()
  84. authorization_service = AuthorizationService(OAUTH_SETTINGS, preferences)
  85. authorization_service.onAuthenticationError.emit = MagicMock()
  86. authorization_service.onAuthStateChanged.emit = MagicMock()
  87. authorization_service.initialize()
  88. # Let the service think there was a successful response
  89. with patch.object(AuthorizationHelpers, "parseJWT", return_value=UserProfile()):
  90. authorization_service._onAuthStateChanged(SUCCESSFUL_AUTH_RESPONSE)
  91. # Ensure that the error signal was not triggered
  92. assert authorization_service.onAuthenticationError.emit.call_count == 0
  93. # Since we said that it went right this time, validate that we got a signal.
  94. assert authorization_service.onAuthStateChanged.emit.call_count == 1
  95. assert authorization_service.getUserProfile() is not None
  96. assert authorization_service.getAccessToken() == "beep"
  97. # Check that we stored the authentication data, so next time the user won't have to log in again.
  98. assert preferences.getValue("test/auth_data") is not None
  99. # We're logged in now, also check if logging out works
  100. authorization_service.deleteAuthData()
  101. assert authorization_service.onAuthStateChanged.emit.call_count == 2
  102. assert authorization_service.getUserProfile() is None
  103. # Ensure the data is gone after we logged out.
  104. assert preferences.getValue("test/auth_data") == "{}"
  105. def test_wrongServerResponses() -> None:
  106. authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
  107. authorization_service.initialize()
  108. with patch.object(AuthorizationHelpers, "parseJWT", return_value=UserProfile()):
  109. authorization_service._onAuthStateChanged(MALFORMED_AUTH_RESPONSE)
  110. assert authorization_service.getUserProfile() is None