USBPrinterOutputDeviceManager.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. # Copyright (c) 2020 Ultimaker B.V.
  2. # Cura is released under the terms of the LGPLv3 or higher.
  3. import threading
  4. import time
  5. import serial.tools.list_ports
  6. from os import environ
  7. from re import search
  8. from PyQt6.QtCore import QObject, pyqtSignal
  9. from UM.Platform import Platform
  10. from UM.Signal import Signal, signalemitter
  11. from UM.OutputDevice.OutputDevicePlugin import OutputDevicePlugin
  12. from UM.i18n import i18nCatalog
  13. from cura.PrinterOutput.PrinterOutputDevice import ConnectionState
  14. from . import USBPrinterOutputDevice
  15. i18n_catalog = i18nCatalog("cura")
  16. @signalemitter
  17. class USBPrinterOutputDeviceManager(QObject, OutputDevicePlugin):
  18. """Manager class that ensures that an USBPrinterOutput device is created for every connected USB printer."""
  19. addUSBOutputDeviceSignal = Signal()
  20. progressChanged = pyqtSignal()
  21. def __init__(self, application, parent = None):
  22. if USBPrinterOutputDeviceManager.__instance is not None:
  23. raise RuntimeError("Try to create singleton '%s' more than once" % self.__class__.__name__)
  24. super().__init__(parent = parent)
  25. USBPrinterOutputDeviceManager.__instance = self
  26. self._application = application
  27. self._serial_port_list = []
  28. self._usb_output_devices = {}
  29. self._usb_output_devices_model = None
  30. self._update_thread = threading.Thread(target = self._updateThread)
  31. self._update_thread.daemon = True
  32. self._check_updates = True
  33. self._application.applicationShuttingDown.connect(self.stop)
  34. # Because the model needs to be created in the same thread as the QMLEngine, we use a signal.
  35. self.addUSBOutputDeviceSignal.connect(self.addOutputDevice)
  36. self._application.globalContainerStackChanged.connect(self.updateUSBPrinterOutputDevices)
  37. # The method updates/reset the USB settings for all connected USB devices
  38. def updateUSBPrinterOutputDevices(self):
  39. for device in self._usb_output_devices.values():
  40. if isinstance(device, USBPrinterOutputDevice.USBPrinterOutputDevice):
  41. device.resetDeviceSettings()
  42. def start(self):
  43. self._check_updates = True
  44. self._update_thread.start()
  45. def stop(self, store_data: bool = True):
  46. self._check_updates = False
  47. def _onConnectionStateChanged(self, serial_port):
  48. if serial_port not in self._usb_output_devices:
  49. return
  50. changed_device = self._usb_output_devices[serial_port]
  51. if changed_device.connectionState == ConnectionState.Connected:
  52. self.getOutputDeviceManager().addOutputDevice(changed_device)
  53. else:
  54. self.getOutputDeviceManager().removeOutputDevice(serial_port)
  55. def _updateThread(self):
  56. while self._check_updates:
  57. container_stack = self._application.getGlobalContainerStack()
  58. if container_stack is None:
  59. time.sleep(5)
  60. continue
  61. port_list = [] # Just an empty list; all USB devices will be removed.
  62. if container_stack.getMetaDataEntry("supports_usb_connection"):
  63. machine_file_formats = [file_type.strip() for file_type in container_stack.getMetaDataEntry("file_formats").split(";")]
  64. if "text/x-gcode" in machine_file_formats:
  65. # We only limit listing usb on windows is a fix for connecting tty/cu printers on MacOS and Linux
  66. port_list = self.getSerialPortList(only_list_usb=Platform.isWindows())
  67. self._addRemovePorts(port_list)
  68. time.sleep(5)
  69. def _addRemovePorts(self, serial_ports):
  70. """Helper to identify serial ports (and scan for them)"""
  71. # First, find and add all new or changed keys
  72. for serial_port in list(serial_ports):
  73. if serial_port not in self._serial_port_list:
  74. self.addUSBOutputDeviceSignal.emit(serial_port) # Hack to ensure its created in main thread
  75. continue
  76. self._serial_port_list = list(serial_ports)
  77. for port, device in self._usb_output_devices.items():
  78. if port not in self._serial_port_list:
  79. device.close()
  80. def addOutputDevice(self, serial_port):
  81. """Because the model needs to be created in the same thread as the QMLEngine, we use a signal."""
  82. device = USBPrinterOutputDevice.USBPrinterOutputDevice(serial_port)
  83. device.connectionStateChanged.connect(self._onConnectionStateChanged)
  84. self._usb_output_devices[serial_port] = device
  85. device.connect()
  86. def getSerialPortList(self, only_list_usb = False):
  87. """Create a list of serial ports on the system.
  88. :param only_list_usb: If true, only usb ports are listed
  89. """
  90. base_list = []
  91. try:
  92. port_list = serial.tools.list_ports.comports()
  93. except TypeError: # Bug in PySerial causes a TypeError if port gets disconnected while processing.
  94. port_list = []
  95. for port in port_list:
  96. if not isinstance(port, tuple):
  97. port = (port.device, port.description, port.hwid)
  98. if not port[2]: # HWID may be None if the device is not USB or the system doesn't report the type.
  99. continue
  100. if only_list_usb and not port[2].startswith("USB"):
  101. continue
  102. # To prevent cura from messing with serial ports of other devices,
  103. # filter by regular expressions passed in as environment variables.
  104. # Get possible patterns with python3 -m serial.tools.list_ports -v
  105. # set CURA_DEVICENAMES=USB[1-9] -> e.g. not matching /dev/ttyUSB0
  106. pattern = environ.get('CURA_DEVICENAMES')
  107. if pattern and not search(pattern, port[0]):
  108. continue
  109. # set CURA_DEVICETYPES=CP2102 -> match a type of serial converter
  110. pattern = environ.get('CURA_DEVICETYPES')
  111. if pattern and not search(pattern, port[1]):
  112. continue
  113. # set CURA_DEVICEINFOS=LOCATION=2-1.4 -> match a physical port
  114. # set CURA_DEVICEINFOS=VID:PID=10C4:EA60 -> match a vendor:product
  115. pattern = environ.get('CURA_DEVICEINFOS')
  116. if pattern and not search(pattern, port[2]):
  117. continue
  118. base_list += [port[0]]
  119. return list(base_list)
  120. __instance = None # type: USBPrinterOutputDeviceManager
  121. @classmethod
  122. def getInstance(cls, *args, **kwargs) -> "USBPrinterOutputDeviceManager":
  123. return cls.__instance