publish.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. # -*- test-case-name: twisted.spread.test.test_pb -*-
  2. # Copyright (c) Twisted Matrix Laboratories.
  3. # See LICENSE for details.
  4. """
  5. Persistently cached objects for PB.
  6. Maintainer: Glyph Lefkowitz
  7. Future Plans: None known.
  8. """
  9. import time
  10. from twisted.internet import defer
  11. from twisted.spread import banana, flavors, jelly
  12. class Publishable(flavors.Cacheable):
  13. """An object whose cached state persists across sessions."""
  14. def __init__(self, publishedID):
  15. self.republish()
  16. self.publishedID = publishedID
  17. def republish(self):
  18. """Set the timestamp to current and (TODO) update all observers."""
  19. self.timestamp = time.time()
  20. def view_getStateToPublish(self, perspective):
  21. "(internal)"
  22. return self.getStateToPublishFor(perspective)
  23. def getStateToPublishFor(self, perspective):
  24. """Implement me to special-case your state for a perspective."""
  25. return self.getStateToPublish()
  26. def getStateToPublish(self):
  27. """Implement me to return state to copy as part of the publish phase."""
  28. raise NotImplementedError("%s.getStateToPublishFor" % self.__class__)
  29. def getStateToCacheAndObserveFor(self, perspective, observer):
  30. """Get all necessary metadata to keep a clientside cache."""
  31. if perspective:
  32. pname = perspective.perspectiveName
  33. sname = perspective.getService().serviceName
  34. else:
  35. pname = "None"
  36. sname = "None"
  37. return {
  38. "remote": flavors.ViewPoint(perspective, self),
  39. "publishedID": self.publishedID,
  40. "perspective": pname,
  41. "service": sname,
  42. "timestamp": self.timestamp,
  43. }
  44. class RemotePublished(flavors.RemoteCache):
  45. """The local representation of remote Publishable object."""
  46. isActivated = 0
  47. _wasCleanWhenLoaded = 0
  48. def getFileName(self, ext="pub"):
  49. return "{}-{}-{}.{}".format(
  50. self.service,
  51. self.perspective,
  52. str(self.publishedID),
  53. ext,
  54. )
  55. def setCopyableState(self, state):
  56. self.__dict__.update(state)
  57. self._activationListeners = []
  58. try:
  59. with open(self.getFileName(), "rb") as dataFile:
  60. data = dataFile.read()
  61. except OSError:
  62. recent = 0
  63. else:
  64. newself = jelly.unjelly(banana.decode(data))
  65. recent = newself.timestamp == self.timestamp
  66. if recent:
  67. self._cbGotUpdate(newself.__dict__)
  68. self._wasCleanWhenLoaded = 1
  69. else:
  70. self.remote.callRemote("getStateToPublish").addCallbacks(self._cbGotUpdate)
  71. def __getstate__(self):
  72. other = self.__dict__.copy()
  73. # Remove PB-specific attributes
  74. del other["broker"]
  75. del other["remote"]
  76. del other["luid"]
  77. # remove my own runtime-tracking stuff
  78. del other["_activationListeners"]
  79. del other["isActivated"]
  80. return other
  81. def _cbGotUpdate(self, newState):
  82. self.__dict__.update(newState)
  83. self.isActivated = 1
  84. # send out notifications
  85. for listener in self._activationListeners:
  86. listener(self)
  87. self._activationListeners = []
  88. self.activated()
  89. with open(self.getFileName(), "wb") as dataFile:
  90. dataFile.write(banana.encode(jelly.jelly(self)))
  91. def activated(self):
  92. """Implement this method if you want to be notified when your
  93. publishable subclass is activated.
  94. """
  95. def callWhenActivated(self, callback):
  96. """Externally register for notification when this publishable has received all relevant data."""
  97. if self.isActivated:
  98. callback(self)
  99. else:
  100. self._activationListeners.append(callback)
  101. def whenReady(d):
  102. """
  103. Wrap a deferred returned from a pb method in another deferred that
  104. expects a RemotePublished as a result. This will allow you to wait until
  105. the result is really available.
  106. Idiomatic usage would look like::
  107. publish.whenReady(serverObject.getMeAPublishable()).addCallback(lookAtThePublishable)
  108. """
  109. d2 = defer.Deferred()
  110. d.addCallbacks(_pubReady, d2.errback, callbackArgs=(d2,))
  111. return d2
  112. def _pubReady(result, d2):
  113. "(internal)"
  114. result.callWhenActivated(d2.callback)