// NOTE(review): the first two #include directives below have no target in this
// copy of the file -- the header names appear to have been lost. Restore them
// from the original before building.
#include
#include

#include "pair.h"
#include "poller.h"
#include "pollerimpl.h"

// Tests for TSocketPoller (and, where the epoll poller is available, for
// TGenericPoller directly). Every test works on a connected socket pair and
// polls with a zero timeout, so no test blocks waiting for events.
Y_UNIT_TEST_SUITE(TSocketPollerTest) {
    // A socket registered once with WaitRead is reported each time data
    // arrives and is not reported while there is nothing to read.
    Y_UNIT_TEST(TestSimple) {
        SOCKET sockets[2];
        UNIT_ASSERT(SocketPair(sockets) == 0);

        // Holders take over both descriptors for the rest of the test.
        TSocketHolder s1(sockets[0]);
        TSocketHolder s2(sockets[1]);

        TSocketPoller poller;
        // (void*)17 is the cookie the test expects WaitT to hand back for this socket.
        poller.WaitRead(sockets[1], (void*)17);

        // Nothing has been sent yet, so polling must yield no event (checked twice).
        UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
        UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));

        // The registration is made once, outside the loop; each round must still be reported.
        for (ui32 i = 0; i < 3; ++i) {
            char buf[] = {18};

            UNIT_ASSERT_VALUES_EQUAL(1, send(sockets[0], buf, 1, 0));

            // The pending byte must be reported with the registered cookie.
            UNIT_ASSERT_VALUES_EQUAL((void*)17, poller.WaitT(TDuration::Zero()));

            UNIT_ASSERT_VALUES_EQUAL(1, recv(sockets[1], buf, 1, 0));
            UNIT_ASSERT_VALUES_EQUAL(18, buf[0]);

            // After the byte has been consumed there must be nothing to report.
            UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
            UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
        }
    }

    // A socket registered with WaitReadOneShot is reported once; further data
    // produces no event until the socket is registered again.
    Y_UNIT_TEST(TestSimpleOneShot) {
        SOCKET sockets[2];
        UNIT_ASSERT(SocketPair(sockets) == 0);

        TSocketHolder s1(sockets[0]);
        TSocketHolder s2(sockets[1]);

        TSocketPoller poller;

        // Nothing is registered yet, so nothing can be reported.
        UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
        UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));

        for (ui32 i = 0; i < 3; ++i) {
            // Registered anew on every iteration, unlike TestSimple.
            poller.WaitReadOneShot(sockets[1], (void*)17);

            char buf[1];

            // First byte of the round: expected to be reported.
            buf[0] = i + 20;

            UNIT_ASSERT_VALUES_EQUAL(1, send(sockets[0], buf, 1, 0));

            UNIT_ASSERT_VALUES_EQUAL((void*)17, poller.WaitT(TDuration::Zero()));

            UNIT_ASSERT_VALUES_EQUAL(1, recv(sockets[1], buf, 1, 0));
            UNIT_ASSERT_VALUES_EQUAL(char(i + 20), buf[0]);

            UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
            UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));

            // Second byte of the round, sent without registering again.
            buf[0] = i + 21;

            UNIT_ASSERT_VALUES_EQUAL(1, send(sockets[0], buf, 1, 0));

            // These checks fail if the registration is not one-shot.
            UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
            UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));

            // The byte is still readable even though no event was delivered for it.
            UNIT_ASSERT_VALUES_EQUAL(1, recv(sockets[1], buf, 1, 0));
            UNIT_ASSERT_VALUES_EQUAL(char(i + 21), buf[0]);
        }
    }

    // Calling Unwait for a socket that was never registered must be harmless;
    // the test has no assertions beyond running to completion.
    Y_UNIT_TEST(TestItIsSafeToUnregisterUnregisteredDescriptor) {
        SOCKET sockets[2];
        UNIT_ASSERT(SocketPair(sockets) == 0);

        TSocketHolder s1(sockets[0]);
        TSocketHolder s2(sockets[1]);

        TSocketPoller poller;

        poller.Unwait(s1);
    }

    // Registering the same socket repeatedly (twice for read, then for write)
    // must be harmless; the test has no assertions beyond running to completion.
    Y_UNIT_TEST(TestItIsSafeToReregisterDescriptor) {
        SOCKET sockets[2];
        UNIT_ASSERT(SocketPair(sockets) == 0);

        TSocketHolder s1(sockets[0]);
        TSocketHolder s2(sockets[1]);

        TSocketPoller poller;

        poller.WaitRead(s1, nullptr);
        poller.WaitRead(s1, nullptr);
        poller.WaitWrite(s1, nullptr);
    }

    // Walks through the edge-triggered read/write registration: which events
    // are expected after registration, after new data arrives, and after
    // RestartReadWriteEdgeTriggered is called with either flag value.
    Y_UNIT_TEST(TestSimpleEdgeTriggered) {
        SOCKET sockets[2];
        UNIT_ASSERT(SocketPair(sockets) == 0);

        TSocketHolder s1(sockets[0]);
        TSocketHolder s2(sockets[1]);

        // The receiving end is non-blocking so that reading past the last
        // byte fails with EAGAIN (asserted below) instead of blocking.
        SetNonBlock(sockets[1]);

        TSocketPoller poller;

        UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
        UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));

        // Each iteration registers the socket at the top and unregisters it
        // at the bottom, so the three rounds start from the same poller state.
        for (ui32 i = 0; i < 3; ++i) {
            poller.WaitReadWriteEdgeTriggered(sockets[1], (void*)17);

            // The socket is writable right away, which is reported exactly once.
            UNIT_ASSERT_VALUES_EQUAL((void*)17, poller.WaitT(TDuration::Zero()));
            UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));

            char buf[2];
            buf[0] = i + 10;
            buf[1] = i + 20;

            // Send one byte: the new data is reported exactly once.
            UNIT_ASSERT_VALUES_EQUAL(1, send(sockets[0], buf, 1, 0));
            UNIT_ASSERT_VALUES_EQUAL((void*)17, poller.WaitT(TDuration::Zero()));
            UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));

            // Restart without having read the pending byte.
            poller.RestartReadWriteEdgeTriggered(sockets[1], (void*)17, false);

            // After the restart, read and write readiness might arrive as
            // separate events, so one or two events are accepted.
            {
                void* events[3];
                size_t count = poller.WaitT(events, 3, TDuration::Zero());
                UNIT_ASSERT_GE(count, 1);
                UNIT_ASSERT_LE(count, 2);
                UNIT_ASSERT_VALUES_EQUAL(events[0], (void*)17);
            }
            UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));

            // Send two more bytes (three are now pending).
            UNIT_ASSERT_VALUES_EQUAL(2, send(sockets[0], buf, 2, 0));

            // The poller may or may not notify here because the reader has not
            // yet seen the end of the data, so the result is deliberately ignored.
            Y_UNUSED(poller.WaitT(TDuration::Zero()));

            // Receive one byte, leaving two pending.
            UNIT_ASSERT_VALUES_EQUAL(1, recv(sockets[1], buf, 1, 0));
            UNIT_ASSERT_VALUES_EQUAL(char(i + 10), buf[0]);

            // A partial read produces no new event.
            UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
            UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));

            // Receive the remaining two bytes.
            UNIT_ASSERT_VALUES_EQUAL(2, recv(sockets[1], buf, 2, 0));
            UNIT_ASSERT_VALUES_EQUAL(char(i + 10), buf[0]);
            UNIT_ASSERT_VALUES_EQUAL(char(i + 20), buf[1]);

            // Still no new event.
            UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
            UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));

            // Hit the end of the data: the non-blocking recv fails with EAGAIN.
            // The last error is cleared first so that a stale value cannot
            // satisfy the check.
            ClearLastSystemError();
            UNIT_ASSERT_VALUES_EQUAL(-1, recv(sockets[1], buf, 1, 0));
            UNIT_ASSERT_VALUES_EQUAL(EAGAIN, LastSystemError());

            // Restart after the end was seen (no-op for epoll).
            poller.RestartReadWriteEdgeTriggered(sockets[1], (void*)17, true);

            // Send one byte: it is reported exactly once.
            UNIT_ASSERT_VALUES_EQUAL(1, send(sockets[0], buf, 1, 0));
            UNIT_ASSERT_VALUES_EQUAL((void*)17, poller.WaitT(TDuration::Zero()));
            UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));

            // Ask for two bytes and get only one: the short read is how the
            // reader sees the end of the data this time.
            UNIT_ASSERT_VALUES_EQUAL(1, recv(sockets[1], buf, 2, 0));
            UNIT_ASSERT_VALUES_EQUAL(char(i + 10), buf[0]);

            UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
            UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));

            // Same scenario, but the byte is sent before the restart.
            UNIT_ASSERT_VALUES_EQUAL(1, send(sockets[0], buf, 1, 0));

            // Restart after the end was seen (no-op for epoll).
            poller.RestartReadWriteEdgeTriggered(sockets[1], (void*)17, true);

            UNIT_ASSERT_VALUES_EQUAL((void*)17, poller.WaitT(TDuration::Zero()));
            UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));

            UNIT_ASSERT_VALUES_EQUAL(1, recv(sockets[1], buf, 2, 0));
            UNIT_ASSERT_VALUES_EQUAL(char(i + 10), buf[0]);

            UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
            UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));

            // Unregister so that the next iteration can register the socket afresh.
            poller.Unwait(sockets[1]);
        }
    }

// The remaining tests are compiled only when the epoll poller is available.
#if defined(HAVE_EPOLL_POLLER)
    // After the peer sends one byte and shuts down its write side, a socket
    // registered with only CONT_POLL_RDHUP must produce exactly one event
    // carrying status 0, filter CONT_POLL_RDHUP and the registered cookie.
    Y_UNIT_TEST(TestRdhup) {
        SOCKET sockets[2];
        UNIT_ASSERT(SocketPair(sockets) == 0);

        TSocketHolder s1(sockets[0]);
        TSocketHolder s2(sockets[1]);

        char buf[1] = {0};
        UNIT_ASSERT_VALUES_EQUAL(1, send(s1, buf, 1, 0));
        // Close only the sending direction of s1; the byte sent above is never read here.
        shutdown(s1, SHUT_WR);

        // NOTE(review): the template argument list of TGenericPoller looks
        // truncated in this copy (unbalanced '>') -- confirm against the original.
        using TPoller = TGenericPoller>;
        TPoller poller;
        poller.Set((void*)17, s2, CONT_POLL_RDHUP);

        TPoller::TEvent e;
        UNIT_ASSERT_VALUES_EQUAL(poller.WaitD(&e, 1, TDuration::Zero().ToDeadLine()), 1);
        UNIT_ASSERT_EQUAL(TPoller::ExtractStatus(&e), 0);
        UNIT_ASSERT_EQUAL(TPoller::ExtractFilter(&e), CONT_POLL_RDHUP);
        UNIT_ASSERT_EQUAL(TPoller::ExtractEvent(&e), (void*)17);
    }

    // Set() must throw TSystemError with a message naming the failed
    // operation, both for a plain add and when CONT_POLL_MODIFY is requested.
    Y_UNIT_TEST(TestSetSocketErrors) {
        // NOTE(review): as in TestRdhup, the template arguments of
        // TGenericPoller look truncated here; Max() below presumably lost an
        // explicit type argument as well -- confirm against the original.
        TGenericPoller> poller;

        UNIT_ASSERT_EXCEPTION_CONTAINS(poller.Set(nullptr, Max(), CONT_POLL_READ), TSystemError, "epoll add failed");
        UNIT_ASSERT_EXCEPTION_CONTAINS(poller.Set(nullptr, Max(), CONT_POLL_READ | CONT_POLL_MODIFY), TSystemError, "epoll modify failed");
    }
#endif
}