#include "www.h"

#include "concat_strings.h"
#include "html_output.h"

// NOTE(review): the header names of the angle-bracket includes were missing from
// the reviewed copy of this file; the list below was reconstructed from the
// names this file uses - verify against the build before merging.
#include <library/cpp/messagebus/remote_connection_status.h>
#include <library/cpp/monlib/deprecated/json/writer.h>

#include <library/cpp/archive/yarchive.h>
#include <library/cpp/http/fetch/httpfsm.h>
#include <library/cpp/http/fetch/httpheader.h>
#include <library/cpp/http/server/http.h>
#include <library/cpp/json/writer/json.h>
#include <library/cpp/resource/resource.h>
#include <library/cpp/uri/http_url.h>

#include <util/string/cast.h>
#include <util/string/printf.h>
#include <util/system/mutex.h>

using namespace NBus;
using namespace NBus::NPrivate;
using namespace NActor;
using namespace NActor::NPrivate;

// Canned HTTP response heads, one per content type this file serves.
static const char HTTP_OK_JS[] = "HTTP/1.1 200 Ok\r\nContent-Type: text/javascript\r\nConnection: Close\r\n\r\n";
static const char HTTP_OK_JSON[] = "HTTP/1.1 200 Ok\r\nContent-Type: application/json; charset=utf-8\r\nConnection: Close\r\n\r\n";
static const char HTTP_OK_PNG[] = "HTTP/1.1 200 Ok\r\nContent-Type: image/png\r\nConnection: Close\r\n\r\n";
static const char HTTP_OK_BIN[] = "HTTP/1.1 200 Ok\r\nContent-Type: application/octet-stream\r\nConnection: Close\r\n\r\n";
static const char HTTP_OK_HTML[] = "HTTP/1.1 200 Ok\r\nContent-Type: text/html; charset=utf-8\r\nConnection: Close\r\n\r\n";

namespace {
    typedef TIntrusivePtr<TBusModuleInternal> TBusModuleInternalPtr;

    // Insertion-ordered registry of (unique name, object) pairs.
    // Lookups are linear scans over Entries.
    template <typename TValuePtr>
    struct TNamedValues {
        TVector<std::pair<TString, TValuePtr>> Entries;

        // Returns the object registered under `name`, or a default-constructed
        // TValuePtr when there is none. Aborts on an empty name, so callers
        // must not pass unvalidated external input.
        TValuePtr FindByName(TStringBuf name) {
            Y_ABORT_UNLESS(!!name);

            for (unsigned i = 0; i < Entries.size(); ++i) {
                if (Entries[i].first == name) {
                    return Entries[i].second;
                }
            }
            return TValuePtr();
        }

        // Returns the name under which `value` is registered.
        // Aborts if `value` is null or was never added.
        TString FindNameByPtr(TValuePtr value) {
            Y_ABORT_UNLESS(!!value);

            for (unsigned i = 0; i < Entries.size(); ++i) {
                if (Entries[i].second.Get() == value.Get()) {
                    return Entries[i].first;
                }
            }

            Y_ABORT("unregistered");
        }

        // Registers `p` under a unique name. The object's own name is used when
        // it is non-empty and still free; otherwise the first free
        // "<name>-<N>" (or "unnamed-<N>" for nameless objects), N = 1, 2, ...
        // Adding an already registered object is a no-op.
        void Add(TValuePtr p) {
            Y_ABORT_UNLESS(!!p);

            // Do not add twice
            for (unsigned i = 0; i < Entries.size(); ++i) {
                if (Entries[i].second.Get() == p.Get()) {
                    return;
                }
            }

            if (!!p->GetNameInternal()) {
                TValuePtr current = FindByName(p->GetNameInternal());

                if (!current) {
                    Entries.emplace_back(p->GetNameInternal(), p);
                    return;
                }
            }

            TString prefix = p->GetNameInternal();
            if (!prefix) {
                prefix = "unnamed";
            }

            for (unsigned i = 1;; ++i) {
                TString name = ConcatStrings(prefix, "-", i);

                TValuePtr current = FindByName(name);

                if (!current) {
                    Entries.emplace_back(name, p);
                    return;
                }
            }
        }

        size_t size() const {
            return Entries.size();
        }

        // True when nothing is registered.
        bool operator!() const {
            return size() == 0;
        }
    };

    // Registry of objects that belong to a queue (sessions, modules).
    template <typename TSessionPtr>
    struct TSessionValues: public TNamedValues<TSessionPtr> {
        typedef TNamedValues<TSessionPtr> TBase;

        // Names of all registered objects whose GetQueue() equals `queue`,
        // in registration order.
        TVector<TString> GetNamesForQueue(TBusMessageQueue* queue) {
            TVector<TString> r;
            for (unsigned i = 0; i < TBase::size(); ++i) {
                if (TBase::Entries[i].second->GetQueue() == queue) {
                    r.push_back(TBase::Entries[i].first);
                }
            }
            return r;
        }
    };

}

namespace {
    // Relative links to the pages served by TBusWww::TImpl::TRequest::Serve().

    TString RootHref() {
        return ConcatStrings("?");
    }

    TString QueueHref(TStringBuf name) {
        return ConcatStrings("?q=", name);
    }

    TString ServerSessionHref(TStringBuf name) {
        return ConcatStrings("?ss=", name);
    }

    TString ClientSessionHref(TStringBuf name) {
        return ConcatStrings("?cs=", name);
    }

    TString OldModuleHref(TStringBuf name) {
        return ConcatStrings("?om=", name);
    }

    /*
    static void RootLink() {
        A(RootHref(), "root");
    }
    */

    void QueueLink(TStringBuf name) {
        A(QueueHref(name), name);
    }

    void ServerSessionLink(TStringBuf name) {
        A(ServerSessionHref(name), name);
    }

    void ClientSessionLink(TStringBuf name) {
        A(ClientSessionHref(name), name);
    }

    void OldModuleLink(TStringBuf name) {
        A(OldModuleHref(name), name);
    }

}

// Holds everything registered with TBusWww and renders it on request.
// Every public entry point takes Mutex.
struct TBusWww::TImpl {
    // TODO: use weak pointers
    TNamedValues<TBusMessageQueuePtr> Queues;
    TSessionValues<TIntrusivePtr<TBusClientSession>> ClientSessions;
    TSessionValues<TIntrusivePtr<TBusServerSession>> ServerSessions;
    TSessionValues<TBusModuleInternalPtr> Modules;

    TMutex Mutex;

    // Registers the session and the queue it belongs to.
    void RegisterClientSession(TBusClientSessionPtr s) {
        Y_ABORT_UNLESS(!!s);
        TGuard<TMutex> g(Mutex);
        ClientSessions.Add(s.Get());
        Queues.Add(s->GetQueue());
    }

    // Registers the session and the queue it belongs to.
    void RegisterServerSession(TBusServerSessionPtr s) {
        Y_ABORT_UNLESS(!!s);
        TGuard<TMutex> g(Mutex);
        ServerSessions.Add(s.Get());
        Queues.Add(s->GetQueue());
    }

    void RegisterQueue(TBusMessageQueuePtr q) {
        Y_ABORT_UNLESS(!!q);
        TGuard<TMutex> g(Mutex);
        Queues.Add(q);
    }

    // Registers the module together with all of its sessions and its queue.
    // NOTE(review): Mutex is taken again by the nested Register*Session calls;
    // this relies on TMutex being recursive - confirm.
    void RegisterModule(TBusModule* module) {
        Y_ABORT_UNLESS(!!module);
        TGuard<TMutex> g(Mutex);

        {
            TVector<TBusClientSessionPtr> clientSessions = module->GetInternal()->GetClientSessionsInternal();
            for (unsigned i = 0; i < clientSessions.size(); ++i) {
                RegisterClientSession(clientSessions[i]);
            }
        }

        {
            TVector<TBusServerSessionPtr> serverSessions = module->GetInternal()->GetServerSessionsInternal();
            for (unsigned i = 0; i < serverSessions.size(); ++i) {
                RegisterServerSession(serverSessions[i]);
            }
        }

        Queues.Add(module->GetInternal()->GetQueue());
        Modules.Add(module->GetInternal());
    }

    // Name of the queue owning the session registered as `sessionName`.
    // `client` selects the client or the server registry.
    // Aborts if the name is empty or no such session is registered.
    TString FindQueueNameBySessionName(TStringBuf sessionName, bool client) {
        TIntrusivePtr<TBusClientSession> clientSession;
        TIntrusivePtr<TBusServerSession> serverSession;
        TBusSession* session;
        if (client) {
            clientSession = ClientSessions.FindByName(sessionName);
            session = clientSession.Get();
        } else {
            serverSession = ServerSessions.FindByName(sessionName);
            session = serverSession.Get();
        }
        Y_ABORT_UNLESS(!!session);
        return Queues.FindNameByPtr(session->GetQueue());
    }

    // State of one HTTP request: the output stream, parsed query parameters
    // and caller-supplied page options.
    struct TRequest {
        TImpl* const Outer;
        IOutputStream& Os;
        const TCgiParameters& CgiParams;
        const TOptionalParams& Params;

        TRequest(TImpl* outer, IOutputStream& os, const TCgiParameters& cgiParams, const TOptionalParams& params)
            : Outer(outer)
            , Os(os)
            , CgiParams(cgiParams)
            , Params(params)
        {
        }

        // Emits one breadcrumb item per caller-supplied parent link.
        void CrumbsParentLinks() {
            for (unsigned i = 0; i < Params.ParentLinks.size(); ++i) {
                const TLink& link = Params.ParentLinks[i];
                TTagGuard li("li");
                A(link.Href, link.Title);
            }
        }

        // Emits a breadcrumb item: a link when `href` is given, otherwise an
        // item with class "active".
        void Crumb(TStringBuf name, TStringBuf href = "") {
            if (!!href) {
                TTagGuard li("li");
                A(href, name);
            } else {
                LiWithClass("active", name);
            }
        }

        void BreadcrumbRoot() {
            TTagGuard ol("ol", "breadcrumb");
            CrumbsParentLinks();
            Crumb("MessageBus");
        }

        void BreadcrumbQueue(TStringBuf queueName) {
            TTagGuard ol("ol", "breadcrumb");
            CrumbsParentLinks();
            Crumb("MessageBus", RootHref());
            Crumb(ConcatStrings("queue ", queueName));
        }

        // Requires `sessionName` to be registered (see FindQueueNameBySessionName).
        void BreadcrumbSession(TStringBuf sessionName, bool client) {
            TString queueName = Outer->FindQueueNameBySessionName(sessionName, client);
            TStringBuf whatSession = client ? "client session" : "server session";

            TTagGuard ol("ol", "breadcrumb");
            CrumbsParentLinks();
            Crumb("MessageBus", RootHref());
            Crumb(ConcatStrings("queue ", queueName), QueueHref(queueName));
            Crumb(ConcatStrings(whatSession, " ", sessionName));
        }

        // Renders a What/Name/Status table with one row per client session,
        // server session and module of `queue`, preceded by a row for the
        // queue itself when `includeQueue` is set.
        void ServeSessionsOfQueue(TBusMessageQueuePtr queue, bool includeQueue) {
            TVector<TString> clientNames = Outer->ClientSessions.GetNamesForQueue(queue.Get());
            TVector<TString> serverNames = Outer->ServerSessions.GetNamesForQueue(queue.Get());
            TVector<TString> moduleNames = Outer->Modules.GetNamesForQueue(queue.Get());

            TTagGuard table("table", "table table-condensed table-bordered");

            {
                TTagGuard colgroup("colgroup");
                TagWithClass("col", "col-md-2");
                TagWithClass("col", "col-md-2");
                TagWithClass("col", "col-md-8");
            }

            {
                TTagGuard tr("tr");
                Th("What", "span2");
                Th("Name", "span2");
                Th("Status", "span6");
            }

            if (includeQueue) {
                TTagGuard tr1("tr");
                Td("queue");

                {
                    TTagGuard td("td");
                    QueueLink(Outer->Queues.FindNameByPtr(queue));
                }

                {
                    TTagGuard td("td");
                    Pre(queue->GetStatusSingleLine());
                }
            }

            for (unsigned j = 0; j < clientNames.size(); ++j) {
                TTagGuard tr("tr");
                Td("client session");

                {
                    TTagGuard td("td");
                    ClientSessionLink(clientNames[j]);
                }

                {
                    TTagGuard td("td");
                    Pre(Outer->ClientSessions.FindByName(clientNames[j])->GetStatusSingleLine());
                }
            }

            for (unsigned j = 0; j < serverNames.size(); ++j) {
                TTagGuard tr("tr");
                Td("server session");

                {
                    TTagGuard td("td");
                    ServerSessionLink(serverNames[j]);
                }

                {
                    TTagGuard td("td");
                    Pre(Outer->ServerSessions.FindByName(serverNames[j])->GetStatusSingleLine());
                }
            }

            for (unsigned j = 0; j < moduleNames.size(); ++j) {
                TTagGuard tr("tr");
                Td("module");

                {
                    TTagGuard td("td");
                    if (false) {
                        OldModuleLink(moduleNames[j]);
                    } else {
                        // TODO
                        Text(moduleNames[j]);
                    }
                }

                {
                    TTagGuard td("td");
                    Pre(Outer->Modules.FindByName(moduleNames[j])->GetStatusSingleLine());
                }
            }
        }

        // Renders the page of the queue called `name`: status dump, session
        // table and the data for the peak-queue-size plot.
        void ServeQueue(const TString& name) {
            // `name` comes straight from the query string; FindByName aborts on
            // an empty name, so treat that case as "not found".
            TBusMessageQueuePtr queue;
            if (!!name) {
                queue = Outer->Queues.FindByName(name);
            }

            if (!queue) {
                BootstrapError(ConcatStrings("queue not found by name: ", name));
                return;
            }

            BreadcrumbQueue(name);

            TDivGuard container("container");

            H1(ConcatStrings("MessageBus queue ", '"', name, '"'));

            TBusMessageQueueStatus status = queue->GetStatusRecordInternal();

            Pre(status.PrintToString());

            ServeSessionsOfQueue(queue, false);

            HnWithSmall(3, "Peak queue size", "(stored for an hour)");

            {
                TDivGuard div;
                TDivGuard div2(TAttr("id", "queue-size-graph"), TAttr("style", "height: 300px"));
            }

            {
                TScriptFunctionGuard script;

                NJsonWriter::TBuf data(NJsonWriter::HEM_ESCAPE_HTML);
                NJsonWriter::TBuf ticks(NJsonWriter::HEM_ESCAPE_HTML);

                const TExecutorHistory& history = status.ExecutorStatus.History;
                const auto historySize = history.HistoryRecords.size();

                // The longer the history, the fewer points are plotted: one
                // point per `printEach` seconds, carrying the maximum over the
                // preceding `printEach` records.
                unsigned printEach;
                if (historySize <= 500) {
                    printEach = 1;
                } else if (historySize <= 1000) {
                    printEach = 2;
                } else if (historySize <= 3000) {
                    printEach = 6;
                } else {
                    printEach = 12;
                }

                // Appends one [index, label] pair to the axis ticks.
                auto addTick = [&ticks](unsigned index, const TString& label) {
                    ticks.BeginList();
                    ticks.WriteInt(index);
                    ticks.WriteString(label);
                    ticks.EndList();
                };

                data.BeginList();
                ticks.BeginList();
                for (unsigned i = 0; i < historySize; ++i) {
                    ui64 secondOfMinute = (history.FirstHistoryRecordSecond() + i) % 60;
                    ui64 minuteOfHour = (history.FirstHistoryRecordSecond() + i) / 60 % 60;

                    if (secondOfMinute % printEach != 0) {
                        continue;
                    }

                    ui32 max = 0;
                    for (unsigned j = 0; j < printEach; ++j) {
                        if (i < j) {
                            continue;
                        }
                        max = Max<ui32>(max, history.HistoryRecords[i - j].MaxQueueSize);
                    }

                    data.BeginList();
                    data.WriteString(ToString(i));
                    data.WriteInt(max);
                    data.EndList();

                    // TODO: can be done with flot time plugin
                    if (historySize <= 20) {
                        addTick(i, ToString(secondOfMinute));
                    } else if (historySize <= 60) {
                        if (secondOfMinute % 5 == 0) {
                            addTick(i, ToString(secondOfMinute));
                        }
                    } else {
                        bool needTick;
                        if (historySize <= 3 * 60) {
                            needTick = secondOfMinute % 15 == 0;
                        } else if (historySize <= 7 * 60) {
                            needTick = secondOfMinute % 30 == 0;
                        } else if (historySize <= 20 * 60) {
                            needTick = secondOfMinute == 0;
                        } else {
                            needTick = secondOfMinute == 0 && minuteOfHour % 5 == 0;
                        }
                        if (needTick) {
                            addTick(i, Sprintf(":%02u:%02u", (unsigned)minuteOfHour, (unsigned)secondOfMinute));
                        }
                    }
                }
                ticks.EndList();
                data.EndList();

                HtmlOutputStream() << " var data = " << data.Str() << ";\n";
                HtmlOutputStream() << " var ticks = " << ticks.Str() << ";\n";
                HtmlOutputStream() << " plotQueueSize('#queue-size-graph', data, ticks);\n";
            }
        }

        // Renders the page of the client (`client == true`) or server session
        // called `name`.
        void ServeSession(TStringBuf name, bool client) {
            TIntrusivePtr<TBusClientSession> clientSession;
            TIntrusivePtr<TBusServerSession> serverSession;
            TBusSession* session = nullptr;
            TStringBuf whatSession = client ? "client session" : "server session";

            // `name` comes straight from the query string; FindByName aborts on
            // an empty name, so treat that case as "not found".
            if (!!name) {
                if (client) {
                    clientSession = Outer->ClientSessions.FindByName(name);
                    session = clientSession.Get();
                } else {
                    serverSession = Outer->ServerSessions.FindByName(name);
                    session = serverSession.Get();
                }
            }

            if (!session) {
                BootstrapError(ConcatStrings(whatSession, " not found by name: ", name));
                return;
            }

            // Snapshot used for the histogram at the bottom of the page; it is
            // taken before the queue status and the second session snapshot.
            TSessionDumpStatus dumpStatus = session->GetStatusRecordInternal();

            TBusMessageQueuePtr queue = session->GetQueue();
            TString queueName = Outer->Queues.FindNameByPtr(session->GetQueue());

            BreadcrumbSession(name, client);

            TDivGuard container("container");

            H1(ConcatStrings("MessageBus ", whatSession, " ", '"', name, '"'));

            TBusMessageQueueStatus queueStatus = queue->GetStatusRecordInternal();

            {
                H3(ConcatStrings("queue ", queueName));
                Pre(queueStatus.PrintToString());
            }

            TSessionDumpStatus status = session->GetStatusRecordInternal();

            if (status.Shutdown) {
                BootstrapError("Session shut down");
                return;
            }

            H3("Basic");
            Pre(status.Head);

            if (status.ConnectionStatusSummary.Server) {
                H3("Acceptors");
                Pre(status.Acceptors);
            }

            H3("Connections");
            Pre(status.ConnectionsSummary);

            {
                TDivGuard div;
                TTagGuard button("button",
                                 TAttr("type", "button"),
                                 TAttr("class", "btn"),
                                 TAttr("data-toggle", "collapse"),
                                 TAttr("data-target", "#connections"));
                Text("Show connection details");
            }
            {
                TDivGuard div(TAttr("id", "connections"), TAttr("class", "collapse"));
                Pre(status.Connections);
            }

            H3("TBusSessionConfig");
            Pre(status.Config.PrintToString());

            if (!client) {
                H3("Message process time histogram");

                const TDurationHistogram& h =
                    dumpStatus.ConnectionStatusSummary.WriterStatus.Incremental.ProcessDurationHistogram;

                {
                    TDivGuard div;
                    TDivGuard div2(TAttr("id", "h"), TAttr("style", "height: 300px"));
                }

                {
                    TScriptFunctionGuard script;

                    NJsonWriter::TBuf buf(NJsonWriter::HEM_ESCAPE_HTML);
                    buf.BeginList();
                    for (unsigned i = 0; i < h.Times.size(); ++i) {
                        TString label = TDurationHistogram::LabelBefore(i);
                        buf.BeginList();
                        buf.WriteString(label);
                        buf.WriteLongLong(h.Times[i]);
                        buf.EndList();
                    }
                    buf.EndList();

                    HtmlOutputStream() << " var hist = " << buf.Str() << ";\n";
                    HtmlOutputStream() << " plotHist('#h', hist);\n";
                }
            }
        }

        // Renders the root page: one session table per registered queue.
        void ServeDefault() {
            if (!Outer->Queues) {
                BootstrapError("no queues");
                return;
            }

            BreadcrumbRoot();

            TDivGuard container("container");

            H1("MessageBus queues");

            for (unsigned i = 0; i < Outer->Queues.size(); ++i) {
                TString queueName = Outer->Queues.Entries[i].first;
                TBusMessageQueuePtr queue = Outer->Queues.Entries[i].second;

                HnWithSmall(3, queueName, "(queue)");

                ServeSessionsOfQueue(queue, true);
            }
        }

        // Writes the WorkQueueSize metric of `queue`.
        void WriteQueueSensors(NMonitoring::TDeprecatedJsonWriter& sj, TStringBuf queueName, TBusMessageQueue* queue) {
            auto status = queue->GetStatusRecordInternal();
            sj.OpenMetric();
            sj.WriteLabels("mb_queue", queueName, "sensor", "WorkQueueSize");
            sj.WriteValue(status.ExecutorStatus.WorkQueueSize);
            sj.CloseMetric();
        }

        // Writes MessageBytes and MessageCount (both in "deriv" mode) for one
        // direction of one session; `read` selects the "mb_dir" label value.
        void WriteMessageCounterSensors(NMonitoring::TDeprecatedJsonWriter& sj,
                                        TStringBuf labelName, TStringBuf sessionName, bool read, const TMessageCounter& counter) {
            TStringBuf readOrWrite = read ? "read" : "write";

            sj.OpenMetric();
            sj.WriteLabels(labelName, sessionName, "mb_dir", readOrWrite, "sensor", "MessageBytes");
            sj.WriteValue(counter.BytesData);
            sj.WriteModeDeriv();
            sj.CloseMetric();

            sj.OpenMetric();
            sj.WriteLabels(labelName, sessionName, "mb_dir", readOrWrite, "sensor", "MessageCount");
            sj.WriteValue(counter.Count);
            sj.WriteModeDeriv();
            sj.CloseMetric();
        }

        // Writes all metrics of one session: in-flight and send-queue gauges,
        // AckMessagesSize for client sessions only, and the message counters
        // for both directions.
        void WriteSessionStatus(NMonitoring::TDeprecatedJsonWriter& sj, TStringBuf sessionName, bool client,
                                TBusSession* session) {
            TStringBuf labelName = client ? "mb_client_session" : "mb_server_session";

            auto status = session->GetStatusRecordInternal();

            sj.OpenMetric();
            sj.WriteLabels(labelName, sessionName, "sensor", "InFlightCount");
            sj.WriteValue(status.Status.InFlightCount);
            sj.CloseMetric();

            sj.OpenMetric();
            sj.WriteLabels(labelName, sessionName, "sensor", "InFlightSize");
            sj.WriteValue(status.Status.InFlightSize);
            sj.CloseMetric();

            sj.OpenMetric();
            sj.WriteLabels(labelName, sessionName, "sensor", "SendQueueSize");
            sj.WriteValue(status.ConnectionStatusSummary.WriterStatus.SendQueueSize);
            sj.CloseMetric();

            if (client) {
                sj.OpenMetric();
                sj.WriteLabels(labelName, sessionName, "sensor", "AckMessagesSize");
                sj.WriteValue(status.ConnectionStatusSummary.WriterStatus.AckMessagesSize);
                sj.CloseMetric();
            }

            WriteMessageCounterSensors(sj, labelName, sessionName, false,
                                       status.ConnectionStatusSummary.WriterStatus.Incremental.MessageCounter);
            WriteMessageCounterSensors(sj, labelName, sessionName, true,
                                       status.ConnectionStatusSummary.ReaderStatus.Incremental.MessageCounter);
        }

        // Writes the metrics document to Os. `q`, `cs` and `ss` are the
        // requested queue, client session and server session names; when all
        // three are empty every registered object is reported, otherwise only
        // the objects whose name equals the corresponding filter.
        void ServeSolomonJson(const TString& q, const TString& cs, const TString& ss) {
            bool all = q == "" && cs == "" && ss == "";

            NMonitoring::TDeprecatedJsonWriter sj(&Os);

            sj.OpenDocument();
            sj.OpenMetrics();

            for (unsigned i = 0; i < Outer->Queues.size(); ++i) {
                TString queueName = Outer->Queues.Entries[i].first;
                TBusMessageQueuePtr queue = Outer->Queues.Entries[i].second;

                if (all || q == queueName) {
                    WriteQueueSensors(sj, queueName, &*queue);
                }

                TVector<TString> clientNames = Outer->ClientSessions.GetNamesForQueue(queue.Get());
                TVector<TString> serverNames = Outer->ServerSessions.GetNamesForQueue(queue.Get());

                for (auto& sessionName : clientNames) {
                    if (all || cs == sessionName) {
                        auto session = Outer->ClientSessions.FindByName(sessionName);
                        WriteSessionStatus(sj, sessionName, true, &*session);
                    }
                }

                for (auto& sessionName : serverNames) {
                    if (all || ss == sessionName) {
                        auto session = Outer->ServerSessions.FindByName(sessionName);
                        WriteSessionStatus(sj, sessionName, false, &*session);
                    }
                }
            }

            sj.CloseMetrics();
            sj.CloseDocument();
        }

        // Serves the embedded resource "/<path>" with a content type chosen
        // from the file extension.
        void ServeStatic(IOutputStream& os, TStringBuf path) {
            // Look the resource up before anything is written, so that a
            // failing lookup does not leave a 200 head in the stream.
            auto blob = NResource::Find(TString("/") + TString(path));

            if (path.EndsWith(".js")) {
                os << HTTP_OK_JS;
            } else if (path.EndsWith(".png")) {
                os << HTTP_OK_PNG;
            } else {
                os << HTTP_OK_BIN;
            }
            os.Write(blob.Data(), blob.Size());
        }

        // Emits the stylesheet, favicon and script references of the page head.
        void HeaderJsCss() {
            LinkStylesheet("//yandex.st/bootstrap/3.0.2/css/bootstrap.css");
            LinkFavicon("?file=bus-ico.png");
            ScriptHref("//yandex.st/jquery/2.0.3/jquery.js");
            ScriptHref("//yandex.st/bootstrap/3.0.2/js/bootstrap.js");
            ScriptHref("//cdnjs.cloudflare.com/ajax/libs/flot/0.8.1/jquery.flot.min.js");
            ScriptHref("//cdnjs.cloudflare.com/ajax/libs/flot/0.8.1/jquery.flot.categories.min.js");
            ScriptHref("?file=messagebus.js");
        }

        // Dispatches the request by query parameter:
        //   file=<name>        static resource;
        //   fmt=solomon-json   metrics, optionally filtered by q / cs / ss;
        //   cs=<name>          client session page;
        //   ss=<name>          server session page;
        //   q=<name>           queue page;
        //   otherwise          the root page.
        void Serve() {
            THtmlOutputStreamPushPop pp(&Os);

            TCgiParameters::const_iterator file = CgiParams.Find("file");
            if (file != CgiParams.end()) {
                ServeStatic(Os, file->second);
                return;
            }

            bool solomonJson = false;
            TCgiParameters::const_iterator fmt = CgiParams.Find("fmt");
            if (fmt != CgiParams.end()) {
                if (fmt->second == "solomon-json") {
                    solomonJson = true;
                }
            }

            TCgiParameters::const_iterator cs = CgiParams.Find("cs");
            TCgiParameters::const_iterator ss = CgiParams.Find("ss");
            TCgiParameters::const_iterator q = CgiParams.Find("q");

            if (solomonJson) {
                Os << HTTP_OK_JSON;

                // The filters are the parameter values (->second); ->first is
                // the parameter name itself ("q", "cs", "ss").
                TString qp = q != CgiParams.end() ? q->second : "";
                TString csp = cs != CgiParams.end() ? cs->second : "";
                TString ssp = ss != CgiParams.end() ? ss->second : "";
                ServeSolomonJson(qp, csp, ssp);
            } else {
                Os << HTTP_OK_HTML;

                Doctype();

                TTagGuard html("html");
                {
                    TTagGuard head("head");

                    HeaderJsCss();
                    // ✉ 🚌
                    Title(TChars("MessageBus", false));
                }

                TTagGuard body("body");

                if (cs != CgiParams.end()) {
                    ServeSession(cs->second, true);
                } else if (ss != CgiParams.end()) {
                    ServeSession(ss->second, false);
                } else if (q != CgiParams.end()) {
                    ServeQueue(q->second);
                } else {
                    ServeDefault();
                }
            }
        }
    };

    // Serves one request while holding Mutex.
    void ServeHttp(IOutputStream& os, const TCgiParameters& queryArgs, const TBusWww::TOptionalParams& params) {
        TGuard<TMutex> g(Mutex);

        TRequest request(this, os, queryArgs, params);

        request.Serve();
    }
};

NBus::TBusWww::TBusWww()
    : Impl(new TImpl)
{
}

NBus::TBusWww::~TBusWww() {
}

void NBus::TBusWww::RegisterClientSession(TBusClientSessionPtr s) {
    Impl->RegisterClientSession(s);
}

void TBusWww::RegisterServerSession(TBusServerSessionPtr s) {
    Impl->RegisterServerSession(s);
}

void TBusWww::RegisterQueue(TBusMessageQueuePtr q) {
    Impl->RegisterQueue(q);
}

void TBusWww::RegisterModule(TBusModule* module) {
    Impl->RegisterModule(module);
}

void TBusWww::ServeHttp(IOutputStream& httpOutputStream,
                        const TCgiParameters& queryArgs,
                        const TBusWww::TOptionalParams& params) {
    Impl->ServeHttp(httpOutputStream, queryArgs, params);
}

// HTTP server that answers every request through a TBusWww instance.
// The server is started in the constructor and stopped in the destructor.
struct TBusWwwHttpServer::TImpl: public THttpServer::ICallBack {
    TIntrusivePtr<TBusWww> Www;
    THttpServer HttpServer;

    static THttpServer::TOptions MakeHttpServerOptions(unsigned port) {
        Y_ABORT_UNLESS(port > 0);
        THttpServer::TOptions r;
        r.Port = port;
        return r;
    }

    TImpl(TIntrusivePtr<TBusWww> www, unsigned port)
        : Www(www)
        , HttpServer(this, MakeHttpServerOptions(port))
    {
        HttpServer.Start();
    }

    // Per-connection handler; forwards to TImpl::ServeRequest.
    struct TClientRequestImpl: public TClientRequest {
        TBusWwwHttpServer::TImpl* const Outer;

        TClientRequestImpl(TBusWwwHttpServer::TImpl* outer)
            : Outer(outer)
        {
        }

        bool Reply(void*) override {
            Outer->ServeRequest(Input(), Output());
            return true;
        }
    };

    // Builds a complete response with status line "<code> <text>". The body is
    // `content`, or "<code> <text>" when `content` is empty.
    TString MakeSimpleResponse(unsigned code, TString text, TString content = "") {
        if (!content) {
            TStringStream contentSs;
            contentSs << code << " " << text;
            content = contentSs.Str();
        }
        TStringStream ss;
        ss << "HTTP/1.1 " << code << " " << text << "\r\nConnection: Close\r\n\r\n"
           << content;
        return ss.Str();
    }

    // Parses the request line and the URL query, then lets Www render the
    // response. Replies 400 on a malformed request line or URL and 500 when
    // anything throws.
    void ServeRequest(THttpInput& input, THttpOutput& output) {
        TCgiParameters cgiParams;
        try {
            THttpRequestHeader header;
            THttpHeaderParser parser;
            parser.Init(&header);
            if (parser.Execute(input.FirstLine()) < 0) {
                // Error replies go to `output`, like the 500 reply below: no
                // THtmlOutputStreamPushPop is active in this function, so
                // HtmlOutputStream() is not bound to this connection here.
                output << MakeSimpleResponse(400, "Bad request");
                return;
            }
            THttpURL url;
            if (url.Parse(header.GetUrl()) != THttpURL::ParsedOK) {
                output << MakeSimpleResponse(400, "Invalid url");
                return;
            }

            cgiParams.Scan(url.Get(THttpURL::FieldQuery));

            TBusWww::TOptionalParams params;
            //params.ParentLinks.emplace_back();
            //params.ParentLinks.back().Title = "temp";
            //params.ParentLinks.back().Href = "http://wiki.yandex-team.ru/";

            Www->ServeHttp(output, cgiParams, params);
        } catch (...) {
            output << MakeSimpleResponse(500, "Exception",
                                         TString() + "Exception: " + CurrentExceptionMessage());
        }
    }

    TClientRequest* CreateClient() override {
        return new TClientRequestImpl(this);
    }

    ~TImpl() override {
        HttpServer.Stop();
    }
};

NBus::TBusWwwHttpServer::TBusWwwHttpServer(TIntrusivePtr<TBusWww> www, unsigned port)
    : Impl(new TImpl(www, port))
{
}

NBus::TBusWwwHttpServer::~TBusWwwHttpServer() {
}