CloudApiClient.py 2.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152
  1. from UM.Logger import Logger
  2. from UM.TaskManagement.HttpRequestManager import HttpRequestManager
  3. from UM.TaskManagement.HttpRequestScope import JsonDecoratorScope
  4. from cura.CuraApplication import CuraApplication
  5. from cura.UltimakerCloud.UltimakerCloudScope import UltimakerCloudScope
  6. from ..CloudApiModel import CloudApiModel
  7. class CloudApiClient:
  8. """Manages Cloud subscriptions
  9. When a package is added to a user's account, the user is 'subscribed' to that package.
  10. Whenever the user logs in on another instance of Cura, these subscriptions can be used to sync the user's plugins
  11. Singleton: use CloudApiClient.getInstance() instead of CloudApiClient()
  12. """
  13. __instance = None
  14. @classmethod
  15. def getInstance(cls, app: CuraApplication):
  16. if not cls.__instance:
  17. cls.__instance = CloudApiClient(app)
  18. return cls.__instance
  19. def __init__(self, app: CuraApplication) -> None:
  20. if self.__instance is not None:
  21. raise RuntimeError("This is a Singleton. use getInstance()")
  22. self._scope = JsonDecoratorScope(UltimakerCloudScope(app)) # type: JsonDecoratorScope
  23. app.getPackageManager().packageInstalled.connect(self._onPackageInstalled)
  24. def unsubscribe(self, package_id: str) -> None:
  25. url = CloudApiModel.userPackageUrl(package_id)
  26. HttpRequestManager.getInstance().delete(url = url, scope = self._scope)
  27. def _subscribe(self, package_id: str) -> None:
  28. """You probably don't want to use this directly. All installed packages will be automatically subscribed."""
  29. Logger.debug("Subscribing to {}", package_id)
  30. data = "{\"data\": {\"package_id\": \"%s\", \"sdk_version\": \"%s\"}}" % (package_id, CloudApiModel.sdk_version)
  31. HttpRequestManager.getInstance().put(
  32. url = CloudApiModel.api_url_user_packages,
  33. data = data.encode(),
  34. scope = self._scope
  35. )
  36. def _onPackageInstalled(self, package_id: str):
  37. if CuraApplication.getInstance().getCuraAPI().account.isLoggedIn:
  38. # We might already be subscribed, but checking would take one extra request. Instead, simply subscribe
  39. self._subscribe(package_id)