TestCloudOutputDeviceManager.py 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. # Copyright (c) 2018 Ultimaker B.V.
  2. # Cura is released under the terms of the LGPLv3 or higher.
  3. from unittest import TestCase
  4. from unittest.mock import patch, MagicMock
  5. from UM.OutputDevice.OutputDeviceManager import OutputDeviceManager
  6. from cura.UltimakerCloudAuthentication import CuraCloudAPIRoot
  7. from ...src.Cloud import CloudApiClient
  8. from ...src.Cloud import CloudOutputDeviceManager
  9. from ...src.Cloud.Models.CloudClusterResponse import CloudClusterResponse
  10. from .Fixtures import parseFixture, readFixture
  11. from .NetworkManagerMock import NetworkManagerMock, FakeSignal
  12. class TestCloudOutputDeviceManager(TestCase):
  13. maxDiff = None
  14. URL = CuraCloudAPIRoot + "/connect/v1/clusters"
  15. def setUp(self):
  16. super().setUp()
  17. self.app = MagicMock()
  18. self.device_manager = OutputDeviceManager()
  19. self.app.getOutputDeviceManager.return_value = self.device_manager
  20. self.patches = [patch("UM.Qt.QtApplication.QtApplication.getInstance", return_value=self.app),
  21. patch("UM.Application.Application.getInstance", return_value=self.app)]
  22. for patched_method in self.patches:
  23. patched_method.start()
  24. self.network = NetworkManagerMock()
  25. self.timer = MagicMock(timeout = FakeSignal())
  26. with patch.object(CloudApiClient, "QNetworkAccessManager", return_value = self.network), \
  27. patch.object(CloudOutputDeviceManager, "QTimer", return_value = self.timer):
  28. self.manager = CloudOutputDeviceManager.CloudOutputDeviceManager()
  29. self.clusters_response = parseFixture("getClusters")
  30. self.network.prepareReply("GET", self.URL, 200, readFixture("getClusters"))
  31. def tearDown(self):
  32. try:
  33. self._beforeTearDown()
  34. self.network.flushReplies()
  35. self.manager.stop()
  36. for patched_method in self.patches:
  37. patched_method.stop()
  38. finally:
  39. super().tearDown()
  40. ## Before tear down method we check whether the state of the output device manager is what we expect based on the
  41. # mocked API response.
  42. def _beforeTearDown(self):
  43. # let the network send replies
  44. self.network.flushReplies()
  45. # get the created devices
  46. devices = self.device_manager.getOutputDevices()
  47. # TODO: Check active device
  48. response_clusters = []
  49. for cluster in self.clusters_response.get("data", []):
  50. response_clusters.append(CloudClusterResponse(**cluster).toDict())
  51. manager_clusters = sorted([device.clusterData.toDict() for device in self.manager._remote_clusters.values()],
  52. key=lambda cluster: cluster['cluster_id'], reverse=True)
  53. self.assertEqual(response_clusters, manager_clusters)
  54. ## Runs the initial request to retrieve the clusters.
  55. def _loadData(self):
  56. self.manager.start()
  57. self.network.flushReplies()
  58. def test_device_is_created(self):
  59. # just create the cluster, it is checked at tearDown
  60. self._loadData()
  61. def test_device_is_updated(self):
  62. self._loadData()
  63. # update the cluster from member variable, which is checked at tearDown
  64. self.clusters_response["data"][0]["host_name"] = "New host name"
  65. self.network.prepareReply("GET", self.URL, 200, self.clusters_response)
  66. self.manager._update_timer.timeout.emit()
  67. def test_device_is_removed(self):
  68. self._loadData()
  69. # delete the cluster from member variable, which is checked at tearDown
  70. del self.clusters_response["data"][1]
  71. self.network.prepareReply("GET", self.URL, 200, self.clusters_response)
  72. self.manager._update_timer.timeout.emit()
  73. def test_device_connects_by_cluster_id(self):
  74. active_machine_mock = self.app.getGlobalContainerStack.return_value
  75. cluster1, cluster2 = self.clusters_response["data"]
  76. cluster_id = cluster1["cluster_id"]
  77. active_machine_mock.getMetaDataEntry.side_effect = {"um_cloud_cluster_id": cluster_id}.get
  78. self._loadData()
  79. self.assertTrue(self.device_manager.getOutputDevice(cluster1["cluster_id"]).isConnected())
  80. self.assertIsNone(self.device_manager.getOutputDevice(cluster2["cluster_id"]))
  81. self.assertEqual([], active_machine_mock.setMetaDataEntry.mock_calls)
  82. def test_device_connects_by_network_key(self):
  83. active_machine_mock = self.app.getGlobalContainerStack.return_value
  84. cluster1, cluster2 = self.clusters_response["data"]
  85. network_key = cluster2["host_name"] + ".ultimaker.local"
  86. active_machine_mock.getMetaDataEntry.side_effect = {"um_network_key": network_key}.get
  87. self._loadData()
  88. self.assertIsNone(self.device_manager.getOutputDevice(cluster1["cluster_id"]))
  89. self.assertTrue(self.device_manager.getOutputDevice(cluster2["cluster_id"]).isConnected())
  90. active_machine_mock.setMetaDataEntry.assert_called_with("um_cloud_cluster_id", cluster2["cluster_id"])
  91. @patch.object(CloudOutputDeviceManager, "Message")
  92. def test_api_error(self, message_mock):
  93. self.clusters_response = {
  94. "errors": [{"id": "notFound", "title": "Not found!", "http_status": "404", "code": "notFound"}]
  95. }
  96. self.network.prepareReply("GET", self.URL, 200, self.clusters_response)
  97. self._loadData()
  98. message_mock.return_value.show.assert_called_once_with()