USBPrinterOutputDeviceManager.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. # Copyright (c) 2018 Ultimaker B.V.
  2. # Cura is released under the terms of the LGPLv3 or higher.
  3. import threading
  4. import time
  5. import serial.tools.list_ports
  6. from os import environ
  7. from re import search
  8. from PyQt5.QtCore import QObject, pyqtSignal
  9. from UM.Signal import Signal, signalemitter
  10. from UM.OutputDevice.OutputDevicePlugin import OutputDevicePlugin
  11. from UM.i18n import i18nCatalog
  12. from cura.PrinterOutput.PrinterOutputDevice import ConnectionState
  13. from . import USBPrinterOutputDevice
  14. i18n_catalog = i18nCatalog("cura")
  15. ## Manager class that ensures that an USBPrinterOutput device is created for every connected USB printer.
  16. @signalemitter
  17. class USBPrinterOutputDeviceManager(QObject, OutputDevicePlugin):
  18. addUSBOutputDeviceSignal = Signal()
  19. progressChanged = pyqtSignal()
  20. def __init__(self, application, parent = None):
  21. if USBPrinterOutputDeviceManager.__instance is not None:
  22. raise RuntimeError("Try to create singleton '%s' more than once" % self.__class__.__name__)
  23. USBPrinterOutputDeviceManager.__instance = self
  24. super().__init__(parent = parent)
  25. self._application = application
  26. self._serial_port_list = []
  27. self._usb_output_devices = {}
  28. self._usb_output_devices_model = None
  29. self._update_thread = threading.Thread(target = self._updateThread)
  30. self._update_thread.setDaemon(True)
  31. self._check_updates = True
  32. self._application.applicationShuttingDown.connect(self.stop)
  33. # Because the model needs to be created in the same thread as the QMLEngine, we use a signal.
  34. self.addUSBOutputDeviceSignal.connect(self.addOutputDevice)
  35. self._application.globalContainerStackChanged.connect(self.updateUSBPrinterOutputDevices)
  36. # The method updates/reset the USB settings for all connected USB devices
  37. def updateUSBPrinterOutputDevices(self):
  38. for device in self._usb_output_devices.values():
  39. if isinstance(device, USBPrinterOutputDevice.USBPrinterOutputDevice):
  40. device.resetDeviceSettings()
  41. def start(self):
  42. self._check_updates = True
  43. self._update_thread.start()
  44. def stop(self, store_data: bool = True):
  45. self._check_updates = False
  46. def _onConnectionStateChanged(self, serial_port):
  47. if serial_port not in self._usb_output_devices:
  48. return
  49. changed_device = self._usb_output_devices[serial_port]
  50. if changed_device.connectionState == ConnectionState.Connected:
  51. self.getOutputDeviceManager().addOutputDevice(changed_device)
  52. else:
  53. self.getOutputDeviceManager().removeOutputDevice(serial_port)
  54. def _updateThread(self):
  55. while self._check_updates:
  56. container_stack = self._application.getGlobalContainerStack()
  57. if container_stack is None:
  58. time.sleep(5)
  59. continue
  60. port_list = [] # Just an empty list; all USB devices will be removed.
  61. if container_stack.getMetaDataEntry("supports_usb_connection"):
  62. machine_file_formats = [file_type.strip() for file_type in container_stack.getMetaDataEntry("file_formats").split(";")]
  63. if "text/x-gcode" in machine_file_formats:
  64. port_list = self.getSerialPortList(only_list_usb=True)
  65. self._addRemovePorts(port_list)
  66. time.sleep(5)
  67. ## Helper to identify serial ports (and scan for them)
  68. def _addRemovePorts(self, serial_ports):
  69. # First, find and add all new or changed keys
  70. for serial_port in list(serial_ports):
  71. if serial_port not in self._serial_port_list:
  72. self.addUSBOutputDeviceSignal.emit(serial_port) # Hack to ensure its created in main thread
  73. continue
  74. self._serial_port_list = list(serial_ports)
  75. for port, device in self._usb_output_devices.items():
  76. if port not in self._serial_port_list:
  77. device.close()
  78. ## Because the model needs to be created in the same thread as the QMLEngine, we use a signal.
  79. def addOutputDevice(self, serial_port):
  80. device = USBPrinterOutputDevice.USBPrinterOutputDevice(serial_port)
  81. device.connectionStateChanged.connect(self._onConnectionStateChanged)
  82. self._usb_output_devices[serial_port] = device
  83. device.connect()
  84. ## Create a list of serial ports on the system.
  85. # \param only_list_usb If true, only usb ports are listed
  86. def getSerialPortList(self, only_list_usb = False):
  87. base_list = []
  88. for port in serial.tools.list_ports.comports():
  89. if not isinstance(port, tuple):
  90. port = (port.device, port.description, port.hwid)
  91. if only_list_usb and not port[2].startswith("USB"):
  92. continue
  93. # To prevent cura from messing with serial ports of other devices,
  94. # filter by regular expressions passed in as environment variables.
  95. # Get possible patterns with python3 -m serial.tools.list_ports -v
  96. # set CURA_DEVICENAMES=USB[1-9] -> e.g. not matching /dev/ttyUSB0
  97. pattern = environ.get('CURA_DEVICENAMES')
  98. if pattern and not search(pattern, port[0]):
  99. continue
  100. # set CURA_DEVICETYPES=CP2102 -> match a type of serial converter
  101. pattern = environ.get('CURA_DEVICETYPES')
  102. if pattern and not search(pattern, port[1]):
  103. continue
  104. # set CURA_DEVICEINFOS=LOCATION=2-1.4 -> match a physical port
  105. # set CURA_DEVICEINFOS=VID:PID=10C4:EA60 -> match a vendor:product
  106. pattern = environ.get('CURA_DEVICEINFOS')
  107. if pattern and not search(pattern, port[2]):
  108. continue
  109. base_list += [port[0]]
  110. return list(base_list)
  111. __instance = None # type: USBPrinterOutputDeviceManager
  112. @classmethod
  113. def getInstance(cls, *args, **kwargs) -> "USBPrinterOutputDeviceManager":
  114. return cls.__instance