remote_server_session_semaphore.cpp 1.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859
  1. #include "remote_server_session_semaphore.h"
  2. #include <util/stream/output.h>
  3. #include <util/system/yassert.h>
  4. using namespace NBus;
  5. using namespace NBus::NPrivate;
  6. TRemoteServerSessionSemaphore::TRemoteServerSessionSemaphore(
  7. TAtomicBase limitCount, TAtomicBase limitSize, const char* name)
  8. : Name(name)
  9. , LimitCount(limitCount)
  10. , LimitSize(limitSize)
  11. , CurrentCount(0)
  12. , CurrentSize(0)
  13. , PausedByUser(0)
  14. , StopSignal(0)
  15. {
  16. Y_ABORT_UNLESS(limitCount > 0, "limit must be > 0");
  17. Y_UNUSED(Name);
  18. }
  19. TRemoteServerSessionSemaphore::~TRemoteServerSessionSemaphore() {
  20. Y_ABORT_UNLESS(AtomicGet(CurrentCount) == 0);
  21. // TODO: fix spider and enable
  22. //Y_ABORT_UNLESS(AtomicGet(CurrentSize) == 0);
  23. }
  24. bool TRemoteServerSessionSemaphore::TryWait() {
  25. if (Y_UNLIKELY(AtomicGet(StopSignal)))
  26. return true;
  27. if (AtomicGet(PausedByUser))
  28. return false;
  29. if (AtomicGet(CurrentCount) < LimitCount && (LimitSize < 0 || AtomicGet(CurrentSize) < LimitSize))
  30. return true;
  31. return false;
  32. }
  33. void TRemoteServerSessionSemaphore::IncrementMultiple(TAtomicBase count, TAtomicBase size) {
  34. AtomicAdd(CurrentCount, count);
  35. AtomicAdd(CurrentSize, size);
  36. Updated();
  37. }
  38. void TRemoteServerSessionSemaphore::ReleaseMultiple(TAtomicBase count, TAtomicBase size) {
  39. AtomicSub(CurrentCount, count);
  40. AtomicSub(CurrentSize, size);
  41. Updated();
  42. }
  43. void TRemoteServerSessionSemaphore::Stop() {
  44. AtomicSet(StopSignal, 1);
  45. Updated();
  46. }
  47. void TRemoteServerSessionSemaphore::PauseByUsed(bool pause) {
  48. AtomicSet(PausedByUser, pause);
  49. Updated();
  50. }