poller_tcp_unit_select.cpp 2.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. #include "poller_tcp_unit_select.h"
  2. #include <csignal>
  3. #if defined(_win_)
  4. #include <winsock2.h>
  5. #define SOCKET_ERROR_SOURCE ::WSAGetLastError()
  6. #elif defined(_darwin_)
  7. #include <cerrno>
  8. #define SOCKET_ERROR_SOURCE errno
  9. typedef timeval TIMEVAL;
  10. #else
  11. #include <cerrno>
  12. #define SOCKET_ERROR_SOURCE errno
  13. #endif
  14. namespace NInterconnect {
  15. TPollerUnitSelect::TPollerUnitSelect() {
  16. }
  17. TPollerUnitSelect::~TPollerUnitSelect() {
  18. }
  19. template <bool IsWrite>
  20. void
  21. TPollerUnitSelect::Process() {
  22. auto& side = GetSide<IsWrite>();
  23. side.ProcessInput();
  24. enum : size_t { R,
  25. W,
  26. E };
  27. static const auto O = IsWrite ? W : R;
  28. ::fd_set sets[3];
  29. FD_ZERO(&sets[R]);
  30. FD_ZERO(&sets[W]);
  31. FD_ZERO(&sets[E]);
  32. for (const auto& operation : side.Operations) {
  33. FD_SET(operation.first, &sets[O]);
  34. FD_SET(operation.first, &sets[E]);
  35. }
  36. #if defined(_win_)
  37. ::TIMEVAL timeout = {0L, 99991L};
  38. const auto numberEvents = !side.Operations.empty() ? ::select(FD_SETSIZE, &sets[R], &sets[W], &sets[E], &timeout)
  39. : (::Sleep(100), 0);
  40. #elif defined(_darwin_)
  41. ::TIMEVAL timeout = {0L, 99991L};
  42. const auto numberEvents = ::select(FD_SETSIZE, &sets[R], &sets[W], &sets[E], &timeout);
  43. #else
  44. ::sigset_t sigmask;
  45. ::sigemptyset(&sigmask);
  46. ::sigaddset(&sigmask, SIGPIPE);
  47. ::sigaddset(&sigmask, SIGTERM);
  48. struct ::timespec timeout = {0L, 99999989L};
  49. const auto numberEvents = ::pselect(FD_SETSIZE, &sets[R], &sets[W], &sets[E], &timeout, &sigmask);
  50. #endif
  51. Y_VERIFY_DEBUG(numberEvents >= 0);
  52. for (auto it = side.Operations.cbegin(); side.Operations.cend() != it;) {
  53. if (FD_ISSET(it->first, &sets[O]) || FD_ISSET(it->first, &sets[E]))
  54. if (const auto& finalizer = it->second.second(it->second.first)) {
  55. side.Operations.erase(it++);
  56. finalizer();
  57. continue;
  58. }
  59. ++it;
  60. }
  61. }
  62. void
  63. TPollerUnitSelect::ProcessRead() {
  64. Process<false>();
  65. }
  66. void
  67. TPollerUnitSelect::ProcessWrite() {
  68. Process<true>();
  69. }
  70. }