UploadMaterialsJob.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256
  1. # Copyright (c) 2021 Ultimaker B.V.
  2. # Cura is released under the terms of the LGPLv3 or higher.
  3. import enum
  4. import functools # For partial methods to use as callbacks with information pre-filled.
  5. import json # To serialise metadata for API calls.
  6. import os # To delete the archive when we're done.
  7. from PyQt5.QtCore import QUrl
  8. import tempfile # To create an archive before we upload it.
  9. import cura.CuraApplication # Imported like this to prevent circular imports.
  10. from cura.Settings.CuraContainerRegistry import CuraContainerRegistry # To find all printers to upload to.
  11. from cura.UltimakerCloud import UltimakerCloudConstants # To know where the API is.
  12. from cura.UltimakerCloud.UltimakerCloudScope import UltimakerCloudScope # To know how to communicate with this server.
  13. from UM.i18n import i18nCatalog
  14. from UM.Job import Job
  15. from UM.Logger import Logger
  16. from UM.Signal import Signal
  17. from UM.TaskManagement.HttpRequestManager import HttpRequestManager # To call the API.
  18. from UM.TaskManagement.HttpRequestScope import JsonDecoratorScope
  19. from typing import Any, cast, Dict, List, Optional, TYPE_CHECKING
  20. if TYPE_CHECKING:
  21. from PyQt5.QtNetwork import QNetworkReply
  22. from cura.UltimakerCloud.CloudMaterialSync import CloudMaterialSync
  23. catalog = i18nCatalog("cura")
  24. class UploadMaterialsError(Exception):
  25. """
  26. Class to indicate something went wrong while uploading.
  27. """
  28. pass
  29. class UploadMaterialsJob(Job):
  30. """
  31. Job that uploads a set of materials to the Digital Factory.
  32. The job has a number of stages:
  33. - First, it generates an archive of all materials. This typically takes a lot of processing power during which the
  34. GIL remains locked.
  35. - Then it requests the API to upload an archive.
  36. - Then it uploads the archive to the URL given by the first request.
  37. - Then it tells the API that the archive can be distributed to the printers.
  38. """
  39. UPLOAD_REQUEST_URL = f"{UltimakerCloudConstants.CuraCloudAPIRoot}/connect/v1/materials/upload"
  40. UPLOAD_CONFIRM_URL = UltimakerCloudConstants.CuraCloudAPIRoot + "/connect/v1/clusters/{cluster_id}/printers/{cluster_printer_id}/action/import_material"
  41. class Result(enum.IntEnum):
  42. SUCCESS = 0
  43. FAILED = 1
  44. class PrinterStatus(enum.Enum):
  45. UPLOADING = "uploading"
  46. SUCCESS = "success"
  47. FAILED = "failed"
  48. def __init__(self, material_sync: "CloudMaterialSync"):
  49. super().__init__()
  50. self._material_sync = material_sync
  51. self._scope = JsonDecoratorScope(UltimakerCloudScope(cura.CuraApplication.CuraApplication.getInstance())) # type: JsonDecoratorScope
  52. self._archive_filename = None # type: Optional[str]
  53. self._archive_remote_id = None # type: Optional[str] # ID that the server gives to this archive. Used to communicate about the archive to the server.
  54. self._printer_sync_status = {} # type: Dict[str, str]
  55. self._printer_metadata = [] # type: List[Dict[str, Any]]
  56. self.processProgressChanged.connect(self._onProcessProgressChanged)
  57. uploadCompleted = Signal() # Triggered when the job is really complete, including uploading to the cloud.
  58. processProgressChanged = Signal() # Triggered when we've made progress creating the archive.
  59. uploadProgressChanged = Signal() # Triggered when we've made progress with the complete job. This signal emits a progress fraction (0-1) as well as the status of every printer.
  60. def run(self) -> None:
  61. """
  62. Generates an archive of materials and starts uploading that archive to the cloud.
  63. """
  64. self._printer_metadata = CuraContainerRegistry.getInstance().findContainerStacksMetadata(
  65. type = "machine",
  66. connection_type = "3", # Only cloud printers.
  67. is_online = "True", # Only online printers. Otherwise the server gives an error.
  68. host_guid = "*", # Required metadata field. Otherwise we get a KeyError.
  69. um_cloud_cluster_id = "*" # Required metadata field. Otherwise we get a KeyError.
  70. )
  71. for printer in self._printer_metadata:
  72. self._printer_sync_status[printer["host_guid"]] = self.PrinterStatus.UPLOADING.value
  73. try:
  74. archive_file = tempfile.NamedTemporaryFile("wb", delete = False)
  75. archive_file.close()
  76. self._archive_filename = archive_file.name
  77. self._material_sync.exportAll(QUrl.fromLocalFile(self._archive_filename), notify_progress = self.processProgressChanged)
  78. except OSError as e:
  79. Logger.error(f"Failed to create archive of materials to sync with printers: {type(e)} - {e}")
  80. self.failed(UploadMaterialsError(catalog.i18nc("@text:error", "Failed to create archive of materials to sync with printers.")))
  81. return
  82. try:
  83. file_size = os.path.getsize(self._archive_filename)
  84. except OSError as e:
  85. Logger.error(f"Failed to load the archive of materials to sync it with printers: {type(e)} - {e}")
  86. self.failed(UploadMaterialsError(catalog.i18nc("@text:error", "Failed to load the archive of materials to sync it with printers.")))
  87. return
  88. request_metadata = {
  89. "data": {
  90. "file_size": file_size,
  91. "material_profile_name": "cura.umm", # File name can be anything as long as it's .umm. It's not used by anyone.
  92. "content_type": "application/zip", # This endpoint won't receive files of different MIME types.
  93. "origin": "cura" # Some identifier against hackers intercepting this upload request, apparently.
  94. }
  95. }
  96. request_payload = json.dumps(request_metadata).encode("UTF-8")
  97. http = HttpRequestManager.getInstance()
  98. http.put(
  99. url = self.UPLOAD_REQUEST_URL,
  100. data = request_payload,
  101. callback = self.onUploadRequestCompleted,
  102. error_callback = self.onError,
  103. scope = self._scope
  104. )
  105. def onUploadRequestCompleted(self, reply: "QNetworkReply") -> None:
  106. """
  107. Triggered when we successfully requested to upload a material archive.
  108. We then need to start uploading the material archive to the URL that the request answered with.
  109. :param reply: The reply from the server to our request to upload an archive.
  110. """
  111. response_data = HttpRequestManager.readJSON(reply)
  112. if response_data is None:
  113. Logger.error(f"Invalid response to material upload request. Could not parse JSON data.")
  114. self.failed(UploadMaterialsError(catalog.i18nc("@text:error", "The response from Digital Factory appears to be corrupted.")))
  115. return
  116. if "data" not in response_data:
  117. Logger.error(f"Invalid response to material upload request: Missing 'data' field that contains the entire response.")
  118. self.failed(UploadMaterialsError(catalog.i18nc("@text:error", "The response from Digital Factory is missing important information.")))
  119. return
  120. if "upload_url" not in response_data["data"]:
  121. Logger.error(f"Invalid response to material upload request: Missing 'upload_url' field to upload archive to.")
  122. self.failed(UploadMaterialsError(catalog.i18nc("@text:error", "The response from Digital Factory is missing important information.")))
  123. return
  124. if "material_profile_id" not in response_data["data"]:
  125. Logger.error(f"Invalid response to material upload request: Missing 'material_profile_id' to communicate about the materials with the server.")
  126. self.failed(UploadMaterialsError(catalog.i18nc("@text:error", "The response from Digital Factory is missing important information.")))
  127. return
  128. upload_url = response_data["data"]["upload_url"]
  129. self._archive_remote_id = response_data["data"]["material_profile_id"]
  130. try:
  131. with open(cast(str, self._archive_filename), "rb") as f:
  132. file_data = f.read()
  133. except OSError as e:
  134. Logger.error(f"Failed to load archive back in for sending to cloud: {type(e)} - {e}")
  135. self.failed(UploadMaterialsError(catalog.i18nc("@text:error", "Failed to load the archive of materials to sync it with printers.")))
  136. return
  137. http = HttpRequestManager.getInstance()
  138. http.put(
  139. url = upload_url,
  140. data = file_data,
  141. callback = self.onUploadCompleted,
  142. error_callback = self.onError,
  143. scope = self._scope
  144. )
  145. def onUploadCompleted(self, reply: "QNetworkReply") -> None:
  146. """
  147. When we've successfully uploaded the archive to the cloud, we need to notify the API to start syncing that
  148. archive to every printer.
  149. :param reply: The reply from the cloud storage when the upload succeeded.
  150. """
  151. for container_stack in self._printer_metadata:
  152. cluster_id = container_stack["um_cloud_cluster_id"]
  153. printer_id = container_stack["host_guid"]
  154. http = HttpRequestManager.getInstance()
  155. http.post(
  156. url = self.UPLOAD_CONFIRM_URL.format(cluster_id = cluster_id, cluster_printer_id = printer_id),
  157. callback = functools.partial(self.onUploadConfirmed, printer_id),
  158. error_callback = functools.partial(self.onUploadConfirmed, printer_id), # Let this same function handle the error too.
  159. scope = self._scope,
  160. data = json.dumps({"data": {"material_profile_id": self._archive_remote_id}}).encode("UTF-8")
  161. )
  162. def onUploadConfirmed(self, printer_id: str, reply: "QNetworkReply", error: Optional["QNetworkReply.NetworkError"] = None) -> None:
  163. """
  164. Triggered when we've got a confirmation that the material is synced with the printer, or that syncing failed.
  165. If syncing succeeded we mark this printer as having the status "success". If it failed we mark the printer as
  166. "failed". If this is the last upload that needed to be completed, we complete the job with either a success
  167. state (every printer successfully synced) or a failed state (any printer failed).
  168. :param printer_id: The printer host_guid that we completed syncing with.
  169. :param reply: The reply that the server gave to confirm.
  170. :param error: If the request failed, this error gives an indication what happened.
  171. """
  172. if error is not None:
  173. Logger.error(f"Failed to confirm uploading material archive to printer {printer_id}: {error}")
  174. self._printer_sync_status[printer_id] = self.PrinterStatus.FAILED.value
  175. else:
  176. self._printer_sync_status[printer_id] = self.PrinterStatus.SUCCESS.value
  177. still_uploading = len([val for val in self._printer_sync_status.values() if val == self.PrinterStatus.UPLOADING.value])
  178. self.uploadProgressChanged.emit(0.8 + (len(self._printer_sync_status) - still_uploading) / len(self._printer_sync_status), self.getPrinterSyncStatus())
  179. if still_uploading == 0: # This is the last response to be processed.
  180. if self.PrinterStatus.FAILED.value in self._printer_sync_status.values():
  181. self.setResult(self.Result.FAILED)
  182. self.setError(UploadMaterialsError(catalog.i18nc("@text:error", "Failed to connect to Digital Factory to sync materials with some of the printers.")))
  183. else:
  184. self.setResult(self.Result.SUCCESS)
  185. self.uploadCompleted.emit(self.getResult(), self.getError())
  186. def onError(self, reply: "QNetworkReply", error: Optional["QNetworkReply.NetworkError"]) -> None:
  187. """
  188. Used as callback from HTTP requests when the request failed.
  189. The given network error from the `HttpRequestManager` is logged, and the job is marked as failed.
  190. :param reply: The main reply of the server. This reply will most likely not be valid.
  191. :param error: The network error (Qt's enum) that occurred.
  192. """
  193. Logger.error(f"Failed to upload material archive: {error}")
  194. self.failed(UploadMaterialsError(catalog.i18nc("@text:error", "Failed to connect to Digital Factory.")))
  195. def getPrinterSyncStatus(self) -> Dict[str, str]:
  196. """
  197. For each printer, identified by host_guid, this gives the current status of uploading the material archive.
  198. The possible states are given in the PrinterStatus enum.
  199. :return: A dictionary with printer host_guids as keys, and their status as values.
  200. """
  201. return self._printer_sync_status
  202. def failed(self, error: UploadMaterialsError) -> None:
  203. """
  204. Helper function for when we have a general failure.
  205. This sets the sync status for all printers to failed, sets the error on
  206. the job and the result of the job to FAILED.
  207. :param error: An error to show to the user.
  208. """
  209. self.setResult(self.Result.FAILED)
  210. self.setError(error)
  211. for printer_id in self._printer_sync_status:
  212. self._printer_sync_status[printer_id] = self.PrinterStatus.FAILED.value
  213. self.uploadProgressChanged.emit(1.0, self.getPrinterSyncStatus())
  214. self.uploadCompleted.emit(self.getResult(), self.getError())
  215. def _onProcessProgressChanged(self, progress: float) -> None:
  216. """
  217. When we progress in the process of uploading materials, we not only signal the new progress (float from 0 to 1)
  218. but we also signal the current status of every printer. These are emitted as the two parameters of the signal.
  219. :param progress: The progress of this job, between 0 and 1.
  220. """
  221. self.uploadProgressChanged.emit(progress * 0.8, self.getPrinterSyncStatus()) # The processing is 80% of the progress bar.