123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281 |
- #include <library/cpp/testing/unittest/registar.h>
- #include "example.h"
- #include <util/generic/cast.h>
- using namespace NBus;
- using namespace NBus::NTest;
- static void FillWithJunk(TArrayRef<char> data) {
- TStringBuf junk =
- "01234567890123456789012345678901234567890123456789012345678901234567890123456789"
- "01234567890123456789012345678901234567890123456789012345678901234567890123456789"
- "01234567890123456789012345678901234567890123456789012345678901234567890123456789"
- "01234567890123456789012345678901234567890123456789012345678901234567890123456789";
- for (size_t i = 0; i < data.size(); i += junk.size()) {
- memcpy(data.data() + i, junk.data(), Min(junk.size(), data.size() - i));
- }
- }
- static TString JunkString(size_t len) {
- TTempBuf temp(len);
- TArrayRef<char> tempArrayRef(temp.Data(), len);
- FillWithJunk(tempArrayRef);
- return TString(tempArrayRef.data(), tempArrayRef.size());
- }
- TExampleRequest::TExampleRequest(TAtomic* counterPtr, size_t payloadSize)
- : TBusMessage(77)
- , AllocCounter(counterPtr)
- , Data(JunkString(payloadSize))
- {
- }
- TExampleRequest::TExampleRequest(ECreateUninitialized, TAtomic* counterPtr)
- : TBusMessage(MESSAGE_CREATE_UNINITIALIZED)
- , AllocCounter(counterPtr)
- {
- }
- TExampleResponse::TExampleResponse(TAtomic* counterPtr, size_t payloadSize)
- : TBusMessage(79)
- , AllocCounter(counterPtr)
- , Data(JunkString(payloadSize))
- {
- }
- TExampleResponse::TExampleResponse(ECreateUninitialized, TAtomic* counterPtr)
- : TBusMessage(MESSAGE_CREATE_UNINITIALIZED)
- , AllocCounter(counterPtr)
- {
- }
- TExampleProtocol::TExampleProtocol(int port)
- : TBusProtocol("Example", port)
- , RequestCount(0)
- , ResponseCount(0)
- , RequestCountDeserialized(0)
- , ResponseCountDeserialized(0)
- , StartCount(0)
- {
- }
- TExampleProtocol::~TExampleProtocol() {
- if (UncaughtException()) {
- // so it could be reported in test
- return;
- }
- Y_ABORT_UNLESS(0 == AtomicGet(RequestCount), "protocol %s: must be 0 requests allocated, actually %d", GetService(), int(RequestCount));
- Y_ABORT_UNLESS(0 == AtomicGet(ResponseCount), "protocol %s: must be 0 responses allocated, actually %d", GetService(), int(ResponseCount));
- Y_ABORT_UNLESS(0 == AtomicGet(RequestCountDeserialized), "protocol %s: must be 0 requests deserialized allocated, actually %d", GetService(), int(RequestCountDeserialized));
- Y_ABORT_UNLESS(0 == AtomicGet(ResponseCountDeserialized), "protocol %s: must be 0 responses deserialized allocated, actually %d", GetService(), int(ResponseCountDeserialized));
- Y_ABORT_UNLESS(0 == AtomicGet(StartCount), "protocol %s: must be 0 start objects allocated, actually %d", GetService(), int(StartCount));
- }
- void TExampleProtocol::Serialize(const TBusMessage* message, TBuffer& buffer) {
- // Messages have no data, we recreate them from scratch
- // instead of sending, so we don't need to serialize them.
- if (const TExampleRequest* exampleMessage = dynamic_cast<const TExampleRequest*>(message)) {
- buffer.Append(exampleMessage->Data.data(), exampleMessage->Data.size());
- } else if (const TExampleResponse* exampleReply = dynamic_cast<const TExampleResponse*>(message)) {
- buffer.Append(exampleReply->Data.data(), exampleReply->Data.size());
- } else {
- Y_ABORT("unknown message type");
- }
- }
- TAutoPtr<TBusMessage> TExampleProtocol::Deserialize(ui16 messageType, TArrayRef<const char> payload) {
- // TODO: check data
- Y_UNUSED(payload);
- if (messageType == 77) {
- TExampleRequest* exampleMessage = new TExampleRequest(MESSAGE_CREATE_UNINITIALIZED, &RequestCountDeserialized);
- exampleMessage->Data.append(payload.data(), payload.size());
- return exampleMessage;
- } else if (messageType == 79) {
- TExampleResponse* exampleReply = new TExampleResponse(MESSAGE_CREATE_UNINITIALIZED, &ResponseCountDeserialized);
- exampleReply->Data.append(payload.data(), payload.size());
- return exampleReply;
- } else {
- return nullptr;
- }
- }
- TExampleClient::TExampleClient(const TBusClientSessionConfig sessionConfig, int port)
- : Proto(port)
- , UseCompression(false)
- , CrashOnError(false)
- , DataSize(320)
- , MessageCount(0)
- , RepliesCount(0)
- , Errors(0)
- , LastError(MESSAGE_OK)
- {
- Bus = CreateMessageQueue("TExampleClient");
- Session = TBusClientSession::Create(&Proto, this, sessionConfig, Bus);
- Session->RegisterService("localhost");
- }
- TExampleClient::~TExampleClient() {
- }
- EMessageStatus TExampleClient::SendMessage(const TNetAddr* addr) {
- TAutoPtr<TExampleRequest> message(new TExampleRequest(&Proto.RequestCount, DataSize));
- message->SetCompressed(UseCompression);
- return Session->SendMessageAutoPtr(message, addr);
- }
- void TExampleClient::SendMessages(size_t count, const TNetAddr* addr) {
- UNIT_ASSERT(MessageCount == 0);
- UNIT_ASSERT(RepliesCount == 0);
- UNIT_ASSERT(Errors == 0);
- WorkDone.Reset();
- MessageCount = count;
- for (ssize_t i = 0; i < MessageCount; ++i) {
- EMessageStatus s = SendMessage(addr);
- UNIT_ASSERT_EQUAL_C(s, MESSAGE_OK, "expecting OK, got " << s);
- }
- }
- void TExampleClient::SendMessages(size_t count, const TNetAddr& addr) {
- SendMessages(count, &addr);
- }
- void TExampleClient::ResetCounters() {
- MessageCount = 0;
- RepliesCount = 0;
- Errors = 0;
- LastError = MESSAGE_OK;
- WorkDone.Reset();
- }
- void TExampleClient::WaitReplies() {
- WorkDone.WaitT(TDuration::Seconds(60));
- UNIT_ASSERT_VALUES_EQUAL(AtomicGet(RepliesCount), MessageCount);
- UNIT_ASSERT_VALUES_EQUAL(AtomicGet(Errors), 0);
- UNIT_ASSERT_VALUES_EQUAL(Session->GetInFlight(), 0);
- ResetCounters();
- }
- EMessageStatus TExampleClient::WaitForError() {
- WorkDone.WaitT(TDuration::Seconds(60));
- UNIT_ASSERT_VALUES_EQUAL(1, MessageCount);
- UNIT_ASSERT_VALUES_EQUAL(0, AtomicGet(RepliesCount));
- UNIT_ASSERT_VALUES_EQUAL(0, Session->GetInFlight());
- UNIT_ASSERT_VALUES_EQUAL(1, Errors);
- EMessageStatus result = LastError;
- ResetCounters();
- return result;
- }
- void TExampleClient::WaitForError(EMessageStatus status) {
- EMessageStatus error = WaitForError();
- UNIT_ASSERT_VALUES_EQUAL(status, error);
- }
- void TExampleClient::SendMessagesWaitReplies(size_t count, const TNetAddr* addr) {
- SendMessages(count, addr);
- WaitReplies();
- }
- void TExampleClient::SendMessagesWaitReplies(size_t count, const TNetAddr& addr) {
- SendMessagesWaitReplies(count, &addr);
- }
- void TExampleClient::OnReply(TAutoPtr<TBusMessage> mess, TAutoPtr<TBusMessage> reply) {
- Y_UNUSED(mess);
- Y_UNUSED(reply);
- if (AtomicIncrement(RepliesCount) == MessageCount) {
- WorkDone.Signal();
- }
- }
- void TExampleClient::OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) {
- if (CrashOnError) {
- Y_ABORT("client failed: %s", ToCString(status));
- }
- Y_UNUSED(mess);
- AtomicIncrement(Errors);
- LastError = status;
- WorkDone.Signal();
- }
- TExampleServer::TExampleServer(
- const char* name,
- const TBusServerSessionConfig& sessionConfig)
- : UseCompression(false)
- , AckMessageBeforeSendReply(false)
- , ForgetRequest(false)
- {
- Bus = CreateMessageQueue(name);
- Session = TBusServerSession::Create(&Proto, this, sessionConfig, Bus);
- }
- TExampleServer::TExampleServer(unsigned port, const char* name)
- : UseCompression(false)
- , AckMessageBeforeSendReply(false)
- , ForgetRequest(false)
- {
- Bus = CreateMessageQueue(name);
- TBusServerSessionConfig sessionConfig;
- sessionConfig.ListenPort = port;
- Session = TBusServerSession::Create(&Proto, this, sessionConfig, Bus);
- }
- TExampleServer::~TExampleServer() {
- }
- size_t TExampleServer::GetInFlight() const {
- return Session->GetInFlight();
- }
- unsigned TExampleServer::GetActualListenPort() const {
- return Session->GetActualListenPort();
- }
- TNetAddr TExampleServer::GetActualListenAddr() const {
- return TNetAddr("127.0.0.1", GetActualListenPort());
- }
- void TExampleServer::WaitForOnMessageCount(unsigned n) {
- TestSync.WaitFor(n);
- }
- void TExampleServer::OnMessage(TOnMessageContext& mess) {
- TestSync.Inc();
- TExampleRequest* request = VerifyDynamicCast<TExampleRequest*>(mess.GetMessage());
- if (ForgetRequest) {
- mess.ForgetRequest();
- return;
- }
- TAutoPtr<TBusMessage> reply(new TExampleResponse(&Proto.ResponseCount, DataSize.GetOrElse(request->Data.size())));
- reply->SetCompressed(UseCompression);
- EMessageStatus status;
- if (AckMessageBeforeSendReply) {
- TBusIdentity ident;
- mess.AckMessage(ident);
- status = Session->SendReply(ident, reply.Release()); // TODO: leaks on error
- } else {
- status = mess.SendReplyMove(reply);
- }
- Y_ABORT_UNLESS(status == MESSAGE_OK, "failed to send reply: %s", ToString(status).data());
- }
|