async_semaphore.cpp 1.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576
  1. #include "async_semaphore.h"
  2. #include <util/system/guard.h>
  3. #include <util/system/yassert.h>
  4. #include <library/cpp/threading/cancellation/operation_cancelled_exception.h>
  5. namespace NThreading {
  6. TAsyncSemaphore::TAsyncSemaphore(size_t count)
  7. : Count_(count)
  8. {
  9. Y_ASSERT(count > 0);
  10. }
  11. TAsyncSemaphore::TPtr TAsyncSemaphore::Make(size_t count) {
  12. return TPtr(new TAsyncSemaphore(count));
  13. }
  14. TFuture<TAsyncSemaphore::TPtr> TAsyncSemaphore::AcquireAsync() {
  15. with_lock(Lock_) {
  16. if (Cancelled_) {
  17. return MakeErrorFuture<TPtr>(
  18. std::make_exception_ptr(TOperationCancelledException()));
  19. }
  20. if (Count_) {
  21. --Count_;
  22. return MakeFuture<TAsyncSemaphore::TPtr>(this);
  23. }
  24. auto promise = NewPromise<TAsyncSemaphore::TPtr>();
  25. Promises_.push_back(promise);
  26. return promise.GetFuture();
  27. }
  28. }
  29. void TAsyncSemaphore::Release() {
  30. TPromise<TPtr> promise;
  31. with_lock(Lock_) {
  32. if (Cancelled_) {
  33. return;
  34. }
  35. if (Promises_.empty()) {
  36. ++Count_;
  37. return;
  38. } else {
  39. promise = Promises_.front();
  40. Promises_.pop_front();
  41. }
  42. }
  43. promise.SetValue(this);
  44. }
  45. void TAsyncSemaphore::Cancel() {
  46. std::list<TPromise<TPtr>> promises;
  47. with_lock(Lock_) {
  48. Cancelled_ = true;
  49. std::swap(Promises_, promises);
  50. }
  51. for (auto& p: promises) {
  52. p.SetException(std::make_exception_ptr(TOperationCancelledException()));
  53. }
  54. }
  55. TAsyncSemaphore::TAutoRelease::~TAutoRelease() {
  56. if (Sem) {
  57. Sem->Release();
  58. }
  59. }
  60. std::function<void (const TFuture<void>&)> TAsyncSemaphore::TAutoRelease::DeferRelease() {
  61. return [s = std::move(this->Sem)](const TFuture<void>&) {
  62. s->Release();
  63. };
  64. }
  65. }