UploadMaterialsJob.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264
  1. # Copyright (c) 2021 Ultimaker B.V.
  2. # Cura is released under the terms of the LGPLv3 or higher.
  3. import enum
  4. import functools # For partial methods to use as callbacks with information pre-filled.
  5. import json # To serialise metadata for API calls.
  6. import os # To delete the archive when we're done.
  7. from PyQt5.QtCore import QUrl
  8. import tempfile # To create an archive before we upload it.
  9. import cura.CuraApplication # Imported like this to prevent circular imports.
  10. from cura.Settings.CuraContainerRegistry import CuraContainerRegistry # To find all printers to upload to.
  11. from cura.UltimakerCloud import UltimakerCloudConstants # To know where the API is.
  12. from cura.UltimakerCloud.UltimakerCloudScope import UltimakerCloudScope # To know how to communicate with this server.
  13. from UM.i18n import i18nCatalog
  14. from UM.Job import Job
  15. from UM.Logger import Logger
  16. from UM.Signal import Signal
  17. from UM.TaskManagement.HttpRequestManager import HttpRequestManager # To call the API.
  18. from UM.TaskManagement.HttpRequestScope import JsonDecoratorScope
  19. from typing import Any, cast, Dict, List, Optional, TYPE_CHECKING
  20. if TYPE_CHECKING:
  21. from PyQt5.QtNetwork import QNetworkReply
  22. from cura.UltimakerCloud.CloudMaterialSync import CloudMaterialSync
  23. catalog = i18nCatalog("cura")
  24. class UploadMaterialsError(Exception):
  25. """
  26. Class to indicate something went wrong while uploading.
  27. """
  28. pass
  29. class UploadMaterialsJob(Job):
  30. """
  31. Job that uploads a set of materials to the Digital Factory.
  32. The job has a number of stages:
  33. - First, it generates an archive of all materials. This typically takes a lot of processing power during which the
  34. GIL remains locked.
  35. - Then it requests the API to upload an archive.
  36. - Then it uploads the archive to the URL given by the first request.
  37. - Then it tells the API that the archive can be distributed to the printers.
  38. """
  39. UPLOAD_REQUEST_URL = f"{UltimakerCloudConstants.CuraCloudAPIRoot}/connect/v1/materials/upload"
  40. UPLOAD_CONFIRM_URL = UltimakerCloudConstants.CuraCloudAPIRoot + "/connect/v1/clusters/{cluster_id}/printers/{cluster_printer_id}/action/import_material"
  41. class Result(enum.IntEnum):
  42. SUCCESS = 0
  43. FAILED = 1
  44. class PrinterStatus(enum.Enum):
  45. UPLOADING = "uploading"
  46. SUCCESS = "success"
  47. FAILED = "failed"
  48. def __init__(self, material_sync: "CloudMaterialSync"):
  49. super().__init__()
  50. self._material_sync = material_sync
  51. self._scope = JsonDecoratorScope(UltimakerCloudScope(cura.CuraApplication.CuraApplication.getInstance())) # type: JsonDecoratorScope
  52. self._archive_filename = None # type: Optional[str]
  53. self._archive_remote_id = None # type: Optional[str] # ID that the server gives to this archive. Used to communicate about the archive to the server.
  54. self._printer_sync_status = {} # type: Dict[str, str]
  55. self._printer_metadata = [] # type: List[Dict[str, Any]]
  56. self.processProgressChanged.connect(self._onProcessProgressChanged)
  57. uploadCompleted = Signal() # Triggered when the job is really complete, including uploading to the cloud.
  58. processProgressChanged = Signal() # Triggered when we've made progress creating the archive.
  59. uploadProgressChanged = Signal() # Triggered when we've made progress with the complete job. This signal emits a progress fraction (0-1) as well as the status of every printer.
  60. def run(self) -> None:
  61. """
  62. Generates an archive of materials and starts uploading that archive to the cloud.
  63. """
  64. self._printer_metadata = CuraContainerRegistry.getInstance().findContainerStacksMetadata(
  65. type = "machine",
  66. connection_type = "3", # Only cloud printers.
  67. is_online = "True", # Only online printers. Otherwise the server gives an error.
  68. host_guid = "*", # Required metadata field. Otherwise we get a KeyError.
  69. um_cloud_cluster_id = "*" # Required metadata field. Otherwise we get a KeyError.
  70. )
  71. # Filter out any printer not capable of the 'import_material' capability. Needs FW 7.0.1-RC at the least!
  72. self._printer_metadata = [ printer_data for printer_data in self._printer_metadata if (
  73. UltimakerCloudConstants.META_CAPABILITIES in printer_data and
  74. "import_material" in printer_data[UltimakerCloudConstants.META_CAPABILITIES]
  75. )
  76. ]
  77. for printer in self._printer_metadata:
  78. self._printer_sync_status[printer["host_guid"]] = self.PrinterStatus.UPLOADING.value
  79. try:
  80. archive_file = tempfile.NamedTemporaryFile("wb", delete = False)
  81. archive_file.close()
  82. self._archive_filename = archive_file.name
  83. self._material_sync.exportAll(QUrl.fromLocalFile(self._archive_filename), notify_progress = self.processProgressChanged)
  84. except OSError as e:
  85. Logger.error(f"Failed to create archive of materials to sync with printers: {type(e)} - {e}")
  86. self.failed(UploadMaterialsError(catalog.i18nc("@text:error", "Failed to create archive of materials to sync with printers.")))
  87. return
  88. try:
  89. file_size = os.path.getsize(self._archive_filename)
  90. except OSError as e:
  91. Logger.error(f"Failed to load the archive of materials to sync it with printers: {type(e)} - {e}")
  92. self.failed(UploadMaterialsError(catalog.i18nc("@text:error", "Failed to load the archive of materials to sync it with printers.")))
  93. return
  94. request_metadata = {
  95. "data": {
  96. "file_size": file_size,
  97. "material_profile_name": "cura.umm", # File name can be anything as long as it's .umm. It's not used by anyone.
  98. "content_type": "application/zip", # This endpoint won't receive files of different MIME types.
  99. "origin": "cura" # Some identifier against hackers intercepting this upload request, apparently.
  100. }
  101. }
  102. request_payload = json.dumps(request_metadata).encode("UTF-8")
  103. http = HttpRequestManager.getInstance()
  104. http.put(
  105. url = self.UPLOAD_REQUEST_URL,
  106. data = request_payload,
  107. callback = self.onUploadRequestCompleted,
  108. error_callback = self.onError,
  109. scope = self._scope
  110. )
  111. def onUploadRequestCompleted(self, reply: "QNetworkReply") -> None:
  112. """
  113. Triggered when we successfully requested to upload a material archive.
  114. We then need to start uploading the material archive to the URL that the request answered with.
  115. :param reply: The reply from the server to our request to upload an archive.
  116. """
  117. response_data = HttpRequestManager.readJSON(reply)
  118. if response_data is None:
  119. Logger.error(f"Invalid response to material upload request. Could not parse JSON data.")
  120. self.failed(UploadMaterialsError(catalog.i18nc("@text:error", "The response from Digital Factory appears to be corrupted.")))
  121. return
  122. if "data" not in response_data:
  123. Logger.error(f"Invalid response to material upload request: Missing 'data' field that contains the entire response.")
  124. self.failed(UploadMaterialsError(catalog.i18nc("@text:error", "The response from Digital Factory is missing important information.")))
  125. return
  126. if "upload_url" not in response_data["data"]:
  127. Logger.error(f"Invalid response to material upload request: Missing 'upload_url' field to upload archive to.")
  128. self.failed(UploadMaterialsError(catalog.i18nc("@text:error", "The response from Digital Factory is missing important information.")))
  129. return
  130. if "material_profile_id" not in response_data["data"]:
  131. Logger.error(f"Invalid response to material upload request: Missing 'material_profile_id' to communicate about the materials with the server.")
  132. self.failed(UploadMaterialsError(catalog.i18nc("@text:error", "The response from Digital Factory is missing important information.")))
  133. return
  134. upload_url = response_data["data"]["upload_url"]
  135. self._archive_remote_id = response_data["data"]["material_profile_id"]
  136. try:
  137. with open(cast(str, self._archive_filename), "rb") as f:
  138. file_data = f.read()
  139. except OSError as e:
  140. Logger.error(f"Failed to load archive back in for sending to cloud: {type(e)} - {e}")
  141. self.failed(UploadMaterialsError(catalog.i18nc("@text:error", "Failed to load the archive of materials to sync it with printers.")))
  142. return
  143. http = HttpRequestManager.getInstance()
  144. http.put(
  145. url = upload_url,
  146. data = file_data,
  147. callback = self.onUploadCompleted,
  148. error_callback = self.onError,
  149. scope = self._scope
  150. )
  151. def onUploadCompleted(self, reply: "QNetworkReply") -> None:
  152. """
  153. When we've successfully uploaded the archive to the cloud, we need to notify the API to start syncing that
  154. archive to every printer.
  155. :param reply: The reply from the cloud storage when the upload succeeded.
  156. """
  157. for container_stack in self._printer_metadata:
  158. cluster_id = container_stack["um_cloud_cluster_id"]
  159. printer_id = container_stack["host_guid"]
  160. http = HttpRequestManager.getInstance()
  161. http.post(
  162. url = self.UPLOAD_CONFIRM_URL.format(cluster_id = cluster_id, cluster_printer_id = printer_id),
  163. callback = functools.partial(self.onUploadConfirmed, printer_id),
  164. error_callback = functools.partial(self.onUploadConfirmed, printer_id), # Let this same function handle the error too.
  165. scope = self._scope,
  166. data = json.dumps({"data": {"material_profile_id": self._archive_remote_id}}).encode("UTF-8")
  167. )
  168. def onUploadConfirmed(self, printer_id: str, reply: "QNetworkReply", error: Optional["QNetworkReply.NetworkError"] = None) -> None:
  169. """
  170. Triggered when we've got a confirmation that the material is synced with the printer, or that syncing failed.
  171. If syncing succeeded we mark this printer as having the status "success". If it failed we mark the printer as
  172. "failed". If this is the last upload that needed to be completed, we complete the job with either a success
  173. state (every printer successfully synced) or a failed state (any printer failed).
  174. :param printer_id: The printer host_guid that we completed syncing with.
  175. :param reply: The reply that the server gave to confirm.
  176. :param error: If the request failed, this error gives an indication what happened.
  177. """
  178. if error is not None:
  179. Logger.error(f"Failed to confirm uploading material archive to printer {printer_id}: {error}")
  180. self._printer_sync_status[printer_id] = self.PrinterStatus.FAILED.value
  181. else:
  182. self._printer_sync_status[printer_id] = self.PrinterStatus.SUCCESS.value
  183. still_uploading = len([val for val in self._printer_sync_status.values() if val == self.PrinterStatus.UPLOADING.value])
  184. self.uploadProgressChanged.emit(0.8 + (len(self._printer_sync_status) - still_uploading) / len(self._printer_sync_status), self.getPrinterSyncStatus())
  185. if still_uploading == 0: # This is the last response to be processed.
  186. if self.PrinterStatus.FAILED.value in self._printer_sync_status.values():
  187. self.setResult(self.Result.FAILED)
  188. self.setError(UploadMaterialsError(catalog.i18nc("@text:error", "Failed to connect to Digital Factory to sync materials with some of the printers.")))
  189. else:
  190. self.setResult(self.Result.SUCCESS)
  191. self.uploadCompleted.emit(self.getResult(), self.getError())
  192. def onError(self, reply: "QNetworkReply", error: Optional["QNetworkReply.NetworkError"]) -> None:
  193. """
  194. Used as callback from HTTP requests when the request failed.
  195. The given network error from the `HttpRequestManager` is logged, and the job is marked as failed.
  196. :param reply: The main reply of the server. This reply will most likely not be valid.
  197. :param error: The network error (Qt's enum) that occurred.
  198. """
  199. Logger.error(f"Failed to upload material archive: {error}")
  200. self.failed(UploadMaterialsError(catalog.i18nc("@text:error", "Failed to connect to Digital Factory.")))
  201. def getPrinterSyncStatus(self) -> Dict[str, str]:
  202. """
  203. For each printer, identified by host_guid, this gives the current status of uploading the material archive.
  204. The possible states are given in the PrinterStatus enum.
  205. :return: A dictionary with printer host_guids as keys, and their status as values.
  206. """
  207. return self._printer_sync_status
  208. def failed(self, error: UploadMaterialsError) -> None:
  209. """
  210. Helper function for when we have a general failure.
  211. This sets the sync status for all printers to failed, sets the error on
  212. the job and the result of the job to FAILED.
  213. :param error: An error to show to the user.
  214. """
  215. self.setResult(self.Result.FAILED)
  216. self.setError(error)
  217. for printer_id in self._printer_sync_status:
  218. self._printer_sync_status[printer_id] = self.PrinterStatus.FAILED.value
  219. self.uploadProgressChanged.emit(1.0, self.getPrinterSyncStatus())
  220. self.uploadCompleted.emit(self.getResult(), self.getError())
  221. def _onProcessProgressChanged(self, progress: float) -> None:
  222. """
  223. When we progress in the process of uploading materials, we not only signal the new progress (float from 0 to 1)
  224. but we also signal the current status of every printer. These are emitted as the two parameters of the signal.
  225. :param progress: The progress of this job, between 0 and 1.
  226. """
  227. self.uploadProgressChanged.emit(progress * 0.8, self.getPrinterSyncStatus()) # The processing is 80% of the progress bar.