CloudApiClient.py 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455
  1. # Copyright (c) 2022 Ultimaker B.V.
  2. # Cura is released under the terms of the LGPLv3 or higher.
  3. from UM.Logger import Logger
  4. from UM.TaskManagement.HttpRequestManager import HttpRequestManager
  5. from UM.TaskManagement.HttpRequestScope import JsonDecoratorScope
  6. from cura.CuraApplication import CuraApplication
  7. from cura.UltimakerCloud.UltimakerCloudScope import UltimakerCloudScope
  8. from ..CloudApiModel import CloudApiModel
  9. class CloudApiClient:
  10. """Manages Cloud subscriptions
  11. When a package is added to a user's account, the user is 'subscribed' to that package.
  12. Whenever the user logs in on another instance of Cura, these subscriptions can be used to sync the user's plugins
  13. Singleton: use CloudApiClient.getInstance() instead of CloudApiClient()
  14. """
  15. __instance = None
  16. @classmethod
  17. def getInstance(cls, app: CuraApplication):
  18. if not cls.__instance:
  19. cls.__instance = CloudApiClient(app)
  20. return cls.__instance
  21. def __init__(self, app: CuraApplication) -> None:
  22. if self.__instance is not None:
  23. raise RuntimeError("This is a Singleton. use getInstance()")
  24. self._scope: JsonDecoratorScope = JsonDecoratorScope(UltimakerCloudScope(app))
  25. app.getPackageManager().packageInstalled.connect(self._onPackageInstalled)
  26. def unsubscribe(self, package_id: str) -> None:
  27. url = CloudApiModel.userPackageUrl(package_id)
  28. HttpRequestManager.getInstance().delete(url = url, scope = self._scope)
  29. def _subscribe(self, package_id: str) -> None:
  30. """You probably don't want to use this directly. All installed packages will be automatically subscribed."""
  31. Logger.debug("Subscribing to {}", package_id)
  32. data = "{\"data\": {\"package_id\": \"%s\", \"sdk_version\": \"%s\"}}" % (package_id, CloudApiModel.sdk_version)
  33. HttpRequestManager.getInstance().put(
  34. url = CloudApiModel.api_url_user_packages,
  35. data = data.encode(),
  36. scope = self._scope
  37. )
  38. def _onPackageInstalled(self, package_id: str):
  39. if CuraApplication.getInstance().getCuraAPI().account.isLoggedIn:
  40. # We might already be subscribed, but checking would take one extra request. Instead, simply subscribe
  41. self._subscribe(package_id)