123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132 |
- #pragma once
- #include <library/cpp/testing/unittest/registar.h>
- #include "alloc_counter.h"
- #include "message_handler_error.h"
- #include <library/cpp/messagebus/ybus.h>
- #include <library/cpp/messagebus/misc/test_sync.h>
- #include <util/system/event.h>
- namespace NBus {
- namespace NTest {
- class TExampleRequest: public TBusMessage {
- friend class TExampleProtocol;
- private:
- TAllocCounter AllocCounter;
- public:
- TString Data;
- public:
- TExampleRequest(TAtomic* counterPtr, size_t payloadSize = 320);
- TExampleRequest(ECreateUninitialized, TAtomic* counterPtr);
- };
- class TExampleResponse: public TBusMessage {
- friend class TExampleProtocol;
- private:
- TAllocCounter AllocCounter;
- public:
- TString Data;
- TExampleResponse(TAtomic* counterPtr, size_t payloadSize = 320);
- TExampleResponse(ECreateUninitialized, TAtomic* counterPtr);
- };
- class TExampleProtocol: public TBusProtocol {
- public:
- TAtomic RequestCount;
- TAtomic ResponseCount;
- TAtomic RequestCountDeserialized;
- TAtomic ResponseCountDeserialized;
- TAtomic StartCount;
- TExampleProtocol(int port = 0);
- ~TExampleProtocol() override;
- void Serialize(const TBusMessage* message, TBuffer& buffer) override;
- TAutoPtr<TBusMessage> Deserialize(ui16 messageType, TArrayRef<const char> payload) override;
- };
- class TExampleClient: private TBusClientHandlerError {
- public:
- TExampleProtocol Proto;
- bool UseCompression;
- bool CrashOnError;
- size_t DataSize;
- ssize_t MessageCount;
- TAtomic RepliesCount;
- TAtomic Errors;
- EMessageStatus LastError;
- TSystemEvent WorkDone;
- TBusMessageQueuePtr Bus;
- TBusClientSessionPtr Session;
- public:
- TExampleClient(const TBusClientSessionConfig sessionConfig = TBusClientSessionConfig(), int port = 0);
- ~TExampleClient() override;
- EMessageStatus SendMessage(const TNetAddr* addr = nullptr);
- void SendMessages(size_t count, const TNetAddr* addr = nullptr);
- void SendMessages(size_t count, const TNetAddr& addr);
- void ResetCounters();
- void WaitReplies();
- EMessageStatus WaitForError();
- void WaitForError(EMessageStatus status);
- void SendMessagesWaitReplies(size_t count, const TNetAddr* addr = nullptr);
- void SendMessagesWaitReplies(size_t count, const TNetAddr& addr);
- void OnReply(TAutoPtr<TBusMessage> mess, TAutoPtr<TBusMessage> reply) override;
- void OnError(TAutoPtr<TBusMessage> mess, EMessageStatus) override;
- };
- class TExampleServer: private TBusServerHandlerError {
- public:
- TExampleProtocol Proto;
- bool UseCompression;
- bool AckMessageBeforeSendReply;
- TMaybe<size_t> DataSize; // Nothing means use request size
- bool ForgetRequest;
- TTestSync TestSync;
- TBusMessageQueuePtr Bus;
- TBusServerSessionPtr Session;
- public:
- TExampleServer(
- const char* name = "TExampleServer",
- const TBusServerSessionConfig& sessionConfig = TBusServerSessionConfig());
- TExampleServer(unsigned port, const char* name = "TExampleServer");
- ~TExampleServer() override;
- public:
- size_t GetInFlight() const;
- unsigned GetActualListenPort() const;
- // any of
- TNetAddr GetActualListenAddr() const;
- void WaitForOnMessageCount(unsigned n);
- protected:
- void OnMessage(TOnMessageContext& mess) override;
- };
- }
- }
|