stream_adaptor_ut.cpp 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. #include <library/cpp/grpc/server/grpc_request.h>
  2. #include <library/cpp/testing/unittest/registar.h>
  3. #include <library/cpp/testing/unittest/tests_data.h>
  4. #include <util/system/thread.h>
  5. #include <util/thread/pool.h>
  6. using namespace NGrpc;
  7. // Here we emulate stream data producer
  8. class TOrderedProducer: public TThread {
  9. public:
  10. TOrderedProducer(IStreamAdaptor* adaptor, ui64 max, bool withSleep, std::function<void(ui64)>&& consumerOp)
  11. : TThread(&ThreadProc, this)
  12. , Adaptor_(adaptor)
  13. , Max_(max)
  14. , WithSleep_(withSleep)
  15. , ConsumerOp_(std::move(consumerOp))
  16. {}
  17. static void* ThreadProc(void* _this) {
  18. SetCurrentThreadName("OrderedProducerThread");
  19. static_cast<TOrderedProducer*>(_this)->Exec();
  20. return nullptr;
  21. }
  22. void Exec() {
  23. for (ui64 i = 0; i < Max_; i++) {
  24. auto cb = [i, this]() mutable {
  25. ConsumerOp_(i);
  26. };
  27. Adaptor_->Enqueue(std::move(cb), false);
  28. if (WithSleep_ && (i % 256 == 0)) {
  29. Sleep(TDuration::MilliSeconds(10));
  30. }
  31. }
  32. }
  33. private:
  34. IStreamAdaptor* Adaptor_;
  35. const ui64 Max_;
  36. const bool WithSleep_;
  37. std::function<void(ui64)> ConsumerOp_;
  38. };
  39. Y_UNIT_TEST_SUITE(StreamAdaptor) {
  40. static void OrderingTest(size_t threads, bool withSleep) {
  41. auto adaptor = CreateStreamAdaptor();
  42. const i64 max = 10000;
  43. // Here we will emulate grpc stream (NextReply call after writing)
  44. std::unique_ptr<IThreadPool> consumerQueue(new TThreadPool(TThreadPool::TParams().SetBlocking(false).SetCatching(false)));
  45. // And make sure only one request inflight (see UNIT_ASSERT on adding to the queue)
  46. consumerQueue->Start(threads, 1);
  47. // Non atomic!!! Stream adaptor must protect us
  48. ui64 curVal = 0;
  49. // Used just to wait in the main thread
  50. TAtomic finished = false;
  51. auto consumerOp = [&finished, &curVal, ptr{adaptor.get()}, queue{consumerQueue.get()}](ui64 i) {
  52. // Check no reordering inside stream adaptor
  53. // and no simultanious consumer Op call
  54. UNIT_ASSERT_VALUES_EQUAL(curVal, i);
  55. curVal++;
  56. // We must set finished flag after last ProcessNext, but we can`t compare curVal and max after ProcessNext
  57. // so compare here and set after
  58. bool tmp = curVal == max;
  59. bool res = queue->AddFunc([ptr, &finished, tmp, &curVal, i]() {
  60. // Additional check the value still same
  61. // run under tsan makes sure no consumer Op call before we call ProcessNext
  62. UNIT_ASSERT_VALUES_EQUAL(curVal, i + 1);
  63. ptr->ProcessNext();
  64. // Reordering after ProcessNext is possible, so check tmp and set finished to true
  65. if (tmp)
  66. AtomicSet(finished, true);
  67. });
  68. UNIT_ASSERT(res);
  69. };
  70. TOrderedProducer producer(adaptor.get(), max, withSleep, std::move(consumerOp));
  71. producer.Start();
  72. producer.Join();
  73. while (!AtomicGet(finished))
  74. {
  75. Sleep(TDuration::MilliSeconds(100));
  76. }
  77. consumerQueue->Stop();
  78. UNIT_ASSERT_VALUES_EQUAL(curVal, max);
  79. }
  80. Y_UNIT_TEST(OrderingOneThread) {
  81. OrderingTest(1, false);
  82. }
  83. Y_UNIT_TEST(OrderingTwoThreads) {
  84. OrderingTest(2, false);
  85. }
  86. Y_UNIT_TEST(OrderingManyThreads) {
  87. OrderingTest(10, false);
  88. }
  89. Y_UNIT_TEST(OrderingOneThreadWithSleep) {
  90. OrderingTest(1, true);
  91. }
  92. Y_UNIT_TEST(OrderingTwoThreadsWithSleep) {
  93. OrderingTest(2, true);
  94. }
  95. Y_UNIT_TEST(OrderingManyThreadsWithSleep) {
  96. OrderingTest(10, true);
  97. }
  98. }