async_semaphore_ut.cpp 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. #include "async_semaphore.h"
  2. #include "async.h"
  3. #include <library/cpp/testing/unittest/registar.h>
  4. #include <library/cpp/threading/cancellation/operation_cancelled_exception.h>
  5. #include <util/generic/scope.h>
  6. #include <util/generic/vector.h>
  7. #include <util/thread/pool.h>
  8. using namespace NThreading;
  9. Y_UNIT_TEST_SUITE(TSemaphoreAsync) {
  10. Y_UNIT_TEST(SimplyAquired) {
  11. const size_t MAX_IN_PROGRESS = 5;
  12. TSimpleThreadPool pool(TThreadPool::TParams().SetCatching(false));
  13. pool.Start(MAX_IN_PROGRESS * 2);
  14. TVector<TFuture<size_t>> futures;
  15. auto semaphore = TAsyncSemaphore::Make(MAX_IN_PROGRESS);
  16. for (size_t i = 0; i < 100; ++i) {
  17. auto f = semaphore->AcquireAsync()
  18. .Apply([&pool, i](const auto& f) -> TFuture<size_t> {
  19. return Async([i, semaphore = f.GetValue()] {
  20. auto guard = semaphore->MakeAutoRelease();
  21. Sleep(TDuration::MilliSeconds(100));
  22. return i;
  23. }, pool);
  24. });
  25. futures.push_back(f);
  26. }
  27. for (size_t i = 0; i < 100; ++i) {
  28. UNIT_ASSERT_VALUES_EQUAL(futures[i].GetValueSync(), i);
  29. }
  30. }
  31. Y_UNIT_TEST(AutoReleasedOnException) {
  32. auto semaphore = TAsyncSemaphore::Make(1);
  33. auto lock = semaphore->AcquireAsync();
  34. UNIT_ASSERT(lock.HasValue());
  35. auto waitingLock = semaphore->AcquireAsync();
  36. UNIT_ASSERT(!waitingLock.HasValue() && !waitingLock.HasException());
  37. auto future = lock.Apply([](const auto& f) {
  38. auto guard = f.GetValue()->MakeAutoRelease();
  39. ythrow yexception() << "oops";
  40. });
  41. UNIT_ASSERT(future.HasException());
  42. UNIT_ASSERT(waitingLock.HasValue());
  43. }
  44. Y_UNIT_TEST(LimitsParallelism) {
  45. const size_t MAX_IN_PROGRESS = 5;
  46. TSimpleThreadPool pool(TThreadPool::TParams().SetCatching(false));
  47. pool.Start(MAX_IN_PROGRESS * 2);
  48. std::atomic_uint64_t inProgress = 0;
  49. TVector<TFuture<size_t>> futures;
  50. auto semaphore = TAsyncSemaphore::Make(MAX_IN_PROGRESS);
  51. for (size_t i = 0; i < 100; ++i) {
  52. auto f = semaphore->AcquireAsync()
  53. .Apply([&, i](const auto&) -> TFuture<size_t> {
  54. auto currentInProgress = inProgress.fetch_add(1) + 1;
  55. UNIT_ASSERT_GT(currentInProgress, 0);
  56. UNIT_ASSERT_LE(currentInProgress, MAX_IN_PROGRESS);
  57. return Async([i] {
  58. Sleep(TDuration::MilliSeconds(100));
  59. return i;
  60. }, pool);
  61. });
  62. f.IgnoreResult().Subscribe([&](const auto&) {
  63. auto currentInProgress = inProgress.fetch_sub(1) - 1;
  64. UNIT_ASSERT_GE(currentInProgress, 0);
  65. UNIT_ASSERT_LE(currentInProgress, MAX_IN_PROGRESS);
  66. semaphore->Release();
  67. });
  68. futures.push_back(f);
  69. }
  70. WaitAll(futures).Wait();
  71. UNIT_ASSERT_EQUAL(inProgress.load(), 0);
  72. }
  73. Y_UNIT_TEST(AcquisitionOrder) {
  74. const size_t MAX_IN_PROGRESS = 5;
  75. TSimpleThreadPool pool(TThreadPool::TParams().SetCatching(false));
  76. pool.Start(MAX_IN_PROGRESS * 2);
  77. std::atomic_size_t latestId = 0;
  78. TVector<TFuture<size_t>> futures;
  79. auto semaphore = TAsyncSemaphore::Make(MAX_IN_PROGRESS);
  80. for (size_t i = 0; i < 100; ++i) {
  81. auto f = semaphore->AcquireAsync()
  82. .Apply([&](const auto& f) -> size_t {
  83. auto guard = f.GetValue()->MakeAutoRelease();
  84. auto currentId = latestId.fetch_add(1);
  85. return currentId;
  86. });
  87. futures.push_back(f);
  88. }
  89. for (size_t i = 0; i < 100; ++i) {
  90. UNIT_ASSERT_VALUES_EQUAL(futures[i].GetValueSync(), i);
  91. }
  92. }
  93. Y_UNIT_TEST(Cancel) {
  94. auto semaphore = TAsyncSemaphore::Make(1);
  95. auto firstLock = semaphore->AcquireAsync();
  96. auto canceledLock = semaphore->AcquireAsync();
  97. UNIT_ASSERT(firstLock.HasValue());
  98. UNIT_ASSERT(!canceledLock.HasValue());
  99. UNIT_ASSERT(!canceledLock.HasException());
  100. semaphore->Cancel();
  101. UNIT_ASSERT_EXCEPTION(canceledLock.TryRethrow(), TOperationCancelledException);
  102. UNIT_ASSERT_NO_EXCEPTION(firstLock.GetValue()->Release());
  103. }
  104. Y_UNIT_TEST(AcquireAfterCancel) {
  105. auto semaphore = TAsyncSemaphore::Make(1);
  106. semaphore->Cancel();
  107. auto lock = semaphore->AcquireAsync();
  108. UNIT_ASSERT_EXCEPTION(lock.TryRethrow(), TOperationCancelledException);
  109. }
  110. Y_UNIT_TEST(AutoReleaseDeferReleaseReleasesOnException) {
  111. auto semaphore = TAsyncSemaphore::Make(1);
  112. auto lock = semaphore->AcquireAsync();
  113. UNIT_ASSERT(lock.HasValue());
  114. auto waitingLock = semaphore->AcquireAsync();
  115. UNIT_ASSERT(!waitingLock.HasValue() && !waitingLock.HasException());
  116. auto asyncWork = lock.Apply([](const auto& lock) {
  117. lock.TryRethrow();
  118. ythrow yexception() << "oops";
  119. });
  120. asyncWork.Subscribe(semaphore->MakeAutoRelease().DeferRelease());
  121. UNIT_ASSERT(asyncWork.HasException());
  122. UNIT_ASSERT(waitingLock.HasValue());
  123. }
  124. Y_UNIT_TEST(AutoReleaseNotCopyable) {
  125. UNIT_ASSERT(!std::is_copy_constructible_v<TAsyncSemaphore::TAutoRelease>);
  126. UNIT_ASSERT(!std::is_copy_assignable_v<TAsyncSemaphore::TAutoRelease>);
  127. }
  128. }