example.cpp 8.7 KB


  1. #include <library/cpp/testing/unittest/registar.h>
  2. #include "example.h"
  3. #include <util/generic/cast.h>
  4. using namespace NBus;
  5. using namespace NBus::NTest;
  6. static void FillWithJunk(TArrayRef<char> data) {
  7. TStringBuf junk =
  8. "01234567890123456789012345678901234567890123456789012345678901234567890123456789"
  9. "01234567890123456789012345678901234567890123456789012345678901234567890123456789"
  10. "01234567890123456789012345678901234567890123456789012345678901234567890123456789"
  11. "01234567890123456789012345678901234567890123456789012345678901234567890123456789";
  12. for (size_t i = 0; i < data.size(); i += junk.size()) {
  13. memcpy(data.data() + i, junk.data(), Min(junk.size(), data.size() - i));
  14. }
  15. }
  16. static TString JunkString(size_t len) {
  17. TTempBuf temp(len);
  18. TArrayRef<char> tempArrayRef(temp.Data(), len);
  19. FillWithJunk(tempArrayRef);
  20. return TString(tempArrayRef.data(), tempArrayRef.size());
  21. }
  // Builds a request carrying `payloadSize` bytes of generated junk data.
  // The TBusMessage base is constructed with 77, the same value that
  // TExampleProtocol::Deserialize maps to TExampleRequest.
  // `counterPtr` is used to initialize AllocCounter.
  22. TExampleRequest::TExampleRequest(TAtomic* counterPtr, size_t payloadSize)
  23. : TBusMessage(77)
  24. , AllocCounter(counterPtr)
  25. , Data(JunkString(payloadSize))
  26. {
  27. }
  // Builds a request through the MESSAGE_CREATE_UNINITIALIZED base constructor.
  // Data is not set here; TExampleProtocol::Deserialize appends the received
  // payload to it afterwards. `counterPtr` is used to initialize AllocCounter.
  28. TExampleRequest::TExampleRequest(ECreateUninitialized, TAtomic* counterPtr)
  29. : TBusMessage(MESSAGE_CREATE_UNINITIALIZED)
  30. , AllocCounter(counterPtr)
  31. {
  32. }
  // Builds a response carrying `payloadSize` bytes of generated junk data.
  // The TBusMessage base is constructed with 79, the same value that
  // TExampleProtocol::Deserialize maps to TExampleResponse.
  // `counterPtr` is used to initialize AllocCounter.
  33. TExampleResponse::TExampleResponse(TAtomic* counterPtr, size_t payloadSize)
  34. : TBusMessage(79)
  35. , AllocCounter(counterPtr)
  36. , Data(JunkString(payloadSize))
  37. {
  38. }
  // Builds a response through the MESSAGE_CREATE_UNINITIALIZED base constructor.
  // Data is not set here; TExampleProtocol::Deserialize appends the received
  // payload to it afterwards. `counterPtr` is used to initialize AllocCounter.
  39. TExampleResponse::TExampleResponse(ECreateUninitialized, TAtomic* counterPtr)
  40. : TBusMessage(MESSAGE_CREATE_UNINITIALIZED)
  41. , AllocCounter(counterPtr)
  42. {
  43. }
  // Constructs the protocol: passes the service name "Example" and `port` to
  // the TBusProtocol base and starts all five counters at zero. The destructor
  // requires every one of these counters to be zero again.
  44. TExampleProtocol::TExampleProtocol(int port)
  45. : TBusProtocol("Example", port)
  46. , RequestCount(0)
  47. , ResponseCount(0)
  48. , RequestCountDeserialized(0)
  49. , ResponseCountDeserialized(0)
  50. , StartCount(0)
  51. {
  52. }
  53. TExampleProtocol::~TExampleProtocol() {
  54. if (UncaughtException()) {
  55. // so it could be reported in test
  56. return;
  57. }
  58. Y_ABORT_UNLESS(0 == AtomicGet(RequestCount), "protocol %s: must be 0 requests allocated, actually %d", GetService(), int(RequestCount));
  59. Y_ABORT_UNLESS(0 == AtomicGet(ResponseCount), "protocol %s: must be 0 responses allocated, actually %d", GetService(), int(ResponseCount));
  60. Y_ABORT_UNLESS(0 == AtomicGet(RequestCountDeserialized), "protocol %s: must be 0 requests deserialized allocated, actually %d", GetService(), int(RequestCountDeserialized));
  61. Y_ABORT_UNLESS(0 == AtomicGet(ResponseCountDeserialized), "protocol %s: must be 0 responses deserialized allocated, actually %d", GetService(), int(ResponseCountDeserialized));
  62. Y_ABORT_UNLESS(0 == AtomicGet(StartCount), "protocol %s: must be 0 start objects allocated, actually %d", GetService(), int(StartCount));
  63. }
  64. void TExampleProtocol::Serialize(const TBusMessage* message, TBuffer& buffer) {
  65. // Messages have no data, we recreate them from scratch
  66. // instead of sending, so we don't need to serialize them.
  67. if (const TExampleRequest* exampleMessage = dynamic_cast<const TExampleRequest*>(message)) {
  68. buffer.Append(exampleMessage->Data.data(), exampleMessage->Data.size());
  69. } else if (const TExampleResponse* exampleReply = dynamic_cast<const TExampleResponse*>(message)) {
  70. buffer.Append(exampleReply->Data.data(), exampleReply->Data.size());
  71. } else {
  72. Y_ABORT("unknown message type");
  73. }
  74. }
  75. TAutoPtr<TBusMessage> TExampleProtocol::Deserialize(ui16 messageType, TArrayRef<const char> payload) {
  76. // TODO: check data
  77. Y_UNUSED(payload);
  78. if (messageType == 77) {
  79. TExampleRequest* exampleMessage = new TExampleRequest(MESSAGE_CREATE_UNINITIALIZED, &RequestCountDeserialized);
  80. exampleMessage->Data.append(payload.data(), payload.size());
  81. return exampleMessage;
  82. } else if (messageType == 79) {
  83. TExampleResponse* exampleReply = new TExampleResponse(MESSAGE_CREATE_UNINITIALIZED, &ResponseCountDeserialized);
  84. exampleReply->Data.append(payload.data(), payload.size());
  85. return exampleReply;
  86. } else {
  87. return nullptr;
  88. }
  89. }
  // Sets up a test client: Proto is constructed with `port`, compression and
  // crash-on-error are off, DataSize is 320, all counters start at zero and
  // LastError is MESSAGE_OK.
  // The body creates a message queue named "TExampleClient", creates a client
  // session from &Proto, this object, `sessionConfig` and that queue, and then
  // calls RegisterService("localhost") on the session.
  // NOTE(review): `this` is presumably passed as the session's handler —
  // confirm against TBusClientSession::Create.
  90. TExampleClient::TExampleClient(const TBusClientSessionConfig sessionConfig, int port)
  91. : Proto(port)
  92. , UseCompression(false)
  93. , CrashOnError(false)
  94. , DataSize(320)
  95. , MessageCount(0)
  96. , RepliesCount(0)
  97. , Errors(0)
  98. , LastError(MESSAGE_OK)
  99. {
  100. Bus = CreateMessageQueue("TExampleClient");
  101. Session = TBusClientSession::Create(&Proto, this, sessionConfig, Bus);
  102. Session->RegisterService("localhost");
  103. }
  104. TExampleClient::~TExampleClient() {
  105. }
  106. EMessageStatus TExampleClient::SendMessage(const TNetAddr* addr) {
  107. TAutoPtr<TExampleRequest> message(new TExampleRequest(&Proto.RequestCount, DataSize));
  108. message->SetCompressed(UseCompression);
  109. return Session->SendMessageAutoPtr(message, addr);
  110. }
  111. void TExampleClient::SendMessages(size_t count, const TNetAddr* addr) {
  112. UNIT_ASSERT(MessageCount == 0);
  113. UNIT_ASSERT(RepliesCount == 0);
  114. UNIT_ASSERT(Errors == 0);
  115. WorkDone.Reset();
  116. MessageCount = count;
  117. for (ssize_t i = 0; i < MessageCount; ++i) {
  118. EMessageStatus s = SendMessage(addr);
  119. UNIT_ASSERT_EQUAL_C(s, MESSAGE_OK, "expecting OK, got " << s);
  120. }
  121. }
  // Reference overload: forwards to the pointer overload with the address of
  // `addr`.
  122. void TExampleClient::SendMessages(size_t count, const TNetAddr& addr) {
  123. SendMessages(count, &addr);
  124. }
  // Returns the client to its initial bookkeeping state: zeroes MessageCount,
  // RepliesCount and Errors, sets LastError to MESSAGE_OK and resets the
  // WorkDone event. SendMessages asserts that the three counters are zero
  // on entry.
  // NOTE(review): RepliesCount and Errors are written with plain assignment
  // here but updated with AtomicIncrement in OnReply/OnError — presumably this
  // is only called when no messages are in flight; confirm.
  125. void TExampleClient::ResetCounters() {
  126. MessageCount = 0;
  127. RepliesCount = 0;
  128. Errors = 0;
  129. LastError = MESSAGE_OK;
  130. WorkDone.Reset();
  131. }
  // Waits on WorkDone with a 60-second limit, then asserts that the number of
  // replies equals MessageCount, that no errors were recorded and that the
  // session has nothing in flight. Resets the counters afterwards.
  // The result of WaitT is ignored; a timeout shows up through the assertions
  // that follow. OnError also signals WorkDone, so this can wake up before all
  // replies arrive — the Errors assertion then fails.
  132. void TExampleClient::WaitReplies() {
  133. WorkDone.WaitT(TDuration::Seconds(60));
  134. UNIT_ASSERT_VALUES_EQUAL(AtomicGet(RepliesCount), MessageCount);
  135. UNIT_ASSERT_VALUES_EQUAL(AtomicGet(Errors), 0);
  136. UNIT_ASSERT_VALUES_EQUAL(Session->GetInFlight(), 0);
  137. ResetCounters();
  138. }
  139. EMessageStatus TExampleClient::WaitForError() {
  140. WorkDone.WaitT(TDuration::Seconds(60));
  141. UNIT_ASSERT_VALUES_EQUAL(1, MessageCount);
  142. UNIT_ASSERT_VALUES_EQUAL(0, AtomicGet(RepliesCount));
  143. UNIT_ASSERT_VALUES_EQUAL(0, Session->GetInFlight());
  144. UNIT_ASSERT_VALUES_EQUAL(1, Errors);
  145. EMessageStatus result = LastError;
  146. ResetCounters();
  147. return result;
  148. }
  // Waits for the single expected error via the no-argument overload and
  // asserts that the reported status equals `status`.
  149. void TExampleClient::WaitForError(EMessageStatus status) {
  150. EMessageStatus error = WaitForError();
  151. UNIT_ASSERT_VALUES_EQUAL(status, error);
  152. }
  // Convenience helper: sends `count` requests to `addr` with SendMessages and
  // then runs WaitReplies, which asserts the outcome and resets the counters.
  153. void TExampleClient::SendMessagesWaitReplies(size_t count, const TNetAddr* addr) {
  154. SendMessages(count, addr);
  155. WaitReplies();
  156. }
  // Reference overload: forwards to the pointer overload with the address of
  // `addr`.
  157. void TExampleClient::SendMessagesWaitReplies(size_t count, const TNetAddr& addr) {
  158. SendMessagesWaitReplies(count, &addr);
  159. }
  160. void TExampleClient::OnReply(TAutoPtr<TBusMessage> mess, TAutoPtr<TBusMessage> reply) {
  161. Y_UNUSED(mess);
  162. Y_UNUSED(reply);
  163. if (AtomicIncrement(RepliesCount) == MessageCount) {
  164. WorkDone.Signal();
  165. }
  166. }
  167. void TExampleClient::OnError(TAutoPtr<TBusMessage> mess, EMessageStatus status) {
  168. if (CrashOnError) {
  169. Y_ABORT("client failed: %s", ToCString(status));
  170. }
  171. Y_UNUSED(mess);
  172. AtomicIncrement(Errors);
  173. LastError = status;
  174. WorkDone.Signal();
  175. }
  // Creates a test server with a caller-supplied session configuration.
  // Compression, ack-before-reply and forget-request modes all start disabled.
  // The body creates a message queue named `name` and a server session from
  // &Proto, this object, `sessionConfig` and that queue.
  // NOTE(review): `this` is presumably passed as the session's handler —
  // confirm against TBusServerSession::Create.
  176. TExampleServer::TExampleServer(
  177. const char* name,
  178. const TBusServerSessionConfig& sessionConfig)
  179. : UseCompression(false)
  180. , AckMessageBeforeSendReply(false)
  181. , ForgetRequest(false)
  182. {
  183. Bus = CreateMessageQueue(name);
  184. Session = TBusServerSession::Create(&Proto, this, sessionConfig, Bus);
  185. }
  // Creates a test server from a default-constructed session configuration
  // whose ListenPort is set to `port`.
  // Compression, ack-before-reply and forget-request modes all start disabled.
  // The body creates a message queue named `name` and a server session from
  // &Proto, this object, that configuration and the queue.
  186. TExampleServer::TExampleServer(unsigned port, const char* name)
  187. : UseCompression(false)
  188. , AckMessageBeforeSendReply(false)
  189. , ForgetRequest(false)
  190. {
  191. Bus = CreateMessageQueue(name);
  192. TBusServerSessionConfig sessionConfig;
  193. sessionConfig.ListenPort = port;
  194. Session = TBusServerSession::Create(&Proto, this, sessionConfig, Bus);
  195. }
  196. TExampleServer::~TExampleServer() {
  197. }
  198. size_t TExampleServer::GetInFlight() const {
  199. return Session->GetInFlight();
  200. }
  201. unsigned TExampleServer::GetActualListenPort() const {
  202. return Session->GetActualListenPort();
  203. }
  204. TNetAddr TExampleServer::GetActualListenAddr() const {
  205. return TNetAddr("127.0.0.1", GetActualListenPort());
  206. }
  // Forwards `n` to TestSync.WaitFor. OnMessage calls TestSync.Inc() once per
  // incoming message.
  // NOTE(review): presumably blocks until OnMessage has run `n` times —
  // confirm against the TestSync type.
  207. void TExampleServer::WaitForOnMessageCount(unsigned n) {
  208. TestSync.WaitFor(n);
  209. }
  210. void TExampleServer::OnMessage(TOnMessageContext& mess) {
  211. TestSync.Inc();
  212. TExampleRequest* request = VerifyDynamicCast<TExampleRequest*>(mess.GetMessage());
  213. if (ForgetRequest) {
  214. mess.ForgetRequest();
  215. return;
  216. }
  217. TAutoPtr<TBusMessage> reply(new TExampleResponse(&Proto.ResponseCount, DataSize.GetOrElse(request->Data.size())));
  218. reply->SetCompressed(UseCompression);
  219. EMessageStatus status;
  220. if (AckMessageBeforeSendReply) {
  221. TBusIdentity ident;
  222. mess.AckMessage(ident);
  223. status = Session->SendReply(ident, reply.Release()); // TODO: leaks on error
  224. } else {
  225. status = mess.SendReplyMove(reply);
  226. }
  227. Y_ABORT_UNLESS(status == MESSAGE_OK, "failed to send reply: %s", ToString(status).data());
  228. }