ConnectionStatus.py 2.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364
  1. from typing import Optional
  2. from PyQt5.QtCore import QObject, pyqtSignal, QTimer, pyqtProperty
  3. from PyQt5.QtNetwork import QNetworkReply
  4. from UM.TaskManagement.HttpRequestManager import HttpRequestManager
  5. from cura.UltimakerCloud import UltimakerCloudAuthentication
  6. class ConnectionStatus(QObject):
  7. """Status info for some web services"""
  8. UPDATE_INTERVAL = 10.0 # seconds
  9. ULTIMAKER_CLOUD_STATUS_URL = UltimakerCloudAuthentication.CuraCloudAPIRoot + "/connect/v1/"
  10. __instance = None # type: Optional[ConnectionStatus]
  11. internetReachableChanged = pyqtSignal()
  12. umCloudReachableChanged = pyqtSignal()
  13. @classmethod
  14. def getInstance(cls, *args, **kwargs) -> "ConnectionStatus":
  15. if cls.__instance is None:
  16. cls.__instance = cls(*args, **kwargs)
  17. return cls.__instance
  18. def __init__(self, parent: Optional["QObject"] = None):
  19. super().__init__(parent)
  20. self._http = HttpRequestManager.getInstance()
  21. self._statuses = {
  22. self.ULTIMAKER_CLOUD_STATUS_URL: True,
  23. "http://example.com": True
  24. }
  25. # Create a timer for automatic updates
  26. self._update_timer = QTimer()
  27. self._update_timer.setInterval(int(self.UPDATE_INTERVAL * 1000))
  28. # The timer is restarted automatically
  29. self._update_timer.setSingleShot(False)
  30. self._update_timer.timeout.connect(self._update)
  31. self._update_timer.start()
  32. @pyqtProperty(bool, notify=internetReachableChanged)
  33. def isInternetReachable(self) -> bool:
  34. # Is any of the test urls reachable?
  35. return any(self._statuses.values())
  36. def _update(self):
  37. for url in self._statuses.keys():
  38. self._http.get(
  39. url = url,
  40. callback = self._statusCallback,
  41. error_callback = self._statusCallback,
  42. timeout = 5
  43. )
  44. def _statusCallback(self, reply: QNetworkReply, error: QNetworkReply.NetworkError = None):
  45. url = reply.request().url().toString()
  46. prev_statuses = self._statuses.copy()
  47. self._statuses[url] = HttpRequestManager.replyIndicatesSuccess(reply, error)
  48. if any(self._statuses.values()) != any(prev_statuses.values()):
  49. self.internetReachableChanged.emit()