123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371 |
- from UM.Logger import Logger
- from UM.i18n import i18nCatalog
- from UM.Qt.Duration import DurationFormat
- from UM.PluginRegistry import PluginRegistry
- from cura.CuraApplication import CuraApplication
- from cura.PrinterOutputDevice import PrinterOutputDevice, ConnectionState
- from cura.PrinterOutput.PrinterOutputModel import PrinterOutputModel
- from cura.PrinterOutput.PrintJobOutputModel import PrintJobOutputModel
- from cura.PrinterOutput.GenericOutputController import GenericOutputController
- from .AutoDetectBaudJob import AutoDetectBaudJob
- from .AvrFirmwareUpdater import AvrFirmwareUpdater
- from PyQt5.QtCore import pyqtSlot, pyqtSignal, pyqtProperty
- from serial import Serial, SerialException, SerialTimeoutException
- from threading import Thread, Event
- from time import time, sleep
- from queue import Queue
- from typing import Union, Optional, List, cast
- import re
- import functools
- import os
- catalog = i18nCatalog("cura")
- class USBPrinterOutputDevice(PrinterOutputDevice):
- def __init__(self, serial_port: str, baud_rate: Optional[int] = None) -> None:
- super().__init__(serial_port)
- self.setName(catalog.i18nc("@item:inmenu", "USB printing"))
- self.setShortDescription(catalog.i18nc("@action:button Preceded by 'Ready to'.", "Print via USB"))
- self.setDescription(catalog.i18nc("@info:tooltip", "Print via USB"))
- self.setIconName("print")
- self._serial = None
- self._serial_port = serial_port
- self._address = serial_port
- self._timeout = 3
-
- self._gcode = []
- self._gcode_position = 0
- self._use_auto_detect = True
- self._baud_rate = baud_rate
- self._all_baud_rates = [115200, 250000, 230400, 57600, 38400, 19200, 9600]
-
- self._update_thread = Thread(target=self._update, daemon=True)
- self._last_temperature_request = None
- self._is_printing = False
-
- self._print_start_time = None
- self._print_estimated_time = None
- self._accepts_commands = True
- self._paused = False
- self.setConnectionText(catalog.i18nc("@info:status", "Connected via USB"))
-
- self._command_queue = Queue()
-
- self._command_received = Event()
- self._command_received.set()
- self._firmware_updater = AvrFirmwareUpdater(self)
- CuraApplication.getInstance().getOnExitCallbackManager().addCallback(self._checkActivePrintingUponAppExit)
-
-
- def _checkActivePrintingUponAppExit(self) -> None:
- application = CuraApplication.getInstance()
- if not self._is_printing:
-
- application.triggerNextExitCheck()
- return
- application.setConfirmExitDialogCallback(self._onConfirmExitDialogResult)
- application.showConfirmExitDialog.emit(catalog.i18nc("@label", "A USB print is in progress, closing Cura will stop this print. Are you sure?"))
- def _onConfirmExitDialogResult(self, result: bool) -> None:
- if result:
- application = CuraApplication.getInstance()
- application.triggerNextExitCheck()
- @pyqtSlot(str)
- def updateFirmware(self, file):
- self._firmware_updater.updateFirmware(file)
-
-
- def resetDeviceSettings(self):
- self._firmware_name = None
-
-
-
-
-
-
-
- def requestWrite(self, nodes, file_name = None, filter_by_machine = False, file_handler = None, **kwargs):
- if self._is_printing:
- return
-
- self._printers[0].getController().stopPreheatTimers()
- CuraApplication.getInstance().getController().setActiveStage("MonitorStage")
-
- active_build_plate_id = CuraApplication.getInstance().getMultiBuildPlateModel().activeBuildPlate
- gcode_dict = getattr(CuraApplication.getInstance().getController().getScene(), "gcode_dict")
- gcode_list = gcode_dict[active_build_plate_id]
- self._printGCode(gcode_list)
-
-
- def _printGCode(self, gcode_list: List[str]):
- self._gcode.clear()
- self._paused = False
- for layer in gcode_list:
- self._gcode.extend(layer.split("\n"))
-
- self._gcode.insert(0, "M110")
- self._gcode_position = 0
- self._print_start_time = time()
- self._print_estimated_time = int(CuraApplication.getInstance().getPrintInformation().currentPrintTime.getDisplayString(DurationFormat.Format.Seconds))
- for i in range(0, 4):
- self._sendNextGcodeLine()
- self._is_printing = True
- self.writeFinished.emit(self)
- def _autoDetectFinished(self, job: AutoDetectBaudJob):
- result = job.getResult()
- if result is not None:
- self.setBaudRate(result)
- self.connect()
- def setBaudRate(self, baud_rate: int):
- if baud_rate not in self._all_baud_rates:
- Logger.log("w", "Not updating baudrate to {baud_rate} as it's an unknown baudrate".format(baud_rate=baud_rate))
- return
- self._baud_rate = baud_rate
- def connect(self):
- self._firmware_name = None
- if self._baud_rate is None:
- if self._use_auto_detect:
- auto_detect_job = AutoDetectBaudJob(self._serial_port)
- auto_detect_job.start()
- auto_detect_job.finished.connect(self._autoDetectFinished)
- return
- if self._serial is None:
- try:
- self._serial = Serial(str(self._serial_port), self._baud_rate, timeout=self._timeout, writeTimeout=self._timeout)
- except SerialException:
- Logger.log("w", "An exception occured while trying to create serial connection")
- return
- container_stack = CuraApplication.getInstance().getGlobalContainerStack()
- num_extruders = container_stack.getProperty("machine_extruder_count", "value")
-
- controller = GenericOutputController(self)
- controller.setCanUpdateFirmware(True)
- self._printers = [PrinterOutputModel(output_controller=controller, number_of_extruders=num_extruders)]
- self._printers[0].updateName(container_stack.getName())
- self.setConnectionState(ConnectionState.connected)
- self._update_thread.start()
- def close(self):
- super().close()
- if self._serial is not None:
- self._serial.close()
-
- self._update_thread = Thread(target=self._update, daemon=True)
- self._serial = None
-
- def sendCommand(self, command: Union[str, bytes]):
- if not self._command_received.is_set():
- self._command_queue.put(command)
- else:
- self._sendCommand(command)
- def _sendCommand(self, command: Union[str, bytes]):
- if self._serial is None or self._connection_state != ConnectionState.connected:
- return
- new_command = cast(bytes, command) if type(command) is bytes else cast(str, command).encode()
- if not new_command.endswith(b"\n"):
- new_command += b"\n"
- try:
- self._command_received.clear()
- self._serial.write(new_command)
- except SerialTimeoutException:
- Logger.log("w", "Timeout when sending command to printer via USB.")
- self._command_received.set()
- def _update(self):
- while self._connection_state == ConnectionState.connected and self._serial is not None:
- try:
- line = self._serial.readline()
- except:
- continue
- if self._last_temperature_request is None or time() > self._last_temperature_request + self._timeout:
-
- self._command_received.set()
- self.sendCommand("M105")
- self._last_temperature_request = time()
- if self._firmware_name is None:
- self.sendCommand("M115")
- if (b"ok " in line and b"T:" in line) or b"ok T:" in line or line.startswith(b"T:") or b"ok B:" in line or line.startswith(b"B:"):
- extruder_temperature_matches = re.findall(b"T(\d*): ?([\d\.]+) ?\/?([\d\.]+)?", line)
-
- matched_extruder_nrs = []
- for match in extruder_temperature_matches:
- extruder_nr = 0
- if match[0] != b"":
- extruder_nr = int(match[0])
- if extruder_nr in matched_extruder_nrs:
- continue
- matched_extruder_nrs.append(extruder_nr)
- if extruder_nr >= len(self._printers[0].extruders):
- Logger.log("w", "Printer reports more temperatures than the number of configured extruders")
- continue
- extruder = self._printers[0].extruders[extruder_nr]
- if match[1]:
- extruder.updateHotendTemperature(float(match[1]))
- if match[2]:
- extruder.updateTargetHotendTemperature(float(match[2]))
- bed_temperature_matches = re.findall(b"B: ?([\d\.]+) ?\/?([\d\.]+)?", line)
- if bed_temperature_matches:
- match = bed_temperature_matches[0]
- if match[0]:
- self._printers[0].updateBedTemperature(float(match[0]))
- if match[1]:
- self._printers[0].updateTargetBedTemperature(float(match[1]))
- if b"FIRMWARE_NAME:" in line:
- self._setFirmwareName(line)
- if b"ok" in line:
- self._command_received.set()
- if not self._command_queue.empty():
- self._sendCommand(self._command_queue.get())
- if self._is_printing:
- if self._paused:
- pass
- else:
- self._sendNextGcodeLine()
- if self._is_printing:
- if line.startswith(b'!!'):
- Logger.log('e', "Printer signals fatal error. Cancelling print. {}".format(line))
- self.cancelPrint()
- elif b"resend" in line.lower() or b"rs" in line:
-
- try:
- self._gcode_position = int(line.replace(b"N:", b" ").replace(b"N", b" ").replace(b":", b" ").split()[-1])
- except:
- if b"rs" in line:
-
- self._gcode_position = int(line.split()[1])
- def _setFirmwareName(self, name):
- new_name = re.findall(r"FIRMWARE_NAME:(.*);", str(name))
- if new_name:
- self._firmware_name = new_name[0]
- Logger.log("i", "USB output device Firmware name: %s", self._firmware_name)
- else:
- self._firmware_name = "Unknown"
- Logger.log("i", "Unknown USB output device Firmware name: %s", str(name))
- def getFirmwareName(self):
- return self._firmware_name
- def pausePrint(self):
- self._paused = True
- def resumePrint(self):
- self._paused = False
- self._sendNextGcodeLine()
- def cancelPrint(self):
- self._gcode_position = 0
- self._gcode.clear()
- self._printers[0].updateActivePrintJob(None)
- self._is_printing = False
- self._paused = False
-
- self._sendCommand("M140 S0")
- self._sendCommand("M104 S0")
- self._sendCommand("M107")
-
-
- self.printers[0].homeHead()
- self._sendCommand("M84")
- def _sendNextGcodeLine(self):
- if self._gcode_position >= len(self._gcode):
- self._printers[0].updateActivePrintJob(None)
- self._is_printing = False
- return
- line = self._gcode[self._gcode_position]
- if ";" in line:
- line = line[:line.find(";")]
- line = line.strip()
-
-
- if line == "" or line == "M0" or line == "M1":
- line = "M105"
- checksum = functools.reduce(lambda x, y: x ^ y, map(ord, "N%d%s" % (self._gcode_position, line)))
- self._sendCommand("N%d%s*%d" % (self._gcode_position, line, checksum))
- progress = (self._gcode_position / len(self._gcode))
- elapsed_time = int(time() - self._print_start_time)
- print_job = self._printers[0].activePrintJob
- if print_job is None:
- controller = GenericOutputController(self)
- controller.setCanUpdateFirmware(True)
- print_job = PrintJobOutputModel(output_controller=controller, name=CuraApplication.getInstance().getPrintInformation().jobName)
- print_job.updateState("printing")
- self._printers[0].updateActivePrintJob(print_job)
- print_job.updateTimeElapsed(elapsed_time)
- estimated_time = self._print_estimated_time
- if progress > .1:
- estimated_time = self._print_estimated_time * (1 - progress) + elapsed_time
- print_job.updateTimeTotal(estimated_time)
- self._gcode_position += 1
|