|
@@ -33,20 +33,20 @@ void TMsgBusClientConfig::CrackAddress(const TString& address, TString& hostname
|
|
|
|
|
|
|
|
|
|
|
|
-struct TMessageCookie
|
|
|
-{
|
|
|
- virtual void Signal(TAutoPtr<NBus::TBusMessage>& msg, NBus::EMessageStatus errorStatus, TAutoPtr<NBus::TBusMessage> reply) = 0;
|
|
|
- virtual ~TMessageCookie()
|
|
|
- {
|
|
|
- }
|
|
|
-};
|
|
|
-
|
|
|
-struct TSyncMessageCookie : public TMessageCookie {
|
|
|
- TAutoPtr<NBus::TBusMessage> Reply;
|
|
|
- NBus::EMessageStatus ErrorStatus = NBus::MESSAGE_UNKNOWN;
|
|
|
+struct TMessageCookie
|
|
|
+{
|
|
|
+ virtual void Signal(TAutoPtr<NBus::TBusMessage>& msg, NBus::EMessageStatus errorStatus, TAutoPtr<NBus::TBusMessage> reply) = 0;
|
|
|
+ virtual ~TMessageCookie()
|
|
|
+ {
|
|
|
+ }
|
|
|
+};
|
|
|
+
|
|
|
+struct TSyncMessageCookie : public TMessageCookie {
|
|
|
+ TAutoPtr<NBus::TBusMessage> Reply;
|
|
|
+ NBus::EMessageStatus ErrorStatus = NBus::MESSAGE_UNKNOWN;
|
|
|
TManualEvent Ev;
|
|
|
|
|
|
- TSyncMessageCookie()
|
|
|
+ TSyncMessageCookie()
|
|
|
: Ev()
|
|
|
{}
|
|
|
|
|
@@ -54,42 +54,42 @@ struct TSyncMessageCookie : public TMessageCookie {
|
|
|
Ev.Wait();
|
|
|
}
|
|
|
|
|
|
- virtual void Signal(TAutoPtr<NBus::TBusMessage>& msg, NBus::EMessageStatus errorStatus, TAutoPtr<NBus::TBusMessage> reply) {
|
|
|
+ virtual void Signal(TAutoPtr<NBus::TBusMessage>& msg, NBus::EMessageStatus errorStatus, TAutoPtr<NBus::TBusMessage> reply) {
|
|
|
Y_UNUSED(msg.Release());
|
|
|
- ErrorStatus = errorStatus;
|
|
|
- Reply = reply;
|
|
|
+ ErrorStatus = errorStatus;
|
|
|
+ Reply = reply;
|
|
|
Ev.Signal();
|
|
|
}
|
|
|
};
|
|
|
|
|
|
-
|
|
|
+
|
|
|
template <typename CallbackType>
|
|
|
-struct TAsyncMessageCookie : public TMessageCookie {
|
|
|
+struct TAsyncMessageCookie : public TMessageCookie {
|
|
|
CallbackType Callback;
|
|
|
void* Data;
|
|
|
-
|
|
|
+
|
|
|
explicit TAsyncMessageCookie(CallbackType callback, void* data)
|
|
|
- : Callback(callback)
|
|
|
+ : Callback(callback)
|
|
|
, Data(data)
|
|
|
- {}
|
|
|
-
|
|
|
+ {}
|
|
|
+
|
|
|
void Signal(TAutoPtr<NBus::TBusMessage>& msg, NBus::EMessageStatus errorStatus, TAutoPtr<NBus::TBusMessage> reply) override;
|
|
|
-};
|
|
|
-
|
|
|
+};
|
|
|
+
|
|
|
template <>
|
|
|
void TAsyncMessageCookie<TMsgBusClient::TOnCall>::Signal(TAutoPtr<NBus::TBusMessage>& msg, NBus::EMessageStatus errorStatus, TAutoPtr<NBus::TBusMessage> reply) {
|
|
|
msg->Data = Data;
|
|
|
Callback(errorStatus, reply);
|
|
|
delete this; // we must cleanup cookie after use
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
template <>
|
|
|
void TAsyncMessageCookie<TMsgBusClient::TOnCallWithRequest>::Signal(TAutoPtr<NBus::TBusMessage>& msg, NBus::EMessageStatus errorStatus, TAutoPtr<NBus::TBusMessage> reply) {
|
|
|
msg->Data = Data;
|
|
|
Callback(errorStatus, msg, reply);
|
|
|
delete this; // we must cleanup cookie after use
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
|
|
|
TMsgBusClientConfig::TMsgBusClientConfig()
|
|
|
: Ip("localhost")
|
|
@@ -130,7 +130,7 @@ void TMsgBusClient::Shutdown() {
|
|
|
|
|
|
NBus::EMessageStatus TMsgBusClient::SyncCall(TAutoPtr<NBus::TBusMessage> msg, TAutoPtr<NBus::TBusMessage> &reply) {
|
|
|
Y_VERIFY(!msg->Data);
|
|
|
- TAutoPtr<TSyncMessageCookie> cookie(new TSyncMessageCookie());
|
|
|
+ TAutoPtr<TSyncMessageCookie> cookie(new TSyncMessageCookie());
|
|
|
msg->Data = cookie.Get();
|
|
|
|
|
|
// msgbus would recreate second TAutoPtr for our msg pointer (wut?!) Second copy terminates in OnRelease/OnError where we release it.
|
|
@@ -145,24 +145,24 @@ NBus::EMessageStatus TMsgBusClient::SyncCall(TAutoPtr<NBus::TBusMessage> msg, TA
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-NBus::EMessageStatus TMsgBusClient::AsyncCall(TAutoPtr<NBus::TBusMessage> msg, TOnCall callback) {
|
|
|
+NBus::EMessageStatus TMsgBusClient::AsyncCall(TAutoPtr<NBus::TBusMessage> msg, TOnCall callback) {
|
|
|
TAutoPtr<TMessageCookie> cookie(new TAsyncMessageCookie<TOnCall>(callback, msg->Data));
|
|
|
- msg->Data = cookie.Get();
|
|
|
+ msg->Data = cookie.Get();
|
|
|
|
|
|
if (Config.UseCompression) {
|
|
|
msg->SetCompressed(true);
|
|
|
msg->SetCompressedResponse(true);
|
|
|
}
|
|
|
|
|
|
- NBus::EMessageStatus status = Session->SendMessage(msg.Get(), NetAddr.Get(), false);
|
|
|
+ NBus::EMessageStatus status = Session->SendMessage(msg.Get(), NetAddr.Get(), false);
|
|
|
|
|
|
if (status == NBus::MESSAGE_OK) {
|
|
|
// would be destructed in onresult/onerror
|
|
|
Y_UNUSED(cookie.Release());
|
|
|
Y_UNUSED(msg.Release());
|
|
|
}
|
|
|
-
|
|
|
- return status;
|
|
|
+
|
|
|
+ return status;
|
|
|
}
|
|
|
|
|
|
NBus::EMessageStatus TMsgBusClient::AsyncCall(TAutoPtr<NBus::TBusMessage> msg, TOnCallWithRequest callback) {
|
|
@@ -186,18 +186,18 @@ NBus::EMessageStatus TMsgBusClient::AsyncCall(TAutoPtr<NBus::TBusMessage> msg, T
|
|
|
}
|
|
|
|
|
|
void TMsgBusClient::OnResult(TAutoPtr<NBus::TBusMessage> pMessage, NBus::EMessageStatus status, TAutoPtr<NBus::TBusMessage> pReply) {
|
|
|
- static_cast<TMessageCookie*>(pMessage->Data)->Signal(pMessage, status, pReply);
|
|
|
-}
|
|
|
-
|
|
|
-void TMsgBusClient::OnReply(TAutoPtr<NBus::TBusMessage> pMessage, TAutoPtr<NBus::TBusMessage> pReply) {
|
|
|
+ static_cast<TMessageCookie*>(pMessage->Data)->Signal(pMessage, status, pReply);
|
|
|
+}
|
|
|
+
|
|
|
+void TMsgBusClient::OnReply(TAutoPtr<NBus::TBusMessage> pMessage, TAutoPtr<NBus::TBusMessage> pReply) {
|
|
|
OnResult(pMessage, NBus::MESSAGE_OK, pReply);
|
|
|
-}
|
|
|
-
|
|
|
+}
|
|
|
+
|
|
|
void TMsgBusClient::OnError(TAutoPtr<NBus::TBusMessage> pMessage, NBus::EMessageStatus status) {
|
|
|
if (status == NBus::MESSAGE_UNKNOWN) // timeouted request
|
|
|
return;
|
|
|
|
|
|
- OnResult(pMessage, status, TAutoPtr<NBus::TBusMessage>());
|
|
|
+ OnResult(pMessage, status, TAutoPtr<NBus::TBusMessage>());
|
|
|
}
|
|
|
|
|
|
const TMsgBusClientConfig& TMsgBusClient::GetConfig() {
|
|
@@ -205,10 +205,10 @@ const TMsgBusClientConfig& TMsgBusClient::GetConfig() {
|
|
|
}
|
|
|
|
|
|
EDataReqStatusExcerpt ExtractDataRequestStatus(const NKikimrClient::TResponse *record) {
|
|
|
- if (!record)
|
|
|
+ if (!record)
|
|
|
return EDataReqStatusExcerpt::Unknown;
|
|
|
-
|
|
|
- switch (record->GetStatus()) {
|
|
|
+
|
|
|
+ switch (record->GetStatus()) {
|
|
|
case MSTATUS_OK:
|
|
|
return EDataReqStatusExcerpt::Complete;
|
|
|
case MSTATUS_INPROGRESS:
|