#include #include "example.h" #include using namespace NBus; using namespace NBus::NTest; static void FillWithJunk(TArrayRef data) { TStringBuf junk = "01234567890123456789012345678901234567890123456789012345678901234567890123456789" "01234567890123456789012345678901234567890123456789012345678901234567890123456789" "01234567890123456789012345678901234567890123456789012345678901234567890123456789" "01234567890123456789012345678901234567890123456789012345678901234567890123456789"; for (size_t i = 0; i < data.size(); i += junk.size()) { memcpy(data.data() + i, junk.data(), Min(junk.size(), data.size() - i)); } } static TString JunkString(size_t len) { TTempBuf temp(len); TArrayRef tempArrayRef(temp.Data(), len); FillWithJunk(tempArrayRef); return TString(tempArrayRef.data(), tempArrayRef.size()); } TExampleRequest::TExampleRequest(TAtomic* counterPtr, size_t payloadSize) : TBusMessage(77) , AllocCounter(counterPtr) , Data(JunkString(payloadSize)) { } TExampleRequest::TExampleRequest(ECreateUninitialized, TAtomic* counterPtr) : TBusMessage(MESSAGE_CREATE_UNINITIALIZED) , AllocCounter(counterPtr) { } TExampleResponse::TExampleResponse(TAtomic* counterPtr, size_t payloadSize) : TBusMessage(79) , AllocCounter(counterPtr) , Data(JunkString(payloadSize)) { } TExampleResponse::TExampleResponse(ECreateUninitialized, TAtomic* counterPtr) : TBusMessage(MESSAGE_CREATE_UNINITIALIZED) , AllocCounter(counterPtr) { } TExampleProtocol::TExampleProtocol(int port) : TBusProtocol("Example", port) , RequestCount(0) , ResponseCount(0) , RequestCountDeserialized(0) , ResponseCountDeserialized(0) , StartCount(0) { } TExampleProtocol::~TExampleProtocol() { if (UncaughtException()) { // so it could be reported in test return; } Y_ABORT_UNLESS(0 == AtomicGet(RequestCount), "protocol %s: must be 0 requests allocated, actually %d", GetService(), int(RequestCount)); Y_ABORT_UNLESS(0 == AtomicGet(ResponseCount), "protocol %s: must be 0 responses allocated, actually %d", GetService(), int(ResponseCount)); Y_ABORT_UNLESS(0 == AtomicGet(RequestCountDeserialized), "protocol %s: must be 0 requests deserialized allocated, actually %d", GetService(), int(RequestCountDeserialized)); Y_ABORT_UNLESS(0 == AtomicGet(ResponseCountDeserialized), "protocol %s: must be 0 responses deserialized allocated, actually %d", GetService(), int(ResponseCountDeserialized)); Y_ABORT_UNLESS(0 == AtomicGet(StartCount), "protocol %s: must be 0 start objects allocated, actually %d", GetService(), int(StartCount)); } void TExampleProtocol::Serialize(const TBusMessage* message, TBuffer& buffer) { // Messages have no data, we recreate them from scratch // instead of sending, so we don't need to serialize them. if (const TExampleRequest* exampleMessage = dynamic_cast(message)) { buffer.Append(exampleMessage->Data.data(), exampleMessage->Data.size()); } else if (const TExampleResponse* exampleReply = dynamic_cast(message)) { buffer.Append(exampleReply->Data.data(), exampleReply->Data.size()); } else { Y_ABORT("unknown message type"); } } TAutoPtr TExampleProtocol::Deserialize(ui16 messageType, TArrayRef payload) { // TODO: check data Y_UNUSED(payload); if (messageType == 77) { TExampleRequest* exampleMessage = new TExampleRequest(MESSAGE_CREATE_UNINITIALIZED, &RequestCountDeserialized); exampleMessage->Data.append(payload.data(), payload.size()); return exampleMessage; } else if (messageType == 79) { TExampleResponse* exampleReply = new TExampleResponse(MESSAGE_CREATE_UNINITIALIZED, &ResponseCountDeserialized); exampleReply->Data.append(payload.data(), payload.size()); return exampleReply; } else { return nullptr; } } TExampleClient::TExampleClient(const TBusClientSessionConfig sessionConfig, int port) : Proto(port) , UseCompression(false) , CrashOnError(false) , DataSize(320) , MessageCount(0) , RepliesCount(0) , Errors(0) , LastError(MESSAGE_OK) { Bus = CreateMessageQueue("TExampleClient"); Session = TBusClientSession::Create(&Proto, this, sessionConfig, Bus); Session->RegisterService("localhost"); } TExampleClient::~TExampleClient() { } EMessageStatus TExampleClient::SendMessage(const TNetAddr* addr) { TAutoPtr message(new TExampleRequest(&Proto.RequestCount, DataSize)); message->SetCompressed(UseCompression); return Session->SendMessageAutoPtr(message, addr); } void TExampleClient::SendMessages(size_t count, const TNetAddr* addr) { UNIT_ASSERT(MessageCount == 0); UNIT_ASSERT(RepliesCount == 0); UNIT_ASSERT(Errors == 0); WorkDone.Reset(); MessageCount = count; for (ssize_t i = 0; i < MessageCount; ++i) { EMessageStatus s = SendMessage(addr); UNIT_ASSERT_EQUAL_C(s, MESSAGE_OK, "expecting OK, got " << s); } } void TExampleClient::SendMessages(size_t count, const TNetAddr& addr) { SendMessages(count, &addr); } void TExampleClient::ResetCounters() { MessageCount = 0; RepliesCount = 0; Errors = 0; LastError = MESSAGE_OK; WorkDone.Reset(); } void TExampleClient::WaitReplies() { WorkDone.WaitT(TDuration::Seconds(60)); UNIT_ASSERT_VALUES_EQUAL(AtomicGet(RepliesCount), MessageCount); UNIT_ASSERT_VALUES_EQUAL(AtomicGet(Errors), 0); UNIT_ASSERT_VALUES_EQUAL(Session->GetInFlight(), 0); ResetCounters(); } EMessageStatus TExampleClient::WaitForError() { WorkDone.WaitT(TDuration::Seconds(60)); UNIT_ASSERT_VALUES_EQUAL(1, MessageCount); UNIT_ASSERT_VALUES_EQUAL(0, AtomicGet(RepliesCount)); UNIT_ASSERT_VALUES_EQUAL(0, Session->GetInFlight()); UNIT_ASSERT_VALUES_EQUAL(1, Errors); EMessageStatus result = LastError; ResetCounters(); return result; } void TExampleClient::WaitForError(EMessageStatus status) { EMessageStatus error = WaitForError(); UNIT_ASSERT_VALUES_EQUAL(status, error); } void TExampleClient::SendMessagesWaitReplies(size_t count, const TNetAddr* addr) { SendMessages(count, addr); WaitReplies(); } void TExampleClient::SendMessagesWaitReplies(size_t count, const TNetAddr& addr) { SendMessagesWaitReplies(count, &addr); } void TExampleClient::OnReply(TAutoPtr mess, TAutoPtr reply) { Y_UNUSED(mess); Y_UNUSED(reply); if (AtomicIncrement(RepliesCount) == MessageCount) { WorkDone.Signal(); } } void TExampleClient::OnError(TAutoPtr mess, EMessageStatus status) { if (CrashOnError) { Y_ABORT("client failed: %s", ToCString(status)); } Y_UNUSED(mess); AtomicIncrement(Errors); LastError = status; WorkDone.Signal(); } TExampleServer::TExampleServer( const char* name, const TBusServerSessionConfig& sessionConfig) : UseCompression(false) , AckMessageBeforeSendReply(false) , ForgetRequest(false) { Bus = CreateMessageQueue(name); Session = TBusServerSession::Create(&Proto, this, sessionConfig, Bus); } TExampleServer::TExampleServer(unsigned port, const char* name) : UseCompression(false) , AckMessageBeforeSendReply(false) , ForgetRequest(false) { Bus = CreateMessageQueue(name); TBusServerSessionConfig sessionConfig; sessionConfig.ListenPort = port; Session = TBusServerSession::Create(&Proto, this, sessionConfig, Bus); } TExampleServer::~TExampleServer() { } size_t TExampleServer::GetInFlight() const { return Session->GetInFlight(); } unsigned TExampleServer::GetActualListenPort() const { return Session->GetActualListenPort(); } TNetAddr TExampleServer::GetActualListenAddr() const { return TNetAddr("127.0.0.1", GetActualListenPort()); } void TExampleServer::WaitForOnMessageCount(unsigned n) { TestSync.WaitFor(n); } void TExampleServer::OnMessage(TOnMessageContext& mess) { TestSync.Inc(); TExampleRequest* request = VerifyDynamicCast(mess.GetMessage()); if (ForgetRequest) { mess.ForgetRequest(); return; } TAutoPtr reply(new TExampleResponse(&Proto.ResponseCount, DataSize.GetOrElse(request->Data.size()))); reply->SetCompressed(UseCompression); EMessageStatus status; if (AckMessageBeforeSendReply) { TBusIdentity ident; mess.AckMessage(ident); status = Session->SendReply(ident, reply.Release()); // TODO: leaks on error } else { status = mess.SendReplyMove(reply); } Y_ABORT_UNLESS(status == MESSAGE_OK, "failed to send reply: %s", ToString(status).data()); }