// simple_server.cpp
  1. #include "simple_server.h"
  2. #include <util/network/pair.h>
  3. #include <util/network/poller.h>
  4. #include <util/network/sock.h>
  5. #include <util/string/builder.h>
  6. #include <util/system/thread.h>
  7. #include <util/thread/pool.h>
  8. TSimpleServer::TSimpleServer(int port, TRequestHandler requestHandler)
  9. : Port_(port)
  10. {
  11. auto listenSocket = MakeAtomicShared<TInetStreamSocket>();
  12. TSockAddrInet addr((TIpHost)INADDR_ANY, Port_);
  13. SetSockOpt(*listenSocket, SOL_SOCKET, SO_REUSEADDR, 1);
  14. int ret = listenSocket->Bind(&addr);
  15. Y_ENSURE_EX(ret == 0, TSystemError() << "Can not bind");
  16. SOCKET socketPair[2];
  17. ret = SocketPair(socketPair);
  18. Y_ENSURE_EX(ret == 0, TSystemError() << "Can not create socket pair");
  19. ret = listenSocket->Listen(10);
  20. Y_ENSURE_EX(ret == 0, TSystemError() << "Can not listen socket");
  21. SendFinishSocket_ = std::make_unique<TInetStreamSocket>(socketPair[1]);
  22. ThreadPool_ = std::make_unique<TAdaptiveThreadPool>();
  23. ThreadPool_->Start(1);
  24. auto receiveFinish = MakeAtomicShared<TInetStreamSocket>(socketPair[0]);
  25. ListenerThread_ = ThreadPool_->Run([listenSocket, receiveFinish, requestHandler] {
  26. TSocketPoller socketPoller;
  27. socketPoller.WaitRead(*receiveFinish, nullptr);
  28. socketPoller.WaitRead(*listenSocket, (void*)1);
  29. bool running = true;
  30. while (running) {
  31. void* cookies[2];
  32. size_t cookieCount = socketPoller.WaitI(cookies, 2);
  33. for (size_t i = 0; i != cookieCount; ++i) {
  34. if (!cookies[i]) {
  35. running = false;
  36. } else {
  37. TSockAddrInet addr;
  38. TAtomicSharedPtr<TStreamSocket> socket = MakeAtomicShared<TInetStreamSocket>();
  39. int ret = listenSocket->Accept(socket.Get(), &addr);
  40. Y_ENSURE_EX(ret == 0, TSystemError() << "Can not accept connection");
  41. SystemThreadFactory()->Run(
  42. [socket, requestHandler] {
  43. TStreamSocketInput input(socket.Get());
  44. TStreamSocketOutput output(socket.Get());
  45. requestHandler(&input, &output);
  46. socket->Close();
  47. });
  48. }
  49. }
  50. }
  51. });
  52. }
  53. TSimpleServer::~TSimpleServer()
  54. {
  55. try {
  56. if (ThreadPool_) {
  57. Stop();
  58. }
  59. } catch (...) {
  60. }
  61. }
  62. void TSimpleServer::Stop()
  63. {
  64. // Just send something to indicate shutdown.
  65. SendFinishSocket_->Send("X", 1);
  66. ListenerThread_->Join();
  67. ThreadPool_->Stop();
  68. ThreadPool_.reset();
  69. }
  70. int TSimpleServer::GetPort() const
  71. {
  72. return Port_;
  73. }
  74. TString TSimpleServer::GetAddress() const
  75. {
  76. return TStringBuilder() << "localhost:" << Port_;
  77. }