rwlock_ut.cpp 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196
  1. #include <library/cpp/threading/light_rw_lock/lightrwlock.h>
  2. #include <library/cpp/testing/unittest/registar.h>
  3. #include <util/generic/ptr.h>
  4. #include <util/random/random.h>
  5. #include <util/thread/pool.h>
  6. class TRWMutexTest: public TTestBase {
  7. UNIT_TEST_SUITE(TRWMutexTest);
  8. UNIT_TEST(TestConcurrentReadAccess)
  9. UNIT_TEST(TestExclusiveWriteAccess)
  10. UNIT_TEST(TestSharedData)
  11. UNIT_TEST_SUITE_END();
  12. class TOneShotEvent {
  13. public:
  14. void Wait() {
  15. Released_.wait(false, std::memory_order_acquire);
  16. }
  17. void Release() {
  18. Released_.store(true, std::memory_order_release);
  19. Released_.notify_all();
  20. }
  21. private:
  22. std::atomic<bool> Released_{false};
  23. };
  24. struct TSharedData {
  25. TSharedData()
  26. : WritersIn(0)
  27. , ReadersIn(0)
  28. , Counter(0)
  29. {
  30. }
  31. std::atomic<ui32> WritersIn;
  32. std::atomic<ui32> ReadersIn;
  33. void IncWriters() {
  34. WritersIn.fetch_add(1, std::memory_order_relaxed);
  35. }
  36. void DecWriters() {
  37. WritersIn.fetch_sub(1, std::memory_order_relaxed);
  38. }
  39. ui32 LoadWriters() {
  40. return WritersIn.load(std::memory_order_relaxed);
  41. }
  42. void IncReaders() {
  43. ReadersIn.fetch_add(1, std::memory_order_relaxed);
  44. }
  45. void DecReaders() {
  46. ReadersIn.fetch_sub(1, std::memory_order_relaxed);
  47. }
  48. ui32 LoadReaders() {
  49. return ReadersIn.load(std::memory_order_relaxed);
  50. }
  51. std::atomic_flag Failed = ATOMIC_FLAG_INIT;
  52. void SetFailed() {
  53. Failed.test_and_set(std::memory_order_relaxed);
  54. }
  55. bool TestFailed() {
  56. return Failed.test(std::memory_order_relaxed);
  57. }
  58. ui64 Counter;
  59. TLightRWLock Mutex;
  60. TOneShotEvent Event;
  61. };
  62. class TThreadTask: public IObjectInQueue {
  63. public:
  64. using PFunc = void (TThreadTask::*)(void);
  65. TThreadTask(PFunc func, TSharedData& data, size_t id, size_t total)
  66. : Func_(func)
  67. , Data_(data)
  68. , Id_(id)
  69. , Total_(total)
  70. {
  71. }
  72. void Process(void*) override {
  73. THolder<TThreadTask> This(this);
  74. (this->*Func_)();
  75. }
  76. #define FAIL_ASSERT(cond) \
  77. if (!(cond)) { \
  78. Data_.SetFailed(); \
  79. }
  80. void RunConcurrentReadAccess() {
  81. Data_.Mutex.AcquireRead();
  82. Data_.IncReaders();
  83. if (Data_.LoadReaders() != Total_) {
  84. Data_.Event.Wait();
  85. }
  86. Data_.Event.Release();
  87. Data_.DecReaders();
  88. Data_.Mutex.ReleaseRead();
  89. }
  90. void RunExclusiveWriteAccess() {
  91. if (Id_ % 2 == 0) {
  92. for (size_t i = 0; i < 10; ++i) {
  93. Data_.Mutex.AcquireRead();
  94. Data_.IncReaders();
  95. FAIL_ASSERT(Data_.LoadWriters() == 0);
  96. usleep(RandomNumber<ui32>() % 5);
  97. Data_.DecReaders();
  98. Data_.Mutex.ReleaseRead();
  99. }
  100. } else {
  101. for (size_t i = 0; i < 10; ++i) {
  102. Data_.Mutex.AcquireWrite();
  103. Data_.IncWriters();
  104. FAIL_ASSERT(Data_.LoadReaders() == 0 && Data_.LoadWriters() == 1);
  105. usleep(RandomNumber<ui32>() % 5);
  106. Data_.DecWriters();
  107. Data_.Mutex.ReleaseWrite();
  108. }
  109. }
  110. }
  111. void RunSharedData() {
  112. if (Id_ % 2 == 0) {
  113. ui64 localCounter = 0;
  114. Y_UNUSED(localCounter);
  115. for (size_t i = 0; i < 1000; ++i) {
  116. Data_.Mutex.AcquireRead();
  117. localCounter = Data_.Counter;
  118. Data_.Mutex.ReleaseRead();
  119. }
  120. } else {
  121. for (size_t i = 0; i < 1000; ++i) {
  122. Data_.Mutex.AcquireWrite();
  123. ++Data_.Counter;
  124. Data_.Mutex.ReleaseWrite();
  125. }
  126. }
  127. }
  128. #undef FAIL_ASSERT
  129. private:
  130. PFunc Func_;
  131. TSharedData& Data_;
  132. size_t Id_;
  133. size_t Total_;
  134. };
  135. private:
  136. #define RUN_CYCLE(what, count) \
  137. Data_.Reset(MakeHolder<TSharedData>()); \
  138. Q_.Start(count); \
  139. for (size_t i = 0; i < count; ++i) { \
  140. UNIT_ASSERT(Q_.Add(new TThreadTask(&TThreadTask::what, *Data_, i, count))); \
  141. } \
  142. Q_.Stop(); \
  143. UNIT_ASSERT(!Data_->TestFailed());
  144. void TestConcurrentReadAccess() {
  145. RUN_CYCLE(RunConcurrentReadAccess, 5);
  146. }
  147. void TestExclusiveWriteAccess() {
  148. RUN_CYCLE(RunExclusiveWriteAccess, 4);
  149. }
  150. void TestSharedData() {
  151. // TODO: Fix Tsan error
  152. // RUN_CYCLE(RunSharedData, 4);
  153. }
  154. #undef RUN_CYCLE
  155. private:
  156. THolder<TSharedData> Data_;
  157. TThreadPool Q_;
  158. };
  159. UNIT_TEST_SUITE_REGISTRATION(TRWMutexTest)