TestCloudApiClient.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  # Copyright (c) 2018 Ultimaker B.V.
  # Cura is released under the terms of the LGPLv3 or higher.
  4. from typing import List
  5. from unittest import TestCase
  6. from unittest.mock import patch, MagicMock
  7. from cura.UltimakerCloudAuthentication import CuraCloudAPIRoot
  8. from ...src.Cloud import CloudApiClient
  9. from ...src.Cloud.Models.CloudClusterResponse import CloudClusterResponse
  10. from ...src.Cloud.Models.CloudClusterStatus import CloudClusterStatus
  11. from ...src.Cloud.Models.CloudPrintJobResponse import CloudPrintJobResponse
  12. from ...src.Cloud.Models.CloudPrintJobUploadRequest import CloudPrintJobUploadRequest
  13. from ...src.Cloud.Models.CloudError import CloudError
  14. from .Fixtures import readFixture, parseFixture
  15. from .NetworkManagerMock import NetworkManagerMock
  class TestCloudApiClient(TestCase):
      """Exercises CloudApiClient against a mocked network manager and canned fixtures.

      Each test prepares a reply on the NetworkManagerMock, invokes one client
      call with a callback that records what it receives, flushes the mock and
      then compares the recorded values with the fixture contents.
      """

      # None lifts unittest's length limit on assertion diffs.
      maxDiff = None

      def _errorHandler(self, errors: List[CloudError]):
          """Error callback given to the client; raises so a reported error is not ignored.

          :param errors: The errors passed in by the client.
          :raises Exception: Always, with the received errors in the message.
          """
          message = "Received unexpected error: {}".format(errors)
          raise Exception(message)

      def setUp(self):
          """Build a mocked account, a mocked network manager and the client under test."""
          super().setUp()

          account = MagicMock()
          account.isLoggedIn.return_value = True
          self.account = account

          self.network = NetworkManagerMock()

          # QNetworkAccessManager is only replaced while the client is constructed;
          # presumably the client instantiates it in its constructor - TODO confirm.
          manager_patch = patch.object(CloudApiClient, 'QNetworkAccessManager', return_value = self.network)
          with manager_patch:
              self.api = CloudApiClient.CloudApiClient(self.account, self._errorHandler)

      def test_getClusters(self):
          """getClusters hands the callback the two clusters described by the fixture."""
          received = []
          fixture_name = "getClusters"
          raw_reply = readFixture(fixture_name)
          cluster_data = parseFixture(fixture_name)["data"]
          clusters_url = CuraCloudAPIRoot + "/connect/v1/clusters"
          self.network.prepareReply("GET", clusters_url, 200, raw_reply)

          # The callback extends the list, so the clusters it receives are stored flat.
          self.api.getClusters(received.extend)
          self.network.flushReplies()

          expected = [CloudClusterResponse(**cluster_data[0]), CloudClusterResponse(**cluster_data[1])]
          self.assertEqual(expected, received)

      def test_getClusterStatus(self):
          """getClusterStatus hands the callback one status built from the fixture data."""
          received = []
          fixture_name = "getClusterStatusResponse"
          raw_reply = readFixture(fixture_name)
          status_data = parseFixture(fixture_name)["data"]
          cluster_id = "R0YcLJwar1ugh0ikEZsZs8NWKV6vJP_LdYsXgXqAcaNC"
          status_url = CuraCloudAPIRoot + "/connect/v1/clusters/R0YcLJwar1ugh0ikEZsZs8NWKV6vJP_LdYsXgXqAcaNC/status"
          self.network.prepareReply("GET", status_url, 200, raw_reply)

          self.api.getClusterStatus(cluster_id, received.append)
          self.network.flushReplies()

          expected = [CloudClusterStatus(**status_data)]
          self.assertEqual(expected, received)

      def test_requestUpload(self):
          """requestUpload yields a single response with the fixture's content type and status."""
          received = []
          raw_reply = readFixture("putJobUploadResponse")
          upload_url = CuraCloudAPIRoot + "/cura/v1/jobs/upload"
          self.network.prepareReply("PUT", upload_url, 200, raw_reply)

          upload_request = CloudPrintJobUploadRequest(job_name = "job name", file_size = 143234,
                                                      content_type = "text/plain")
          self.api.requestUpload(upload_request, received.append)
          self.network.flushReplies()

          content_types = [item.content_type for item in received]
          self.assertEqual(["text/plain"], content_types)
          statuses = [item.status for item in received]
          self.assertEqual(["uploading"], statuses)

      def test_uploadToolPath(self):
          """uploadToolPath invokes the finished callback exactly once for a 400000-byte mesh."""
          finished_calls = []
          progress = MagicMock()

          def on_finished():
              finished_calls.append("sent")

          job_data = parseFixture("putJobUploadResponse")["data"]
          upload_response = CloudPrintJobResponse(**job_data)
          target_url = upload_response.upload_url

          # Network client doesn't look into the reply
          self.network.prepareReply("PUT", target_url, 200, b'{}')

          mesh = ("1234" * 100000).encode()
          self.api.uploadToolPath(upload_response, mesh, on_finished, progress.advance, progress.error)

          # Flush ten times, preparing a fresh reply after every flush; presumably
          # the upload is sent as several sequential requests - TODO confirm.
          for _ in range(10):
              self.network.flushReplies()
              self.network.prepareReply("PUT", target_url, 200, b'{}')

          self.assertEqual(["sent"], finished_calls)

      def test_requestPrint(self):
          """requestPrint yields a single queued job carrying the fixture's identifiers."""
          received = []
          raw_reply = readFixture("postJobPrintResponse")

          cluster_id = "NWKV6vJP_LdYsXgXqAcaNCR0YcLJwar1ugh0ikEZsZs8"
          cluster_job_id = "9a59d8e9-91d3-4ff6-b4cb-9db91c4094dd"
          job_id = "ABCDefGHIjKlMNOpQrSTUvYxWZ0-1234567890abcDE="

          print_url = CuraCloudAPIRoot + "/connect/v1/clusters/{}/print/{}".format(cluster_id, job_id)
          self.network.prepareReply("POST", print_url, 200, raw_reply)

          self.api.requestPrint(cluster_id, job_id, received.append)
          self.network.flushReplies()

          self.assertEqual([job_id], [item.job_id for item in received])
          self.assertEqual([cluster_job_id], [item.cluster_job_id for item in received])
          self.assertEqual(["queued"], [item.status for item in received])