123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243 |
- #include <library/cpp/testing/unittest/registar.h>
- #include <util/system/error.h>
- #include "pair.h"
- #include "poller.h"
- #include "pollerimpl.h"
- Y_UNIT_TEST_SUITE(TSocketPollerTest) {
- Y_UNIT_TEST(TestSimple) {
- SOCKET sockets[2];
- UNIT_ASSERT(SocketPair(sockets) == 0);
- TSocketHolder s1(sockets[0]);
- TSocketHolder s2(sockets[1]);
- TSocketPoller poller;
- poller.WaitRead(sockets[1], (void*)17);
- UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
- UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
- for (ui32 i = 0; i < 3; ++i) {
- char buf[] = {18};
- UNIT_ASSERT_VALUES_EQUAL(1, send(sockets[0], buf, 1, 0));
- UNIT_ASSERT_VALUES_EQUAL((void*)17, poller.WaitT(TDuration::Zero()));
- UNIT_ASSERT_VALUES_EQUAL(1, recv(sockets[1], buf, 1, 0));
- UNIT_ASSERT_VALUES_EQUAL(18, buf[0]);
- UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
- UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
- }
- }
- Y_UNIT_TEST(TestSimpleOneShot) {
- SOCKET sockets[2];
- UNIT_ASSERT(SocketPair(sockets) == 0);
- TSocketHolder s1(sockets[0]);
- TSocketHolder s2(sockets[1]);
- TSocketPoller poller;
- UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
- UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
- for (ui32 i = 0; i < 3; ++i) {
- poller.WaitReadOneShot(sockets[1], (void*)17);
- char buf[1];
- buf[0] = i + 20;
- UNIT_ASSERT_VALUES_EQUAL(1, send(sockets[0], buf, 1, 0));
- UNIT_ASSERT_VALUES_EQUAL((void*)17, poller.WaitT(TDuration::Zero()));
- UNIT_ASSERT_VALUES_EQUAL(1, recv(sockets[1], buf, 1, 0));
- UNIT_ASSERT_VALUES_EQUAL(char(i + 20), buf[0]);
- UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
- UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
- buf[0] = i + 21;
- UNIT_ASSERT_VALUES_EQUAL(1, send(sockets[0], buf, 1, 0));
- // this fails if socket is not oneshot
- UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
- UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
- UNIT_ASSERT_VALUES_EQUAL(1, recv(sockets[1], buf, 1, 0));
- UNIT_ASSERT_VALUES_EQUAL(char(i + 21), buf[0]);
- }
- }
- Y_UNIT_TEST(TestItIsSafeToUnregisterUnregisteredDescriptor) {
- SOCKET sockets[2];
- UNIT_ASSERT(SocketPair(sockets) == 0);
- TSocketHolder s1(sockets[0]);
- TSocketHolder s2(sockets[1]);
- TSocketPoller poller;
- poller.Unwait(s1);
- }
- Y_UNIT_TEST(TestItIsSafeToReregisterDescriptor) {
- SOCKET sockets[2];
- UNIT_ASSERT(SocketPair(sockets) == 0);
- TSocketHolder s1(sockets[0]);
- TSocketHolder s2(sockets[1]);
- TSocketPoller poller;
- poller.WaitRead(s1, nullptr);
- poller.WaitRead(s1, nullptr);
- poller.WaitWrite(s1, nullptr);
- }
- Y_UNIT_TEST(TestSimpleEdgeTriggered) {
- SOCKET sockets[2];
- UNIT_ASSERT(SocketPair(sockets) == 0);
- TSocketHolder s1(sockets[0]);
- TSocketHolder s2(sockets[1]);
- SetNonBlock(sockets[1]);
- TSocketPoller poller;
- UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
- UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
- for (ui32 i = 0; i < 3; ++i) {
- poller.WaitReadWriteEdgeTriggered(sockets[1], (void*)17);
- // notify about writeble
- UNIT_ASSERT_VALUES_EQUAL((void*)17, poller.WaitT(TDuration::Zero()));
- UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
- char buf[2];
- buf[0] = i + 10;
- buf[1] = i + 20;
- // send one byte
- UNIT_ASSERT_VALUES_EQUAL(1, send(sockets[0], buf, 1, 0));
- UNIT_ASSERT_VALUES_EQUAL((void*)17, poller.WaitT(TDuration::Zero()));
- UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
- // restart without reading
- poller.RestartReadWriteEdgeTriggered(sockets[1], (void*)17, false);
- // after restart read and write might generate separate events
- {
- void* events[3];
- size_t count = poller.WaitT(events, 3, TDuration::Zero());
- UNIT_ASSERT_GE(count, 1);
- UNIT_ASSERT_LE(count, 2);
- UNIT_ASSERT_VALUES_EQUAL(events[0], (void*)17);
- }
- UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
- // second two more bytes
- UNIT_ASSERT_VALUES_EQUAL(2, send(sockets[0], buf, 2, 0));
- // here poller could notify or not because we haven't seen end
- Y_UNUSED(poller.WaitT(TDuration::Zero()));
- // recv one, leave two
- UNIT_ASSERT_VALUES_EQUAL(1, recv(sockets[1], buf, 1, 0));
- UNIT_ASSERT_VALUES_EQUAL(char(i + 10), buf[0]);
- // nothing new
- UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
- UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
- // recv the rest
- UNIT_ASSERT_VALUES_EQUAL(2, recv(sockets[1], buf, 2, 0));
- UNIT_ASSERT_VALUES_EQUAL(char(i + 10), buf[0]);
- UNIT_ASSERT_VALUES_EQUAL(char(i + 20), buf[1]);
- // still nothing new
- UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
- UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
- // hit end
- ClearLastSystemError();
- UNIT_ASSERT_VALUES_EQUAL(-1, recv(sockets[1], buf, 1, 0));
- UNIT_ASSERT_VALUES_EQUAL(EAGAIN, LastSystemError());
- // restart after end (noop for epoll)
- poller.RestartReadWriteEdgeTriggered(sockets[1], (void*)17, true);
- // send and recv byte
- UNIT_ASSERT_VALUES_EQUAL(1, send(sockets[0], buf, 1, 0));
- UNIT_ASSERT_VALUES_EQUAL((void*)17, poller.WaitT(TDuration::Zero()));
- UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
- // recv and see end
- UNIT_ASSERT_VALUES_EQUAL(1, recv(sockets[1], buf, 2, 0));
- UNIT_ASSERT_VALUES_EQUAL(char(i + 10), buf[0]);
- UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
- UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
- // the same but send before restart
- UNIT_ASSERT_VALUES_EQUAL(1, send(sockets[0], buf, 1, 0));
- // restart after end (noop for epoll)
- poller.RestartReadWriteEdgeTriggered(sockets[1], (void*)17, true);
- UNIT_ASSERT_VALUES_EQUAL((void*)17, poller.WaitT(TDuration::Zero()));
- UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
- UNIT_ASSERT_VALUES_EQUAL(1, recv(sockets[1], buf, 2, 0));
- UNIT_ASSERT_VALUES_EQUAL(char(i + 10), buf[0]);
- UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
- UNIT_ASSERT_VALUES_EQUAL(nullptr, poller.WaitT(TDuration::Zero()));
- poller.Unwait(sockets[1]);
- }
- }
- #if defined(HAVE_EPOLL_POLLER)
- Y_UNIT_TEST(TestRdhup) {
- SOCKET sockets[2];
- UNIT_ASSERT(SocketPair(sockets) == 0);
- TSocketHolder s1(sockets[0]);
- TSocketHolder s2(sockets[1]);
- char buf[1] = {0};
- UNIT_ASSERT_VALUES_EQUAL(1, send(s1, buf, 1, 0));
- shutdown(s1, SHUT_WR);
- using TPoller = TGenericPoller<TEpollPoller<TWithoutLocking>>;
- TPoller poller;
- poller.Set((void*)17, s2, CONT_POLL_RDHUP);
- TPoller::TEvent e;
- UNIT_ASSERT_VALUES_EQUAL(poller.WaitD(&e, 1, TDuration::Zero().ToDeadLine()), 1);
- UNIT_ASSERT_EQUAL(TPoller::ExtractStatus(&e), 0);
- UNIT_ASSERT_EQUAL(TPoller::ExtractFilter(&e), CONT_POLL_RDHUP);
- UNIT_ASSERT_EQUAL(TPoller::ExtractEvent(&e), (void*)17);
- }
- Y_UNIT_TEST(TestSetSocketErrors) {
- TGenericPoller<TEpollPoller<TWithoutLocking>> poller;
- UNIT_ASSERT_EXCEPTION_CONTAINS(poller.Set(nullptr, Max<int>(), CONT_POLL_READ), TSystemError, "epoll add failed");
- UNIT_ASSERT_EXCEPTION_CONTAINS(poller.Set(nullptr, Max<int>(), CONT_POLL_READ | CONT_POLL_MODIFY), TSystemError, "epoll modify failed");
- }
- #endif
- }
|