TestOAuth2.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329
  1. # Copyright (c) 2021 Ultimaker B.V.
  2. # Cura is released under the terms of the LGPLv3 or higher.
  3. from datetime import datetime
  4. from unittest.mock import MagicMock, Mock, patch
  5. from pytest import fixture
  6. from PyQt6.QtGui import QDesktopServices
  7. from PyQt6.QtNetwork import QNetworkReply
  8. from UM.Preferences import Preferences
  9. from cura.OAuth2.AuthorizationHelpers import AuthorizationHelpers, TOKEN_TIMESTAMP_FORMAT
  10. from cura.OAuth2.AuthorizationService import AuthorizationService, MYCLOUD_LOGOFF_URL
  11. from cura.OAuth2.LocalAuthorizationServer import LocalAuthorizationServer
  12. from cura.OAuth2.Models import OAuth2Settings, AuthenticationResponse, UserProfile
  13. CALLBACK_PORT = 32118
  14. OAUTH_ROOT = "https://account.ultimaker.com"
  15. CLOUD_API_ROOT = "https://api.ultimaker.com"
  16. OAUTH_SETTINGS = OAuth2Settings(
  17. OAUTH_SERVER_URL= OAUTH_ROOT,
  18. CALLBACK_PORT=CALLBACK_PORT,
  19. CALLBACK_URL="http://localhost:{}/callback".format(CALLBACK_PORT),
  20. CLIENT_ID="",
  21. CLIENT_SCOPES="",
  22. AUTH_DATA_PREFERENCE_KEY="test/auth_data",
  23. AUTH_SUCCESS_REDIRECT="{}/app/auth-success".format(OAUTH_ROOT),
  24. AUTH_FAILED_REDIRECT="{}/app/auth-error".format(OAUTH_ROOT)
  25. )
  26. FAILED_AUTH_RESPONSE = AuthenticationResponse(
  27. success = False,
  28. err_message = "FAILURE!"
  29. )
  30. SUCCESSFUL_AUTH_RESPONSE = AuthenticationResponse(
  31. access_token = "beep",
  32. refresh_token = "beep?",
  33. received_at = datetime.now().strftime(TOKEN_TIMESTAMP_FORMAT),
  34. expires_in = 300, # 5 minutes should be more than enough for testing
  35. success = True
  36. )
  37. EXPIRED_AUTH_RESPONSE = AuthenticationResponse(
  38. access_token = "expired",
  39. refresh_token = "beep?",
  40. received_at = datetime.now().strftime(TOKEN_TIMESTAMP_FORMAT),
  41. expires_in = 300, # 5 minutes should be more than enough for testing
  42. success = True
  43. )
  44. NO_REFRESH_AUTH_RESPONSE = AuthenticationResponse(
  45. access_token = "beep",
  46. received_at = datetime.now().strftime(TOKEN_TIMESTAMP_FORMAT),
  47. expires_in = 300, # 5 minutes should be more than enough for testing
  48. success = True
  49. )
  50. MALFORMED_AUTH_RESPONSE = AuthenticationResponse(success=False)
  51. @fixture
  52. def http_request_manager():
  53. mock_reply = Mock() # The user profile that the service should respond with.
  54. mock_reply.error = Mock(return_value=QNetworkReply.NetworkError.NoError)
  55. http_mock = Mock()
  56. http_mock.get = lambda url, headers_dict, callback, error_callback, timeout: callback(mock_reply)
  57. http_mock.readJSON = Mock(return_value={"data": {"user_id": "id_ego_or_superego", "username": "Ghostkeeper"}})
  58. http_mock.setDelayRequests = Mock()
  59. return http_mock
  60. def test_cleanAuthService() -> None:
  61. """
  62. Ensure that when setting up an AuthorizationService, no data is set.
  63. """
  64. authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
  65. authorization_service.initialize()
  66. mock_callback = Mock()
  67. authorization_service.getUserProfile(mock_callback)
  68. mock_callback.assert_called_once_with(None)
  69. assert authorization_service.getAccessToken() is None
  70. def test_refreshAccessTokenSuccess(http_request_manager):
  71. authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
  72. authorization_service.initialize()
  73. with patch.object(AuthorizationService, "getUserProfile", return_value=UserProfile()):
  74. authorization_service._storeAuthData(SUCCESSFUL_AUTH_RESPONSE)
  75. authorization_service.onAuthStateChanged.emit = MagicMock()
  76. with patch("UM.TaskManagement.HttpRequestManager.HttpRequestManager.getInstance", MagicMock(return_value=http_request_manager)):
  77. with patch.object(AuthorizationService, "getUserProfile", return_value=UserProfile()):
  78. with patch.object(AuthorizationHelpers, "getAccessTokenUsingRefreshToken", side_effect=lambda refresh_token, callback: callback(SUCCESSFUL_AUTH_RESPONSE)):
  79. authorization_service.refreshAccessToken()
  80. authorization_service.onAuthStateChanged.emit.assert_called_once_with(logged_in = True)
  81. def test__parseJWTNoRefreshToken(http_request_manager):
  82. """
  83. Tests parsing the user profile if there is no refresh token stored, but there is a normal authentication token.
  84. The request for the user profile using the authentication token should still work normally.
  85. """
  86. authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
  87. with patch.object(AuthorizationService, "getUserProfile", return_value = UserProfile()):
  88. authorization_service._storeAuthData(NO_REFRESH_AUTH_RESPONSE)
  89. mock_callback = Mock() # To log the final profile response.
  90. with patch("UM.TaskManagement.HttpRequestManager.HttpRequestManager.getInstance", MagicMock(return_value = http_request_manager)):
  91. authorization_service._parseJWT(mock_callback)
  92. mock_callback.assert_called_once()
  93. profile_reply = mock_callback.call_args_list[0][0][0]
  94. assert profile_reply.user_id == "id_ego_or_superego"
  95. assert profile_reply.username == "Ghostkeeper"
  96. def test__parseJWTFailOnRefresh():
  97. """
  98. Tries to refresh the authentication token using an invalid refresh token. The request should fail.
  99. """
  100. authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
  101. with patch.object(AuthorizationService, "getUserProfile", return_value = UserProfile()):
  102. authorization_service._storeAuthData(SUCCESSFUL_AUTH_RESPONSE)
  103. mock_callback = Mock() # To log the final profile response.
  104. mock_reply = Mock() # The response that the request should give, containing an error about it failing to authenticate.
  105. mock_reply.error = Mock(return_value = QNetworkReply.NetworkError.AuthenticationRequiredError) # The reply is 403: Authentication required, meaning the server responded with a "Can't do that, Dave".
  106. http_mock = Mock()
  107. http_mock.get = lambda url, headers_dict, callback, error_callback, timeout: callback(mock_reply)
  108. http_mock.post = lambda url, data, headers_dict, callback, error_callback, urgent, timeout: callback(mock_reply)
  109. with patch("UM.TaskManagement.HttpRequestManager.HttpRequestManager.readJSON", Mock(return_value = {"error_description": "Mock a failed request!"})):
  110. with patch("UM.TaskManagement.HttpRequestManager.HttpRequestManager.getInstance", MagicMock(return_value = http_mock)):
  111. authorization_service._parseJWT(mock_callback)
  112. mock_callback.assert_called_once_with(None)
  113. def test__parseJWTSucceedOnRefresh():
  114. """
  115. Tries to refresh the authentication token using a valid refresh token. The request should succeed.
  116. """
  117. authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
  118. authorization_service.initialize()
  119. with patch.object(AuthorizationService, "getUserProfile", return_value = UserProfile()):
  120. authorization_service._storeAuthData(EXPIRED_AUTH_RESPONSE)
  121. mock_callback = Mock() # To log the final profile response.
  122. mock_reply_success = Mock() # The reply should be a failure when using the expired access token, but succeed when using the refresh token.
  123. mock_reply_success.error = Mock(return_value = QNetworkReply.NetworkError.NoError)
  124. mock_reply_failure = Mock()
  125. mock_reply_failure.error = Mock(return_value = QNetworkReply.NetworkError.AuthenticationRequiredError)
  126. http_mock = Mock()
  127. def mock_get(url, headers_dict, callback, error_callback, timeout):
  128. if(headers_dict == {"Authorization": "Bearer beep"}):
  129. callback(mock_reply_success)
  130. else:
  131. callback(mock_reply_failure)
  132. http_mock.get = mock_get
  133. http_mock.readJSON = Mock(return_value = {"data": {"user_id": "user_idea", "username": "Ghostkeeper"}})
  134. def mock_refresh(self, refresh_token, callback): # Refreshing gives a valid token.
  135. callback(SUCCESSFUL_AUTH_RESPONSE)
  136. with patch("cura.OAuth2.AuthorizationHelpers.AuthorizationHelpers.getAccessTokenUsingRefreshToken", mock_refresh):
  137. with patch("UM.TaskManagement.HttpRequestManager.HttpRequestManager.getInstance", MagicMock(return_value = http_mock)):
  138. authorization_service._parseJWT(mock_callback)
  139. mock_callback.assert_called_once()
  140. profile_reply = mock_callback.call_args_list[0][0][0]
  141. assert profile_reply.user_id == "user_idea"
  142. assert profile_reply.username == "Ghostkeeper"
  143. def test_initialize():
  144. original_preference = MagicMock()
  145. initialize_preferences = MagicMock()
  146. authorization_service = AuthorizationService(OAUTH_SETTINGS, original_preference)
  147. authorization_service.initialize(initialize_preferences)
  148. initialize_preferences.addPreference.assert_called_once_with("test/auth_data", "{}")
  149. original_preference.addPreference.assert_not_called()
  150. def test_refreshAccessTokenFailed():
  151. """
  152. Test if the authentication is reset once the refresh token fails to refresh access.
  153. """
  154. authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
  155. authorization_service.initialize()
  156. with patch.object(AuthorizationService, "getUserProfile", return_value=UserProfile()):
  157. authorization_service._storeAuthData(SUCCESSFUL_AUTH_RESPONSE)
  158. authorization_service.onAuthStateChanged.emit = MagicMock()
  159. mock_reply = Mock() # The response that the request should give, containing an error about it failing to authenticate.
  160. mock_reply.error = Mock(return_value = QNetworkReply.NetworkError.AuthenticationRequiredError) # The reply is 403: Authentication required, meaning the server responded with a "Can't do that, Dave".
  161. http_mock = Mock()
  162. http_mock.get = lambda url, headers_dict, callback, error_callback, timeout: callback(mock_reply)
  163. http_mock.post = lambda url, data, headers_dict, callback, error_callback, urgent, timeout: callback(mock_reply)
  164. with patch("UM.TaskManagement.HttpRequestManager.HttpRequestManager.readJSON", Mock(return_value = {"error_description": "Mock a failed request!"})):
  165. with patch("UM.TaskManagement.HttpRequestManager.HttpRequestManager.getInstance", MagicMock(return_value = http_mock)):
  166. authorization_service._storeAuthData(SUCCESSFUL_AUTH_RESPONSE)
  167. with patch("cura.OAuth2.AuthorizationHelpers.AuthorizationHelpers.getAccessTokenUsingRefreshToken", side_effect=lambda refresh_token, callback: callback(FAILED_AUTH_RESPONSE)):
  168. authorization_service.refreshAccessToken()
  169. authorization_service.onAuthStateChanged.emit.assert_called_with(logged_in = False)
  170. def test_refreshAccesTokenWithoutData():
  171. authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
  172. authorization_service.initialize()
  173. authorization_service.onAuthStateChanged.emit = MagicMock()
  174. authorization_service.refreshAccessToken()
  175. authorization_service.onAuthStateChanged.emit.assert_not_called()
  176. def test_failedLogin() -> None:
  177. authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
  178. authorization_service.onAuthenticationError.emit = MagicMock()
  179. authorization_service.onAuthStateChanged.emit = MagicMock()
  180. authorization_service.initialize()
  181. # Let the service think there was a failed response
  182. authorization_service._onAuthStateChanged(FAILED_AUTH_RESPONSE)
  183. # Check that the error signal was triggered
  184. assert authorization_service.onAuthenticationError.emit.call_count == 1
  185. # Since nothing changed, this should still be 0.
  186. assert authorization_service.onAuthStateChanged.emit.call_count == 0
  187. # Validate that there is no user profile or token
  188. assert authorization_service.getUserProfile() is None
  189. assert authorization_service.getAccessToken() is None
  190. @patch.object(AuthorizationService, "getUserProfile")
  191. def test_storeAuthData(get_user_profile) -> None:
  192. preferences = Preferences()
  193. authorization_service = AuthorizationService(OAUTH_SETTINGS, preferences)
  194. authorization_service.initialize()
  195. # Write stuff to the preferences.
  196. authorization_service._storeAuthData(SUCCESSFUL_AUTH_RESPONSE)
  197. preference_value = preferences.getValue(OAUTH_SETTINGS.AUTH_DATA_PREFERENCE_KEY)
  198. # Check that something was actually put in the preferences
  199. assert preference_value is not None and preference_value != {}
  200. # Create a second auth service, so we can load the data.
  201. second_auth_service = AuthorizationService(OAUTH_SETTINGS, preferences)
  202. second_auth_service.initialize()
  203. second_auth_service.loadAuthDataFromPreferences()
  204. assert second_auth_service.getAccessToken() == SUCCESSFUL_AUTH_RESPONSE.access_token
  205. @patch.object(LocalAuthorizationServer, "stop")
  206. @patch.object(LocalAuthorizationServer, "start")
  207. @patch.object(QDesktopServices, "openUrl")
  208. def test_localAuthServer(QDesktopServices_openUrl, start_auth_server, stop_auth_server) -> None:
  209. preferences = Preferences()
  210. authorization_service = AuthorizationService(OAUTH_SETTINGS, preferences)
  211. authorization_service.startAuthorizationFlow()
  212. assert QDesktopServices_openUrl.call_count == 1
  213. # Ensure that the Authorization service tried to start the server.
  214. assert start_auth_server.call_count == 1
  215. assert stop_auth_server.call_count == 0
  216. authorization_service._onAuthStateChanged(FAILED_AUTH_RESPONSE)
  217. # Ensure that it stopped the server.
  218. assert stop_auth_server.call_count == 1
  219. def test_loginAndLogout() -> None:
  220. preferences = Preferences()
  221. authorization_service = AuthorizationService(OAUTH_SETTINGS, preferences)
  222. authorization_service.onAuthenticationError.emit = MagicMock()
  223. authorization_service.onAuthStateChanged.emit = MagicMock()
  224. authorization_service.initialize()
  225. mock_reply = Mock() # The user profile that the service should respond with.
  226. mock_reply.error = Mock(return_value = QNetworkReply.NetworkError.NoError)
  227. http_mock = Mock()
  228. http_mock.get = lambda url, headers_dict, callback, error_callback, timeout: callback(mock_reply)
  229. http_mock.readJSON = Mock(return_value = {"data": {"user_id": "di_resu", "username": "Emanresu"}})
  230. # Let the service think there was a successful response
  231. with patch("UM.TaskManagement.HttpRequestManager.HttpRequestManager.getInstance", MagicMock(return_value = http_mock)):
  232. authorization_service._onAuthStateChanged(SUCCESSFUL_AUTH_RESPONSE)
  233. # Ensure that the error signal was not triggered
  234. assert authorization_service.onAuthenticationError.emit.call_count == 0
  235. # Since we said that it went right this time, validate that we got a signal.
  236. assert authorization_service.onAuthStateChanged.emit.call_count == 1
  237. with patch("UM.TaskManagement.HttpRequestManager.HttpRequestManager.getInstance", MagicMock(return_value = http_mock)):
  238. def callback(profile):
  239. assert profile is not None
  240. authorization_service.getUserProfile(callback)
  241. assert authorization_service.getAccessToken() == "beep"
  242. # Check that we stored the authentication data, so next time the user won't have to log in again.
  243. assert preferences.getValue("test/auth_data") is not None
  244. # We're logged in now, also check if logging out works
  245. authorization_service.deleteAuthData()
  246. assert authorization_service.onAuthStateChanged.emit.call_count == 2
  247. with patch("UM.TaskManagement.HttpRequestManager.HttpRequestManager.getInstance", MagicMock(return_value = http_mock)):
  248. def callback(profile):
  249. assert profile is None
  250. authorization_service.getUserProfile(callback)
  251. # Ensure the data is gone after we logged out.
  252. assert preferences.getValue("test/auth_data") == "{}"
  253. def test_wrongServerResponses() -> None:
  254. authorization_service = AuthorizationService(OAUTH_SETTINGS, Preferences())
  255. authorization_service.initialize()
  256. authorization_service._onAuthStateChanged(MALFORMED_AUTH_RESPONSE)
  257. def callback(profile):
  258. assert profile is None
  259. authorization_service.getUserProfile(callback)
  260. def test__generate_auth_url() -> None:
  261. preferences = Preferences()
  262. authorization_service = AuthorizationService(OAUTH_SETTINGS, preferences)
  263. query_parameters_dict = {
  264. "client_id": "",
  265. "redirect_uri": OAUTH_SETTINGS.CALLBACK_URL,
  266. "scope": OAUTH_SETTINGS.CLIENT_SCOPES,
  267. "response_type": "code"
  268. }
  269. auth_url = authorization_service._generate_auth_url(query_parameters_dict, force_browser_logout = False)
  270. assert MYCLOUD_LOGOFF_URL + "&next=" not in auth_url
  271. auth_url = authorization_service._generate_auth_url(query_parameters_dict, force_browser_logout = True)
  272. assert MYCLOUD_LOGOFF_URL + "&next=" in auth_url