pool_ut.cpp 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257
  1. #include "pool.h"
  2. #include <library/cpp/testing/unittest/registar.h>
  3. #include <util/stream/output.h>
  4. #include <util/random/fast.h>
  5. #include <util/system/spinlock.h>
  6. #include <util/system/thread.h>
  7. #include <util/system/mutex.h>
  8. #include <util/system/condvar.h>
  9. struct TThreadPoolTest {
  10. TSpinLock Lock;
  11. long R = -1;
  12. struct TTask: public IObjectInQueue {
  13. TThreadPoolTest* Test = nullptr;
  14. long Value = 0;
  15. TTask(TThreadPoolTest* test, int value)
  16. : Test(test)
  17. , Value(value)
  18. {
  19. }
  20. void Process(void*) override {
  21. THolder<TTask> This(this);
  22. TGuard<TSpinLock> guard(Test->Lock);
  23. Test->R ^= Value;
  24. }
  25. };
  26. struct TOwnedTask: public IObjectInQueue {
  27. bool& Processed;
  28. bool& Destructed;
  29. TOwnedTask(bool& processed, bool& destructed)
  30. : Processed(processed)
  31. , Destructed(destructed)
  32. {
  33. }
  34. ~TOwnedTask() override {
  35. Destructed = true;
  36. }
  37. void Process(void*) override {
  38. Processed = true;
  39. }
  40. };
  41. inline void TestAnyQueue(IThreadPool* queue, size_t queueSize = 1000) {
  42. TReallyFastRng32 rand(17);
  43. const size_t cnt = 1000;
  44. R = 0;
  45. for (size_t i = 0; i < cnt; ++i) {
  46. R ^= (long)rand.GenRand();
  47. }
  48. queue->Start(10, queueSize);
  49. rand = TReallyFastRng32(17);
  50. for (size_t i = 0; i < cnt; ++i) {
  51. UNIT_ASSERT(queue->Add(new TTask(this, (long)rand.GenRand())));
  52. }
  53. queue->Stop();
  54. UNIT_ASSERT_EQUAL(0, R);
  55. }
  56. };
  57. class TFailAddQueue: public IThreadPool {
  58. public:
  59. bool Add(IObjectInQueue* /*obj*/) override Y_WARN_UNUSED_RESULT {
  60. return false;
  61. }
  62. void Start(size_t, size_t) override {
  63. }
  64. void Stop() noexcept override {
  65. }
  66. size_t Size() const noexcept override {
  67. return 0;
  68. }
  69. };
  70. Y_UNIT_TEST_SUITE(TThreadPoolTest) {
  71. Y_UNIT_TEST(TestTThreadPool) {
  72. TThreadPoolTest t;
  73. TThreadPool q;
  74. t.TestAnyQueue(&q);
  75. }
  76. Y_UNIT_TEST(TestTThreadPoolBlocking) {
  77. TThreadPoolTest t;
  78. TThreadPool q(TThreadPool::TParams().SetBlocking(true));
  79. t.TestAnyQueue(&q, 100);
  80. }
  81. // disabled by pg@ long time ago due to test flaps
  82. // Tried to enable: REVIEW:78772
  83. Y_UNIT_TEST(TestTAdaptiveThreadPool) {
  84. if (false) {
  85. TThreadPoolTest t;
  86. TAdaptiveThreadPool q;
  87. t.TestAnyQueue(&q);
  88. }
  89. }
  90. Y_UNIT_TEST(TestAddAndOwn) {
  91. TThreadPool q;
  92. q.Start(2);
  93. bool processed = false;
  94. bool destructed = false;
  95. q.SafeAddAndOwn(MakeHolder<TThreadPoolTest::TOwnedTask>(processed, destructed));
  96. q.Stop();
  97. UNIT_ASSERT_C(processed, "Not processed");
  98. UNIT_ASSERT_C(destructed, "Not destructed");
  99. }
  100. Y_UNIT_TEST(TestAddFunc) {
  101. TFailAddQueue queue;
  102. bool added = queue.AddFunc(
  103. []() {} // Lambda, I call him 'Lambda'!
  104. );
  105. UNIT_ASSERT_VALUES_EQUAL(added, false);
  106. }
  107. Y_UNIT_TEST(TestSafeAddFuncThrows) {
  108. TFailAddQueue queue;
  109. UNIT_CHECK_GENERATED_EXCEPTION(queue.SafeAddFunc([] {}), TThreadPoolException);
  110. }
  111. Y_UNIT_TEST(TestFunctionNotCopied) {
  112. struct TFailOnCopy {
  113. TFailOnCopy() {
  114. }
  115. TFailOnCopy(TFailOnCopy&&) {
  116. }
  117. TFailOnCopy(const TFailOnCopy&) {
  118. UNIT_FAIL("Don't copy std::function inside TThreadPool");
  119. }
  120. };
  121. TThreadPool queue(TThreadPool::TParams().SetBlocking(false).SetCatching(true));
  122. queue.Start(2);
  123. queue.SafeAddFunc([data = TFailOnCopy()]() {});
  124. queue.Stop();
  125. }
  126. Y_UNIT_TEST(TestInfoGetters) {
  127. TThreadPool queue;
  128. queue.Start(2, 7);
  129. UNIT_ASSERT_EQUAL(queue.GetThreadCountExpected(), 2);
  130. UNIT_ASSERT_EQUAL(queue.GetThreadCountReal(), 2);
  131. UNIT_ASSERT_EQUAL(queue.GetMaxQueueSize(), 7);
  132. queue.Stop();
  133. queue.Start(4, 1);
  134. UNIT_ASSERT_EQUAL(queue.GetThreadCountExpected(), 4);
  135. UNIT_ASSERT_EQUAL(queue.GetThreadCountReal(), 4);
  136. UNIT_ASSERT_EQUAL(queue.GetMaxQueueSize(), 1);
  137. queue.Stop();
  138. }
  139. void TestFixedThreadName(IThreadPool& pool, const TString& expectedName) {
  140. pool.Start(1);
  141. TString name;
  142. pool.SafeAddFunc([&name]() {
  143. name = TThread::CurrentThreadName();
  144. });
  145. pool.Stop();
  146. if (TThread::CanGetCurrentThreadName()) {
  147. UNIT_ASSERT_EQUAL(name, expectedName);
  148. UNIT_ASSERT_UNEQUAL(TThread::CurrentThreadName(), expectedName);
  149. }
  150. }
  151. Y_UNIT_TEST(TestFixedThreadName) {
  152. const TString expectedName = "HelloWorld";
  153. {
  154. TThreadPool pool(TThreadPool::TParams().SetBlocking(true).SetCatching(false).SetThreadName(expectedName));
  155. TestFixedThreadName(pool, expectedName);
  156. }
  157. {
  158. TAdaptiveThreadPool pool(TThreadPool::TParams().SetThreadName(expectedName));
  159. TestFixedThreadName(pool, expectedName);
  160. }
  161. }
  162. void TestEnumeratedThreadName(IThreadPool& pool, const THashSet<TString>& expectedNames) {
  163. pool.Start(expectedNames.size());
  164. TMutex lock;
  165. TCondVar allReady;
  166. size_t readyCount = 0;
  167. THashSet<TString> names;
  168. for (size_t i = 0; i < expectedNames.size(); ++i) {
  169. pool.SafeAddFunc([&]() {
  170. with_lock (lock) {
  171. if (++readyCount == expectedNames.size()) {
  172. allReady.BroadCast();
  173. } else {
  174. while (readyCount != expectedNames.size()) {
  175. allReady.WaitI(lock);
  176. }
  177. }
  178. names.insert(TThread::CurrentThreadName());
  179. }
  180. });
  181. }
  182. pool.Stop();
  183. if (TThread::CanGetCurrentThreadName()) {
  184. UNIT_ASSERT_EQUAL(names, expectedNames);
  185. }
  186. }
  187. Y_UNIT_TEST(TestEnumeratedThreadName) {
  188. const TString namePrefix = "HelloWorld";
  189. const THashSet<TString> expectedNames = {
  190. "HelloWorld0",
  191. "HelloWorld1",
  192. "HelloWorld2",
  193. "HelloWorld3",
  194. "HelloWorld4",
  195. "HelloWorld5",
  196. "HelloWorld6",
  197. "HelloWorld7",
  198. "HelloWorld8",
  199. "HelloWorld9",
  200. "HelloWorld10",
  201. };
  202. {
  203. TThreadPool pool(TThreadPool::TParams().SetBlocking(true).SetCatching(false).SetThreadNamePrefix(namePrefix));
  204. TestEnumeratedThreadName(pool, expectedNames);
  205. }
  206. {
  207. TAdaptiveThreadPool pool(TThreadPool::TParams().SetThreadNamePrefix(namePrefix));
  208. TestEnumeratedThreadName(pool, expectedNames);
  209. }
  210. }
  211. }