event_ut.cpp 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. #include "event.h"
  2. #include "atomic.h"
  3. #include <library/cpp/testing/unittest/registar.h>
  4. #include <util/thread/pool.h>
  5. namespace {
  6. struct TSharedData {
  7. TSharedData()
  8. : Counter(0)
  9. , failed(false)
  10. {
  11. }
  12. TAtomic Counter;
  13. TManualEvent event;
  14. bool failed;
  15. };
  16. struct TThreadTask: public IObjectInQueue {
  17. public:
  18. TThreadTask(TSharedData& data, size_t id)
  19. : Data_(data)
  20. , Id_(id)
  21. {
  22. }
  23. void Process(void*) override {
  24. THolder<TThreadTask> This(this);
  25. if (Id_ == 0) {
  26. usleep(100);
  27. bool cond = Data_.Counter == 0;
  28. if (!cond) {
  29. Data_.failed = true;
  30. }
  31. Data_.event.Signal();
  32. } else {
  33. while (!Data_.event.WaitT(TDuration::Seconds(100))) {
  34. }
  35. AtomicAdd(Data_.Counter, Id_);
  36. }
  37. }
  38. private:
  39. TSharedData& Data_;
  40. size_t Id_;
  41. };
  42. class TSignalTask: public IObjectInQueue {
  43. private:
  44. TManualEvent& Ev_;
  45. public:
  46. TSignalTask(TManualEvent& ev)
  47. : Ev_(ev)
  48. {
  49. }
  50. void Process(void*) override {
  51. Ev_.Signal();
  52. }
  53. };
  54. class TOwnerTask: public IObjectInQueue {
  55. public:
  56. TManualEvent Barrier;
  57. THolder<TManualEvent> Ev;
  58. public:
  59. TOwnerTask()
  60. : Ev(new TManualEvent)
  61. {
  62. }
  63. void Process(void*) override {
  64. Ev->WaitI();
  65. Ev.Destroy();
  66. }
  67. };
  68. }
  69. Y_UNIT_TEST_SUITE(EventTest) {
  70. Y_UNIT_TEST(WaitAndSignalTest) {
  71. TSharedData data;
  72. TThreadPool queue;
  73. queue.Start(5);
  74. for (size_t i = 0; i < 5; ++i) {
  75. UNIT_ASSERT(queue.Add(new TThreadTask(data, i)));
  76. }
  77. queue.Stop();
  78. UNIT_ASSERT(data.Counter == 10);
  79. UNIT_ASSERT(!data.failed);
  80. }
  81. Y_UNIT_TEST(ConcurrentSignalAndWaitTest) {
  82. // test for problem detected by thread-sanitizer (signal/wait race) SEARCH-2113
  83. const size_t limit = 200;
  84. TManualEvent event[limit];
  85. TThreadPool queue;
  86. queue.Start(limit);
  87. TVector<THolder<IObjectInQueue>> tasks;
  88. for (size_t i = 0; i < limit; ++i) {
  89. tasks.emplace_back(MakeHolder<TSignalTask>(event[i]));
  90. UNIT_ASSERT(queue.Add(tasks.back().Get()));
  91. }
  92. for (size_t i = limit; i != 0; --i) {
  93. UNIT_ASSERT(event[i - 1].WaitT(TDuration::Seconds(90)));
  94. }
  95. queue.Stop();
  96. }
  97. /** Test for a problem: http://nga.at.yandex-team.ru/5772 */
  98. Y_UNIT_TEST(DestructorBeforeSignalFinishTest) {
  99. return;
  100. TVector<THolder<IObjectInQueue>> tasks;
  101. for (size_t i = 0; i < 1000; ++i) {
  102. auto owner = MakeHolder<TOwnerTask>();
  103. tasks.emplace_back(MakeHolder<TSignalTask>(*owner->Ev));
  104. tasks.emplace_back(std::move(owner));
  105. }
  106. TThreadPool queue;
  107. queue.Start(4);
  108. for (auto& task : tasks) {
  109. UNIT_ASSERT(queue.Add(task.Get()));
  110. }
  111. queue.Stop();
  112. }
  113. }