123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334 |
- from UM.FileHandler.FileHandler import FileHandler
- from UM.Logger import Logger
- from UM.Scene.SceneNode import SceneNode
- from cura.CuraApplication import CuraApplication
- from cura.PrinterOutputDevice import PrinterOutputDevice, ConnectionState
- from PyQt5.QtNetwork import QHttpMultiPart, QHttpPart, QNetworkRequest, QNetworkAccessManager, QNetworkReply, QAuthenticator
- from PyQt5.QtCore import pyqtProperty, pyqtSignal, pyqtSlot, QObject, QUrl, QCoreApplication
- from time import time
- from typing import Any, Callable, Dict, List, Optional
- from enum import IntEnum
- import os
- import gzip
- class AuthState(IntEnum):
- NotAuthenticated = 1
- AuthenticationRequested = 2
- Authenticated = 3
- AuthenticationDenied = 4
- AuthenticationReceived = 5
- class NetworkedPrinterOutputDevice(PrinterOutputDevice):
- authenticationStateChanged = pyqtSignal()
- def __init__(self, device_id, address: str, properties: Dict[bytes, bytes], parent: QObject = None) -> None:
- super().__init__(device_id = device_id, parent = parent)
- self._manager = None
- self._last_manager_create_time = None
- self._recreate_network_manager_time = 30
- self._timeout_time = 10
- self._last_response_time = None
- self._last_request_time = None
- self._api_prefix = ""
- self._address = address
- self._properties = properties
- self._user_agent = "%s/%s " % (CuraApplication.getInstance().getApplicationName(), CuraApplication.getInstance().getVersion())
- self._onFinishedCallbacks = {}
- self._authentication_state = AuthState.NotAuthenticated
-
-
- self._kept_alive_multiparts = {}
- self._sending_gcode = False
- self._compressing_gcode = False
- self._gcode = []
- self._connection_state_before_timeout = None
- def requestWrite(self, nodes: List[SceneNode], file_name: Optional[str] = None, limit_mimetypes: bool = False, file_handler: Optional[FileHandler] = None, **kwargs: str) -> None:
- raise NotImplementedError("requestWrite needs to be implemented")
- def setAuthenticationState(self, authentication_state: AuthState) -> None:
- if self._authentication_state != authentication_state:
- self._authentication_state = authentication_state
- self.authenticationStateChanged.emit()
- @pyqtProperty(int, notify = authenticationStateChanged)
- def authenticationState(self) -> AuthState:
- return self._authentication_state
- def _compressDataAndNotifyQt(self, data_to_append: str) -> bytes:
- compressed_data = gzip.compress(data_to_append.encode("utf-8"))
- self._progress_message.setProgress(-1)
- QCoreApplication.processEvents()
-
-
- self._last_response_time = time()
- return compressed_data
- def _compressGCode(self) -> Optional[bytes]:
- self._compressing_gcode = True
-
- max_chars_per_line = int(1024 * 1024 / 4)
- file_data_bytes_list = []
- batched_lines = []
- batched_lines_count = 0
- for line in self._gcode:
- if not self._compressing_gcode:
- self._progress_message.hide()
-
- return None
-
-
- batched_lines.append(line)
- batched_lines_count += len(line)
- if batched_lines_count >= max_chars_per_line:
- file_data_bytes_list.append(self._compressDataAndNotifyQt("".join(batched_lines)))
- batched_lines = []
- batched_lines_count = 0
-
- if len(batched_lines) != 0:
- file_data_bytes_list.append(self._compressDataAndNotifyQt("".join(batched_lines)))
- self._compressing_gcode = False
- return b"".join(file_data_bytes_list)
- def _update(self) -> None:
- if self._last_response_time:
- time_since_last_response = time() - self._last_response_time
- else:
- time_since_last_response = 0
- if self._last_request_time:
- time_since_last_request = time() - self._last_request_time
- else:
- time_since_last_request = float("inf")
- if time_since_last_response > self._timeout_time >= time_since_last_request:
-
- if self._connection_state_before_timeout is None:
- self._connection_state_before_timeout = self._connection_state
- self.setConnectionState(ConnectionState.closed)
-
-
- if time_since_last_response > self._recreate_network_manager_time:
- if self._last_manager_create_time is None or time() - self._last_manager_create_time > self._recreate_network_manager_time:
- self._createNetworkManager()
- assert(self._manager is not None)
- elif self._connection_state == ConnectionState.closed:
-
- if self._connection_state_before_timeout is not None:
- self.setConnectionState(self._connection_state_before_timeout)
- self._connection_state_before_timeout = None
- def _createEmptyRequest(self, target: str, content_type: Optional[str] = "application/json") -> QNetworkRequest:
- url = QUrl("http://" + self._address + self._api_prefix + target)
- request = QNetworkRequest(url)
- if content_type is not None:
- request.setHeader(QNetworkRequest.ContentTypeHeader, "application/json")
- request.setHeader(QNetworkRequest.UserAgentHeader, self._user_agent)
- return request
- def _createFormPart(self, content_header: str, data: bytes, content_type: Optional[str] = None) -> QHttpPart:
- part = QHttpPart()
- if not content_header.startswith("form-data;"):
- content_header = "form_data; " + content_header
- part.setHeader(QNetworkRequest.ContentDispositionHeader, content_header)
- if content_type is not None:
- part.setHeader(QNetworkRequest.ContentTypeHeader, content_type)
- part.setBody(data)
- return part
-
-
- def _getUserName(self) -> str:
- for name in ("LOGNAME", "USER", "LNAME", "USERNAME"):
- user = os.environ.get(name)
- if user:
- return user
- return "Unknown User"
- def _clearCachedMultiPart(self, reply: QNetworkReply) -> None:
- if reply in self._kept_alive_multiparts:
- del self._kept_alive_multiparts[reply]
- def _validateManager(self) -> None:
- if self._manager is None:
- self._createNetworkManager()
- assert (self._manager is not None)
- def put(self, target: str, data: str, on_finished: Optional[Callable[[QNetworkReply], None]]) -> None:
- self._validateManager()
- request = self._createEmptyRequest(target)
- self._last_request_time = time()
- if self._manager is not None:
- reply = self._manager.put(request, data.encode())
- self._registerOnFinishedCallback(reply, on_finished)
- else:
- Logger.log("e", "Could not find manager.")
- def delete(self, target: str, on_finished: Optional[Callable[[QNetworkReply], None]]) -> None:
- self._validateManager()
- request = self._createEmptyRequest(target)
- self._last_request_time = time()
- if self._manager is not None:
- reply = self._manager.deleteResource(request)
- self._registerOnFinishedCallback(reply, on_finished)
- else:
- Logger.log("e", "Could not find manager.")
- def get(self, target: str, on_finished: Optional[Callable[[QNetworkReply], None]]) -> None:
- self._validateManager()
- request = self._createEmptyRequest(target)
- self._last_request_time = time()
- if self._manager is not None:
- reply = self._manager.get(request)
- self._registerOnFinishedCallback(reply, on_finished)
- else:
- Logger.log("e", "Could not find manager.")
- def post(self, target: str, data: str, on_finished: Optional[Callable[[QNetworkReply], None]], on_progress: Callable = None) -> None:
- self._validateManager()
- request = self._createEmptyRequest(target)
- self._last_request_time = time()
- if self._manager is not None:
- reply = self._manager.post(request, data.encode())
- if on_progress is not None:
- reply.uploadProgress.connect(on_progress)
- self._registerOnFinishedCallback(reply, on_finished)
- else:
- Logger.log("e", "Could not find manager.")
- def postFormWithParts(self, target: str, parts: List[QHttpPart], on_finished: Optional[Callable[[QNetworkReply], None]], on_progress: Callable = None) -> QNetworkReply:
- self._validateManager()
- request = self._createEmptyRequest(target, content_type=None)
- multi_post_part = QHttpMultiPart(QHttpMultiPart.FormDataType)
- for part in parts:
- multi_post_part.append(part)
- self._last_request_time = time()
- if self._manager is not None:
- reply = self._manager.post(request, multi_post_part)
- self._kept_alive_multiparts[reply] = multi_post_part
- if on_progress is not None:
- reply.uploadProgress.connect(on_progress)
- self._registerOnFinishedCallback(reply, on_finished)
- return reply
- else:
- Logger.log("e", "Could not find manager.")
- def postForm(self, target: str, header_data: str, body_data: bytes, on_finished: Optional[Callable[[QNetworkReply], None]], on_progress: Callable = None) -> None:
- post_part = QHttpPart()
- post_part.setHeader(QNetworkRequest.ContentDispositionHeader, header_data)
- post_part.setBody(body_data)
- self.postFormWithParts(target, [post_part], on_finished, on_progress)
- def _onAuthenticationRequired(self, reply: QNetworkReply, authenticator: QAuthenticator) -> None:
- Logger.log("w", "Request to {url} required authentication, which was not implemented".format(url = reply.url().toString()))
- def _createNetworkManager(self) -> None:
- Logger.log("d", "Creating network manager")
- if self._manager:
- self._manager.finished.disconnect(self.__handleOnFinished)
- self._manager.authenticationRequired.disconnect(self._onAuthenticationRequired)
- self._manager = QNetworkAccessManager()
- self._manager.finished.connect(self.__handleOnFinished)
- self._last_manager_create_time = time()
- self._manager.authenticationRequired.connect(self._onAuthenticationRequired)
- if self._properties.get(b"temporary", b"false") != b"true":
- CuraApplication.getInstance().getMachineManager().checkCorrectGroupName(self.getId(), self.name)
- def _registerOnFinishedCallback(self, reply: QNetworkReply, on_finished: Optional[Callable[[QNetworkReply], None]]) -> None:
- if on_finished is not None:
- self._onFinishedCallbacks[reply.url().toString() + str(reply.operation())] = on_finished
- def __handleOnFinished(self, reply: QNetworkReply) -> None:
-
-
- if reply.operation() == QNetworkAccessManager.PostOperation:
- self._clearCachedMultiPart(reply)
- if reply.attribute(QNetworkRequest.HttpStatusCodeAttribute) is None:
-
- return
- self._last_response_time = time()
- if self._connection_state == ConnectionState.connecting:
- self.setConnectionState(ConnectionState.connected)
- callback_key = reply.url().toString() + str(reply.operation())
- try:
- if callback_key in self._onFinishedCallbacks:
- self._onFinishedCallbacks[callback_key](reply)
- except Exception:
- Logger.logException("w", "something went wrong with callback")
- @pyqtSlot(str, result=str)
- def getProperty(self, key: str) -> str:
- bytes_key = key.encode("utf-8")
- if bytes_key in self._properties:
- return self._properties.get(bytes_key, b"").decode("utf-8")
- else:
- return ""
- def getProperties(self):
- return self._properties
-
-
- @pyqtProperty(str, constant = True)
- def key(self) -> str:
- return self._id
-
- @pyqtProperty(str, constant = True)
- def address(self) -> str:
- return self._properties.get(b"address", b"").decode("utf-8")
-
- @pyqtProperty(str, constant = True)
- def name(self) -> str:
- return self._properties.get(b"name", b"").decode("utf-8")
-
- @pyqtProperty(str, constant = True)
- def firmwareVersion(self) -> str:
- return self._properties.get(b"firmware_version", b"").decode("utf-8")
- @pyqtProperty(str, constant = True)
- def printerType(self) -> str:
- return self._properties.get(b"printer_type", b"Unknown").decode("utf-8")
-
- @pyqtProperty(str, constant = True)
- def ipAddress(self) -> str:
- return self._address
|