poller_tcp_unit.cpp 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. #include "poller_tcp_unit.h"
  2. #if !defined(_win_) && !defined(_darwin_)
  3. #include "poller_tcp_unit_epoll.h"
  4. #endif
  5. #include "poller_tcp_unit_select.h"
  6. #include "poller.h"
  7. #include <library/cpp/actors/prof/tag.h>
  8. #include <library/cpp/actors/util/intrinsics.h>
  9. #if defined _linux_
  10. #include <pthread.h>
  11. #endif
  12. namespace NInterconnect {
  13. TPollerUnit::TPtr
  14. TPollerUnit::Make(bool useSelect) {
  15. #if defined(_win_) || defined(_darwin_)
  16. Y_UNUSED(useSelect);
  17. return TPtr(new TPollerUnitSelect);
  18. #else
  19. return useSelect ? TPtr(new TPollerUnitSelect) : TPtr(new TPollerUnitEpoll);
  20. #endif
  21. }
  22. TPollerUnit::TPollerUnit()
  23. : StopFlag(true)
  24. , ReadLoop(TThread::TParams(IdleThread<false>, this).SetName("network read"))
  25. , WriteLoop(TThread::TParams(IdleThread<true>, this).SetName("network write"))
  26. {
  27. }
  28. TPollerUnit::~TPollerUnit() {
  29. if (!AtomicLoad(&StopFlag))
  30. Stop();
  31. }
  32. void
  33. TPollerUnit::Start() {
  34. AtomicStore(&StopFlag, false);
  35. ReadLoop.Start();
  36. WriteLoop.Start();
  37. }
  38. void
  39. TPollerUnit::Stop() {
  40. AtomicStore(&StopFlag, true);
  41. ReadLoop.Join();
  42. WriteLoop.Join();
  43. }
  44. template <>
  45. TPollerUnit::TSide&
  46. TPollerUnit::GetSide<false>() {
  47. return Read;
  48. }
  49. template <>
  50. TPollerUnit::TSide&
  51. TPollerUnit::GetSide<true>() {
  52. return Write;
  53. }
  54. void
  55. TPollerUnit::StartReadOperation(
  56. const TIntrusivePtr<TSharedDescriptor>& stream,
  57. TFDDelegate&& operation) {
  58. Y_VERIFY_DEBUG(stream);
  59. if (AtomicLoad(&StopFlag))
  60. return;
  61. GetSide<false>().InputQueue.Push(TSide::TItem(stream, std::move(operation)));
  62. }
  63. void
  64. TPollerUnit::StartWriteOperation(
  65. const TIntrusivePtr<TSharedDescriptor>& stream,
  66. TFDDelegate&& operation) {
  67. Y_VERIFY_DEBUG(stream);
  68. if (AtomicLoad(&StopFlag))
  69. return;
  70. GetSide<true>().InputQueue.Push(TSide::TItem(stream, std::move(operation)));
  71. }
  72. template <bool IsWrite>
  73. void*
  74. TPollerUnit::IdleThread(void* param) {
  75. // TODO: musl-libc version of `sched_param` struct is for some reason different from pthread
  76. // version in Ubuntu 12.04
  77. #if defined(_linux_) && !defined(_musl_)
  78. pthread_t threadSelf = pthread_self();
  79. sched_param sparam = {20};
  80. pthread_setschedparam(threadSelf, SCHED_FIFO, &sparam);
  81. #endif
  82. static_cast<TPollerUnit*>(param)->RunLoop<IsWrite>();
  83. return nullptr;
  84. }
  85. template <>
  86. void
  87. TPollerUnit::RunLoop<false>() {
  88. NProfiling::TMemoryTagScope tag("INTERCONNECT_RECEIVED_DATA");
  89. while (!AtomicLoad(&StopFlag))
  90. ProcessRead();
  91. }
  92. template <>
  93. void
  94. TPollerUnit::RunLoop<true>() {
  95. NProfiling::TMemoryTagScope tag("INTERCONNECT_SEND_DATA");
  96. while (!AtomicLoad(&StopFlag))
  97. ProcessWrite();
  98. }
  99. void
  100. TPollerUnit::TSide::ProcessInput() {
  101. if (!InputQueue.IsEmpty())
  102. do {
  103. auto sock = InputQueue.Top().first->GetDescriptor();
  104. if (!Operations.emplace(sock, std::move(InputQueue.Top())).second)
  105. Y_FAIL("Descriptor is already in pooler.");
  106. } while (InputQueue.Pop());
  107. }
  108. }