service.cpp 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268
  1. #include "service.h"
  2. #include <library/cpp/coroutine/engine/sockpool.h>
  3. #include <library/cpp/http/io/stream.h>
  4. #include <library/cpp/http/fetch/httpheader.h>
  5. #include <library/cpp/http/fetch/httpfsm.h>
  6. #include <library/cpp/uri/http_url.h>
  7. #include <util/generic/buffer.h>
  8. #include <util/stream/str.h>
  9. #include <util/stream/buffer.h>
  10. #include <util/stream/zerocopy.h>
  11. #include <util/string/vector.h>
  12. namespace NMonitoring {
  13. class THttpClient: public IHttpRequest {
  14. public:
  15. void ServeRequest(THttpInput& in, IOutputStream& out, const NAddr::IRemoteAddr* remoteAddr, const THandler& Handler) {
  16. try {
  17. try {
  18. RemoteAddr = remoteAddr;
  19. THttpHeaderParser parser;
  20. parser.Init(&Header);
  21. if (parser.Execute(in.FirstLine().data(), in.FirstLine().size()) < 0) {
  22. out << "HTTP/1.1 400 Bad request\r\nConnection: Close\r\n\r\n";
  23. return;
  24. }
  25. if (Url.Parse(Header.GetUrl().data()) != THttpURL::ParsedOK) {
  26. out << "HTTP/1.1 400 Invalid url\r\nConnection: Close\r\n\r\n";
  27. return;
  28. }
  29. TString path = GetPath();
  30. if (!path.StartsWith('/')) {
  31. out << "HTTP/1.1 400 Bad request\r\nConnection: Close\r\n\r\n";
  32. return;
  33. }
  34. Headers = &in.Headers();
  35. CgiParams.Scan(Url.Get(THttpURL::FieldQuery));
  36. } catch (...) {
  37. out << "HTTP/1.1 500 Internal server error\r\nConnection: Close\r\n\r\n";
  38. YSYSLOG(TLOG_ERR, "THttpClient: internal error while serving monitoring request: %s", CurrentExceptionMessage().data());
  39. }
  40. if (Header.http_method == HTTP_METHOD_POST)
  41. TransferData(&in, &PostContent);
  42. Handler(out, *this);
  43. out.Finish();
  44. } catch (...) {
  45. auto msg = CurrentExceptionMessage();
  46. out << "HTTP/1.1 500 Internal server error\r\nConnection: Close\r\n\r\n" << msg;
  47. out.Finish();
  48. YSYSLOG(TLOG_ERR, "THttpClient: error while serving monitoring request: %s", msg.data());
  49. }
  50. }
  51. const char* GetURI() const override {
  52. return Header.request_uri.c_str();
  53. }
  54. const char* GetPath() const override {
  55. return Url.Get(THttpURL::FieldPath);
  56. }
  57. const TCgiParameters& GetParams() const override {
  58. return CgiParams;
  59. }
  60. const TCgiParameters& GetPostParams() const override {
  61. if (PostParams.empty() && !PostContent.Buffer().Empty())
  62. const_cast<THttpClient*>(this)->ScanPostParams();
  63. return PostParams;
  64. }
  65. TStringBuf GetPostContent() const override {
  66. return TStringBuf(PostContent.Buffer().Data(), PostContent.Buffer().Size());
  67. }
  68. HTTP_METHOD GetMethod() const override {
  69. return (HTTP_METHOD)Header.http_method;
  70. }
  71. void ScanPostParams() {
  72. PostParams.Scan(TStringBuf(PostContent.Buffer().data(), PostContent.Buffer().size()));
  73. }
  74. const THttpHeaders& GetHeaders() const override {
  75. if (Headers != nullptr) {
  76. return *Headers;
  77. }
  78. static THttpHeaders defaultHeaders;
  79. return defaultHeaders;
  80. }
  81. TString GetRemoteAddr() const override {
  82. return RemoteAddr ? NAddr::PrintHostAndPort(*RemoteAddr) : TString();
  83. }
  84. private:
  85. THttpRequestHeader Header;
  86. const THttpHeaders* Headers = nullptr;
  87. THttpURL Url;
  88. TCgiParameters CgiParams;
  89. TCgiParameters PostParams;
  90. TBufferOutput PostContent;
  91. const NAddr::IRemoteAddr* RemoteAddr = nullptr;
  92. };
  93. /* TCoHttpServer */
  94. class TCoHttpServer::TConnection: public THttpClient {
  95. public:
  96. TConnection(const TCoHttpServer::TAcceptFull& acc, const TCoHttpServer& parent)
  97. : Socket(acc.S->Release())
  98. , RemoteAddr(acc.Remote)
  99. , Parent(parent)
  100. {
  101. }
  102. void operator()(TCont* c) {
  103. try {
  104. THolder<TConnection> me(this);
  105. TContIO io(Socket, c);
  106. THttpInput in(&io);
  107. THttpOutput out(&io, &in);
  108. // buffer reply so there will be ne context switching
  109. TStringStream s;
  110. ServeRequest(in, s, RemoteAddr, Parent.Handler);
  111. out << s.Str();
  112. out.Finish();
  113. } catch (...) {
  114. YSYSLOG(TLOG_WARNING, "TCoHttpServer::TConnection: error: %s\n", CurrentExceptionMessage().data());
  115. }
  116. }
  117. private:
  118. TSocketHolder Socket;
  119. const NAddr::IRemoteAddr* RemoteAddr;
  120. const TCoHttpServer& Parent;
  121. };
  122. TCoHttpServer::TCoHttpServer(TContExecutor& executor, const TString& bindAddr, TIpPort port, THandler handler)
  123. : Executor(executor)
  124. , Listener(this, &executor)
  125. , Handler(std::move(handler))
  126. , BindAddr(bindAddr)
  127. , Port(port)
  128. {
  129. try {
  130. Listener.Bind(TIpAddress(bindAddr, port));
  131. } catch (yexception e) {
  132. Y_ABORT("TCoHttpServer::TCoHttpServer: couldn't bind to %s:%d\n", bindAddr.data(), port);
  133. }
  134. }
  135. void TCoHttpServer::Start() {
  136. Listener.Listen();
  137. }
  138. void TCoHttpServer::Stop() {
  139. Listener.Stop();
  140. }
  141. void TCoHttpServer::OnAcceptFull(const TAcceptFull& acc) {
  142. THolder<TConnection> conn(new TConnection(acc, *this));
  143. Executor.Create(*conn, "client");
  144. Y_UNUSED(conn.Release());
  145. }
  146. void TCoHttpServer::OnError() {
  147. throw; // just rethrow
  148. }
  149. void TCoHttpServer::ProcessRequest(IOutputStream& out, const IHttpRequest& request) {
  150. try {
  151. TNetworkAddress addr(BindAddr, Port);
  152. TSocket sock(addr);
  153. TSocketOutput sock_out(sock);
  154. TSocketInput sock_in(sock);
  155. sock_out << "GET " << request.GetURI() << " HTTP/1.0\r\n\r\n";
  156. THttpInput http_in(&sock_in);
  157. try {
  158. out << "HTTP/1.1 200 Ok\nConnection: Close\n\n";
  159. TransferData(&http_in, &out);
  160. } catch (...) {
  161. YSYSLOG(TLOG_DEBUG, "TCoHttpServer: while getting data from backend: %s", CurrentExceptionMessage().data());
  162. }
  163. } catch (const yexception& /*e*/) {
  164. out << "HTTP/1.1 500 Internal server error\nConnection: Close\n\n";
  165. YSYSLOG(TLOG_DEBUG, "TCoHttpServer: while getting data from backend: %s", CurrentExceptionMessage().data());
  166. }
  167. }
  168. /* TMtHttpServer */
  169. class TMtHttpServer::TConnection: public TClientRequest, public THttpClient {
  170. public:
  171. TConnection(const TMtHttpServer& parent)
  172. : Parent(parent)
  173. {
  174. }
  175. bool Reply(void*) override {
  176. ServeRequest(Input(), Output(), NAddr::GetPeerAddr(Socket()).Get(), Parent.Handler);
  177. return true;
  178. }
  179. private:
  180. const TMtHttpServer& Parent;
  181. };
  182. TMtHttpServer::TMtHttpServer(const TOptions& options, THandler handler, IThreadFactory* pool)
  183. : THttpServer(this, options, pool)
  184. , Handler(std::move(handler))
  185. {
  186. }
  187. TMtHttpServer::TMtHttpServer(const TOptions& options, THandler handler, TSimpleSharedPtr<IThreadPool> pool)
  188. : THttpServer(this, /* mainWorkers = */pool, /* failWorkers = */pool, options)
  189. , Handler(std::move(handler))
  190. {
  191. }
  192. bool TMtHttpServer::Start() {
  193. return THttpServer::Start();
  194. }
  195. void TMtHttpServer::StartOrThrow() {
  196. if (!Start()) {
  197. const auto& opts = THttpServer::Options();
  198. TNetworkAddress addr = opts.Host
  199. ? TNetworkAddress(opts.Host, opts.Port)
  200. : TNetworkAddress(opts.Port);
  201. ythrow TSystemError(GetErrorCode()) << addr;
  202. }
  203. }
  204. void TMtHttpServer::Stop() {
  205. THttpServer::Stop();
  206. }
  207. TClientRequest* TMtHttpServer::CreateClient() {
  208. return new TConnection(*this);
  209. }
  210. /* TService */
  211. TMonService::TMonService(TContExecutor& executor, TIpPort internalPort, TIpPort externalPort,
  212. THandler coHandler, THandler mtHandler)
  213. : CoServer(executor, "127.0.0.1", internalPort, std::move(coHandler))
  214. , MtServer(THttpServerOptions(externalPort), std::bind(&TMonService::DispatchRequest, this, std::placeholders::_1, std::placeholders::_2))
  215. , MtHandler(std::move(mtHandler))
  216. {
  217. }
  218. void TMonService::Start() {
  219. MtServer.Start();
  220. CoServer.Start();
  221. }
  222. void TMonService::Stop() {
  223. MtServer.Stop();
  224. CoServer.Stop();
  225. }
  226. void TMonService::DispatchRequest(IOutputStream& out, const IHttpRequest& request) {
  227. if (strcmp(request.GetPath(), "/") == 0) {
  228. out << "HTTP/1.1 200 Ok\nConnection: Close\n\n";
  229. MtHandler(out, request);
  230. } else
  231. CoServer.ProcessRequest(out, request);
  232. }
  233. }