123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189 |
- import os
- import gzip
- from time import time
- from typing import Dict, List, Optional
- from enum import IntEnum
- from PyQt5.QtNetwork import QNetworkRequest, QNetworkReply
- from PyQt5.QtCore import pyqtProperty, pyqtSignal, pyqtSlot, QObject, QCoreApplication
- from UM.FileHandler.FileHandler import FileHandler
- from UM.Scene.SceneNode import SceneNode
- from cura.NetworkClient import NetworkClient
- from cura.PrinterOutputDevice import PrinterOutputDevice, ConnectionState
class AuthState(IntEnum):
    """Authentication states of a networked printer output device.

    This is an ``IntEnum`` so that members compare equal to plain integers;
    the numeric values below are part of the contract and must not be
    renumbered.
    """

    NotAuthenticated = 1
    AuthenticationRequested = 2
    Authenticated = 3
    AuthenticationDenied = 4
    AuthenticationReceived = 5
class NetworkedPrinterOutputDevice(PrinterOutputDevice, NetworkClient):
    """Printer output device that is reached over HTTP at a network address.

    Combines ``PrinterOutputDevice`` with ``NetworkClient`` and adds:

    * an authentication state with a Qt change signal,
    * batched gzip compression of the stored g-code lines,
    * a periodic ``_update`` that moves the device in and out of a
      "timed out" (closed) connection state,
    * read-only Qt properties exposing the discovery ``properties`` dict.
    """

    authenticationStateChanged = pyqtSignal()

    def __init__(self, device_id, address: str, properties: Dict[bytes, bytes], parent: Optional[QObject] = None) -> None:
        """Initialise both base classes and the device's own state.

        :param device_id: Identifier forwarded to ``PrinterOutputDevice``.
        :param address: Host part used when building request URLs.
        :param properties: Byte-keyed, byte-valued properties of the device.
        :param parent: Optional Qt parent forwarded to ``PrinterOutputDevice``.
        """
        PrinterOutputDevice.__init__(self, device_id = device_id, parent = parent)
        NetworkClient.__init__(self)

        self._api_prefix = ""
        self._address = address
        self._properties = properties
        self._authentication_state = AuthState.NotAuthenticated
        self._sending_gcode = False
        self._compressing_gcode = False
        self._gcode = []  # type: List[str]

        # Connection state to restore once the device responds again; None while not timed out.
        self._connection_state_before_timeout = None  # type: Optional[ConnectionState]
        # Both thresholds are compared against differences of time(), i.e. they are in seconds.
        self._timeout_time = 10
        self._recreate_network_manager_time = 30

    def _createEmptyRequest(self, target: str, content_type: Optional[str] = "application/json") -> QNetworkRequest:
        """Create a request for ``target`` on this device.

        The URL handed to the parent implementation is
        ``"http://" + address + api_prefix + target``.
        """
        url = "http://" + self._address + self._api_prefix + target
        return super()._createEmptyRequest(url, content_type)

    def requestWrite(self, nodes: List[SceneNode], file_name: Optional[str] = None, limit_mimetypes: bool = False, file_handler: Optional[FileHandler] = None, **kwargs: str) -> None:
        """Placeholder that subclasses have to override.

        :raises NotImplementedError: Always.
        """
        raise NotImplementedError("requestWrite needs to be implemented")

    def setAuthenticationState(self, authentication_state: AuthState) -> None:
        """Store a new authentication state and emit ``authenticationStateChanged``.

        Nothing happens (and no signal is emitted) when the state is unchanged.
        """
        if self._authentication_state != authentication_state:
            self._authentication_state = authentication_state
            self.authenticationStateChanged.emit()

    @pyqtProperty(int, notify = authenticationStateChanged)
    def authenticationState(self) -> AuthState:
        """Current authentication state, exposed to Qt as an int."""
        return self._authentication_state

    def _compressDataAndNotifyQt(self, data_to_append: str) -> bytes:
        """Gzip ``data_to_append`` (encoded as UTF-8) and let Qt process pending events.

        :return: The compressed bytes.
        """
        compressed_data = gzip.compress(data_to_append.encode("utf-8"))
        self._progress_message.setProgress(-1)  # NOTE(review): presumably -1 means "indeterminate" - confirm.
        QCoreApplication.processEvents()

        # _update() measures the time since _last_response_time; refreshing it here keeps the
        # time spent compressing from being counted towards the timeout.
        self._last_response_time = time()
        return compressed_data

    def _compressGCode(self) -> Optional[bytes]:
        """Compress the stored g-code lines in batches.

        Lines are collected until a batch holds at least 1024 * 1024 / 4
        characters; each batch is gzip-compressed on its own and the
        compressed batches are concatenated.

        :return: The concatenated compressed data, or ``None`` when
            ``_compressing_gcode`` was reset to ``False`` while the loop was
            running (in which case the progress message is hidden).
        """
        self._compressing_gcode = True

        max_chars_per_batch = int(1024 * 1024 / 4)
        file_data_bytes_list = []  # type: List[bytes]
        batched_lines = []  # type: List[str]
        batched_lines_count = 0
        for line in self._gcode:
            if not self._compressing_gcode:
                # The flag can only flip while _compressDataAndNotifyQt processes Qt events.
                self._progress_message.hide()
                return None

            batched_lines.append(line)
            batched_lines_count += len(line)
            if batched_lines_count >= max_chars_per_batch:
                file_data_bytes_list.append(self._compressDataAndNotifyQt("".join(batched_lines)))
                batched_lines = []
                batched_lines_count = 0

        # Flush whatever did not fill a complete batch.
        if batched_lines:
            file_data_bytes_list.append(self._compressDataAndNotifyQt("".join(batched_lines)))

        self._compressing_gcode = False
        return b"".join(file_data_bytes_list)

    def _update(self) -> None:
        """Enter or leave the timed-out state based on request/response times.

        The device is considered timed out when the last response is older
        than ``_timeout_time`` while a request was sent within that window.
        While timed out the connection state is ``closed``; the previous
        state is remembered and restored once the condition no longer holds.
        """
        if self._last_response_time:
            time_since_last_response = time() - self._last_response_time
        else:
            time_since_last_response = 0

        if self._last_request_time:
            time_since_last_request = time() - self._last_request_time
        else:
            # No request was ever sent: make the timeout condition below impossible.
            time_since_last_request = float("inf")

        if time_since_last_response > self._timeout_time >= time_since_last_request:
            # Go into (or stay in) timeout; remember the state only on the first pass.
            if self._connection_state_before_timeout is None:
                self._connection_state_before_timeout = self._connection_state

            self.setConnectionState(ConnectionState.closed)

            # After a longer silence the network manager is re-created, but not more often
            # than once per _recreate_network_manager_time seconds.
            if time_since_last_response > self._recreate_network_manager_time:
                if self._last_manager_create_time is None or time() - self._last_manager_create_time > self._recreate_network_manager_time:
                    self._createNetworkManager()
                assert self._manager is not None
        elif self._connection_state == ConnectionState.closed:
            # Leave timeout, restoring the state that was active before it.
            if self._connection_state_before_timeout is not None:
                self.setConnectionState(self._connection_state_before_timeout)
                self._connection_state_before_timeout = None

    def _getUserName(self) -> str:
        """Return the first non-empty of the LOGNAME, USER, LNAME and USERNAME
        environment variables, or ``"Unknown User"`` when none is set."""
        for name in ("LOGNAME", "USER", "LNAME", "USERNAME"):
            user = os.environ.get(name)
            if user:
                return user
        return "Unknown User"

    @pyqtSlot(str, result = str)
    def getProperty(self, key: str) -> str:
        """Return the property stored under ``key`` decoded as UTF-8.

        :param key: Property name; it is encoded to UTF-8 for the lookup.
        :return: The decoded value, or ``""`` when the key is absent.
        """
        return self._properties.get(key.encode("utf-8"), b"").decode("utf-8")

    def getProperties(self) -> Dict[bytes, bytes]:
        """Return the properties dict itself (not a copy)."""
        return self._properties

    @pyqtProperty(str, constant = True)
    def key(self) -> str:
        """Identifier of this device (``self._id``)."""
        return self._id

    @pyqtProperty(str, constant = True)
    def address(self) -> str:
        """The ``address`` property decoded as UTF-8, ``""`` when absent."""
        return self._properties.get(b"address", b"").decode("utf-8")

    @pyqtProperty(str, constant = True)
    def name(self) -> str:
        """The ``name`` property decoded as UTF-8, ``""`` when absent."""
        return self._properties.get(b"name", b"").decode("utf-8")

    @pyqtProperty(str, constant = True)
    def firmwareVersion(self) -> str:
        """The ``firmware_version`` property decoded as UTF-8, ``""`` when absent."""
        return self._properties.get(b"firmware_version", b"").decode("utf-8")

    @pyqtProperty(str, constant = True)
    def printerType(self) -> str:
        """The ``printer_type`` property decoded as UTF-8, ``"Unknown"`` when absent."""
        return self._properties.get(b"printer_type", b"Unknown").decode("utf-8")

    @pyqtProperty(str, constant = True)
    def ipAddress(self) -> str:
        """The address passed to the constructor (as opposed to the ``address`` property)."""
        return self._address

    def __handleOnFinished(self, reply: QNetworkReply) -> None:
        """Run the parent's reply handling, then promote ``connecting`` to ``connected``.

        NOTE(review): because of the leading double underscore Python mangles
        this name - and the attribute looked up on ``super()`` below - to
        ``_NetworkedPrinterOutputDevice__handleOnFinished``. Unless a parent
        class defines an attribute with exactly that mangled name, this method
        does not override a parent's ``__handleOnFinished`` and the ``super()``
        call raises ``AttributeError``. Confirm against ``NetworkClient``.
        """
        super().__handleOnFinished(reply)

        # Any finished reply while still connecting counts as a successful connection.
        if self._connection_state == ConnectionState.connecting:
            self.setConnectionState(ConnectionState.connected)
|