example.h 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. #pragma once
  2. #include <library/cpp/testing/unittest/registar.h>
  3. #include "alloc_counter.h"
  4. #include "message_handler_error.h"
  5. #include <library/cpp/messagebus/ybus.h>
  6. #include <library/cpp/messagebus/misc/test_sync.h>
  7. #include <util/system/event.h>
  8. namespace NBus {
  9. namespace NTest {
  10. class TExampleRequest: public TBusMessage {
  11. friend class TExampleProtocol;
  12. private:
  13. TAllocCounter AllocCounter;
  14. public:
  15. TString Data;
  16. public:
  17. TExampleRequest(TAtomic* counterPtr, size_t payloadSize = 320);
  18. TExampleRequest(ECreateUninitialized, TAtomic* counterPtr);
  19. };
  20. class TExampleResponse: public TBusMessage {
  21. friend class TExampleProtocol;
  22. private:
  23. TAllocCounter AllocCounter;
  24. public:
  25. TString Data;
  26. TExampleResponse(TAtomic* counterPtr, size_t payloadSize = 320);
  27. TExampleResponse(ECreateUninitialized, TAtomic* counterPtr);
  28. };
  29. class TExampleProtocol: public TBusProtocol {
  30. public:
  31. TAtomic RequestCount;
  32. TAtomic ResponseCount;
  33. TAtomic RequestCountDeserialized;
  34. TAtomic ResponseCountDeserialized;
  35. TAtomic StartCount;
  36. TExampleProtocol(int port = 0);
  37. ~TExampleProtocol() override;
  38. void Serialize(const TBusMessage* message, TBuffer& buffer) override;
  39. TAutoPtr<TBusMessage> Deserialize(ui16 messageType, TArrayRef<const char> payload) override;
  40. };
  41. class TExampleClient: private TBusClientHandlerError {
  42. public:
  43. TExampleProtocol Proto;
  44. bool UseCompression;
  45. bool CrashOnError;
  46. size_t DataSize;
  47. ssize_t MessageCount;
  48. TAtomic RepliesCount;
  49. TAtomic Errors;
  50. EMessageStatus LastError;
  51. TSystemEvent WorkDone;
  52. TBusMessageQueuePtr Bus;
  53. TBusClientSessionPtr Session;
  54. public:
  55. TExampleClient(const TBusClientSessionConfig sessionConfig = TBusClientSessionConfig(), int port = 0);
  56. ~TExampleClient() override;
  57. EMessageStatus SendMessage(const TNetAddr* addr = nullptr);
  58. void SendMessages(size_t count, const TNetAddr* addr = nullptr);
  59. void SendMessages(size_t count, const TNetAddr& addr);
  60. void ResetCounters();
  61. void WaitReplies();
  62. EMessageStatus WaitForError();
  63. void WaitForError(EMessageStatus status);
  64. void SendMessagesWaitReplies(size_t count, const TNetAddr* addr = nullptr);
  65. void SendMessagesWaitReplies(size_t count, const TNetAddr& addr);
  66. void OnReply(TAutoPtr<TBusMessage> mess, TAutoPtr<TBusMessage> reply) override;
  67. void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus) override;
  68. };
  69. class TExampleServer: private TBusServerHandlerError {
  70. public:
  71. TExampleProtocol Proto;
  72. bool UseCompression;
  73. bool AckMessageBeforeSendReply;
  74. TMaybe<size_t> DataSize; // Nothing means use request size
  75. bool ForgetRequest;
  76. TTestSync TestSync;
  77. TBusMessageQueuePtr Bus;
  78. TBusServerSessionPtr Session;
  79. public:
  80. TExampleServer(
  81. const char* name = "TExampleServer",
  82. const TBusServerSessionConfig& sessionConfig = TBusServerSessionConfig());
  83. TExampleServer(unsigned port, const char* name = "TExampleServer");
  84. ~TExampleServer() override;
  85. public:
  86. size_t GetInFlight() const;
  87. unsigned GetActualListenPort() const;
  88. // any of
  89. TNetAddr GetActualListenAddr() const;
  90. void WaitForOnMessageCount(unsigned n);
  91. protected:
  92. void OnMessage(TOnMessageContext& mess) override;
  93. };
  94. }
  95. }