|
@@ -1,11 +1,8 @@
|
|
|
#pragma once
|
|
|
#include "json_pipe_req.h"
|
|
|
-#include "viewer.h"
|
|
|
-#include <library/cpp/json/json_writer.h>
|
|
|
#include <ydb/core/grpc_services/local_rpc/local_rpc.h>
|
|
|
|
|
|
-namespace NKikimr {
|
|
|
-namespace NViewer {
|
|
|
+namespace NKikimr::NViewer {
|
|
|
|
|
|
struct TEvLocalRpcPrivate {
|
|
|
enum EEv {
|
|
@@ -26,27 +23,15 @@ struct TEvLocalRpcPrivate {
|
|
|
};
|
|
|
};
|
|
|
|
|
|
-using namespace NActors;
|
|
|
-using NSchemeShard::TEvSchemeShard;
|
|
|
-
|
|
|
template <class TProtoRequest, class TProtoResponse, class TProtoResult, class TProtoService, class TRpcEv>
|
|
|
-class TJsonLocalRpc : public TActorBootstrapped<TJsonLocalRpc<TProtoRequest, TProtoResponse, TProtoResult, TProtoService, TRpcEv>> {
|
|
|
+class TJsonLocalRpc : public TViewerPipeClient {
|
|
|
using TThis = TJsonLocalRpc<TProtoRequest, TProtoResponse, TProtoResult, TProtoService, TRpcEv>;
|
|
|
- using TBase = TActorBootstrapped<TThis>;
|
|
|
-
|
|
|
- using TBase::Send;
|
|
|
- using TBase::PassAway;
|
|
|
- using TBase::Become;
|
|
|
+ using TBase = TViewerPipeClient;
|
|
|
|
|
|
protected:
|
|
|
- IViewer* Viewer;
|
|
|
- NMon::TEvHttpInfo::TPtr Event;
|
|
|
- TProtoRequest Request;
|
|
|
+ using TBase::ReplyAndPassAway;
|
|
|
+ std::vector<HTTP_METHOD> AllowedMethods = {};
|
|
|
TAutoPtr<TEvLocalRpcPrivate::TEvGrpcRequestResult<TProtoResult>> Result;
|
|
|
-
|
|
|
- TJsonSettings JsonSettings;
|
|
|
- ui32 Timeout = 0;
|
|
|
- TString Database;
|
|
|
NThreading::TFuture<TProtoResponse> RpcFuture;
|
|
|
|
|
|
public:
|
|
@@ -54,13 +39,11 @@ public:
|
|
|
return NKikimrServices::TActivity::VIEWER_HANDLER;
|
|
|
}
|
|
|
|
|
|
- TJsonLocalRpc(IViewer* viewer, NMon::TEvHttpInfo::TPtr &ev)
|
|
|
- : Viewer(viewer)
|
|
|
- , Event(ev)
|
|
|
+ TJsonLocalRpc(IViewer* viewer, NMon::TEvHttpInfo::TPtr& ev)
|
|
|
+ : TBase(viewer, ev, TProtoRequest::descriptor()->name())
|
|
|
{}
|
|
|
|
|
|
- TProtoRequest Params2Proto(const TCgiParameters& params) {
|
|
|
- TProtoRequest request;
|
|
|
+ void Params2Proto(const TCgiParameters& params, TProtoRequest& request) {
|
|
|
using google::protobuf::Descriptor;
|
|
|
using google::protobuf::Reflection;
|
|
|
using google::protobuf::FieldDescriptor;
|
|
@@ -110,44 +93,52 @@ public:
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- return request;
|
|
|
}
|
|
|
|
|
|
- TProtoRequest Params2Proto() {
|
|
|
- TProtoRequest request;
|
|
|
- NProtobufJson::TJson2ProtoConfig json2ProtoConfig;
|
|
|
- auto postData = Event->Get()->Request.GetPostContent();
|
|
|
- if (!postData.empty()) {
|
|
|
- try {
|
|
|
- NProtobufJson::Json2Proto(postData, request, json2ProtoConfig);
|
|
|
- }
|
|
|
- catch (const yexception& e) {
|
|
|
- ReplyAndPassAway(Viewer->GetHTTPBADREQUEST(Event->Get(), "text/plain", e.what()));
|
|
|
+ bool ValidateProto(TProtoRequest& request) {
|
|
|
+ using google::protobuf::Descriptor;
|
|
|
+ using google::protobuf::Reflection;
|
|
|
+ using google::protobuf::FieldDescriptor;
|
|
|
+ const Descriptor& descriptor = *TProtoRequest::GetDescriptor();
|
|
|
+ const Reflection& reflection = *TProtoRequest::GetReflection();
|
|
|
+ for (int idx = 0; idx < descriptor.field_count(); ++idx) {
|
|
|
+ const FieldDescriptor* field = descriptor.field(idx);
|
|
|
+ const auto& options(field->options());
|
|
|
+ if (options.HasExtension(Ydb::required)) {
|
|
|
+ if (options.GetExtension(Ydb::required)) {
|
|
|
+ if (!reflection.HasField(request, field)) {
|
|
|
+ ReplyAndPassAway(GetHTTPBADREQUEST("text/plain", TStringBuilder() << "field '" << field->name() << "' is required"));
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
- } else {
|
|
|
- const auto& params(Event->Get()->Request.GetParams());
|
|
|
- return Params2Proto(params);
|
|
|
}
|
|
|
- return request;
|
|
|
+ return true;
|
|
|
}
|
|
|
|
|
|
- bool PostToRequest() {
|
|
|
+ bool Params2Proto(TProtoRequest& request) {
|
|
|
auto postData = Event->Get()->Request.GetPostContent();
|
|
|
if (!postData.empty()) {
|
|
|
try {
|
|
|
- NProtobufJson::Json2Proto(postData, Request, {});
|
|
|
- return true;
|
|
|
+ NProtobufJson::Json2Proto(postData, request);
|
|
|
}
|
|
|
catch (const yexception& e) {
|
|
|
- ReplyAndPassAway(Viewer->GetHTTPBADREQUEST(Event->Get(), "text/plain", e.what()));
|
|
|
+ ReplyAndPassAway(GetHTTPBADREQUEST("text/plain", e.what()));
|
|
|
return false;
|
|
|
}
|
|
|
+ } else {
|
|
|
+ const auto& params(Event->Get()->Request.GetParams());
|
|
|
+ Params2Proto(params, request);
|
|
|
+ }
|
|
|
+ if (!ValidateProto(request)) {
|
|
|
+ return false;
|
|
|
}
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
- void SendGrpcRequest() {
|
|
|
- RpcFuture = NRpcService::DoLocalRpc<TRpcEv>(std::move(Request), Database, Event->Get()->UserToken, TlsActivationContext->ActorSystem());
|
|
|
+ void SendGrpcRequest(TProtoRequest&& request) {
|
|
|
+ // TODO(xenoxeno): pass trace id
|
|
|
+ RpcFuture = NRpcService::DoLocalRpc<TRpcEv>(std::move(request), Database, Event->Get()->UserToken, TlsActivationContext->ActorSystem());
|
|
|
RpcFuture.Subscribe([actorId = TBase::SelfId(), actorSystem = TlsActivationContext->ActorSystem()]
|
|
|
(const NThreading::TFuture<TProtoResponse>& future) {
|
|
|
auto& response = future.GetValueSync();
|
|
@@ -173,14 +164,21 @@ public:
|
|
|
}
|
|
|
|
|
|
virtual void Bootstrap() {
|
|
|
- const auto& params(Event->Get()->Request.GetParams());
|
|
|
- JsonSettings.EnumAsNumbers = !FromStringWithDefault<bool>(params.Get("enums"), true);
|
|
|
- JsonSettings.UI64AsString = !FromStringWithDefault<bool>(params.Get("ui64"), true);
|
|
|
- Timeout = FromStringWithDefault<ui32>(params.Get("timeout"), 10000);
|
|
|
-
|
|
|
- SendGrpcRequest();
|
|
|
-
|
|
|
- Become(&TThis::StateRequested, TDuration::MilliSeconds(Timeout), new TEvents::TEvWakeup());
|
|
|
+ if (!AllowedMethods.empty() && std::find(AllowedMethods.begin(), AllowedMethods.end(), Event->Get()->Request.GetMethod()) == AllowedMethods.end()) {
|
|
|
+ return ReplyAndPassAway(GetHTTPBADREQUEST("text/plain", "Method is not allowed"));
|
|
|
+ }
|
|
|
+ if (Database.empty()) {
|
|
|
+ return ReplyAndPassAway(GetHTTPBADREQUEST("text/plain", "field 'database' is required"));
|
|
|
+ }
|
|
|
+ if (TBase::NeedToRedirect()) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ TProtoRequest request;
|
|
|
+ if (!Params2Proto(request)) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ SendGrpcRequest(std::move(request));
|
|
|
+ Become(&TThis::StateRequested, Timeout, new TEvents::TEvWakeup());
|
|
|
}
|
|
|
|
|
|
void Handle(typename TEvLocalRpcPrivate::TEvGrpcRequestResult<TProtoResult>::TPtr& ev) {
|
|
@@ -197,38 +195,24 @@ public:
|
|
|
|
|
|
void ReplyAndPassAway() {
|
|
|
if (Result && Result->Status) {
|
|
|
- if (!Result->Status->IsSuccess()) {
|
|
|
+ if (Result->Status->IsSuccess()) {
|
|
|
+ return ReplyAndPassAway(GetHTTPOKJSON(Result->Message));
|
|
|
+ } else {
|
|
|
NJson::TJsonValue json;
|
|
|
TString message;
|
|
|
MakeJsonErrorReply(json, message, Result->Status.value());
|
|
|
TStringStream stream;
|
|
|
NJson::WriteJson(&stream, &json);
|
|
|
if (Result->Status->GetStatus() == NYdb::EStatus::UNAUTHORIZED) {
|
|
|
- return ReplyAndPassAway(Viewer->GetHTTPFORBIDDEN(Event->Get(), "application/json", stream.Str()));
|
|
|
+ return ReplyAndPassAway(GetHTTPFORBIDDEN("application/json", stream.Str()), message);
|
|
|
} else {
|
|
|
- return ReplyAndPassAway(Viewer->GetHTTPBADREQUEST(Event->Get(), "application/json", stream.Str()));
|
|
|
+ return ReplyAndPassAway(GetHTTPBADREQUEST("application/json", stream.Str()), message);
|
|
|
}
|
|
|
- } else {
|
|
|
- TStringStream json;
|
|
|
- TProtoToJson::ProtoToJson(json, Result->Message, JsonSettings);
|
|
|
- return ReplyAndPassAway(Viewer->GetHTTPOKJSON(Event->Get(), json.Str()));
|
|
|
}
|
|
|
} else {
|
|
|
- return ReplyAndPassAway(Viewer->GetHTTPINTERNALERROR(Event->Get()));
|
|
|
+ return ReplyAndPassAway(GetHTTPINTERNALERROR("text/plain", "no Result or Status"), "internal error");
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
-
|
|
|
- void HandleTimeout() {
|
|
|
- ReplyAndPassAway(Viewer->GetHTTPGATEWAYTIMEOUT(Event->Get()));
|
|
|
- }
|
|
|
-
|
|
|
- void ReplyAndPassAway(TString data) {
|
|
|
- Send(Event->Sender, new NMon::TEvHttpInfoRes(data, 0, NMon::IEvHttpInfoRes::EContentType::Custom));
|
|
|
- PassAway();
|
|
|
- }
|
|
|
};
|
|
|
|
|
|
-
|
|
|
-}
|
|
|
-}
|
|
|
+} // namespace NKikimr::NViewer
|