Browse Source

Restoring authorship annotation for <vladimir@yandex-team.ru>. Commit 2 of 2.

vladimir 3 years ago
parent
commit
4bac7bacd0

+ 3 - 3
library/cpp/messagebus/coreconn.cpp

@@ -12,7 +12,7 @@ namespace NBus {
     TBusInstant Now() {
         return millisec();
     }
- 
+
     EIpVersion MakeIpVersion(bool allowIpv4, bool allowIpv6) {
         if (allowIpv4) {
             if (allowIpv6) {
@@ -23,8 +23,8 @@ namespace NBus {
         } else if (allowIpv6) {
             return EIP_VERSION_6;
         }
- 
+
         ythrow yexception() << "Neither of IPv4/IPv6 is allowed.";
     }
 
-} 
+}

+ 7 - 7
library/cpp/messagebus/coreconn.h

@@ -1,8 +1,8 @@
 #pragma once
 
-////////////////////////////////////////////////////////////// 
-/// \file 
-/// \brief Definitions for asynchonous connection queue 
+//////////////////////////////////////////////////////////////
+/// \file
+/// \brief Definitions for asynchonous connection queue
 
 #include "base.h"
 #include "event_loop.h"
@@ -21,7 +21,7 @@
 #include <util/string/util.h>
 #include <util/system/condvar.h>
 #include <util/system/mutex.h>
-#include <util/system/thread.h> 
+#include <util/system/thread.h>
 #include <util/thread/lfqueue.h>
 
 #include <deque>
@@ -38,12 +38,12 @@ namespace NBus {
     class TBusConnection;
     class TBusConnectionFactory;
     class TBusServerFactory;
- 
+
     using TBusConnectionList = TList<TBusConnection*>;
- 
+
     /// @throw yexception
     EIpVersion MakeIpVersion(bool allowIpv4, bool allowIpv6);
- 
+
     inline bool WouldBlock() {
         int syserr = LastSystemError();
         return syserr == EAGAIN || syserr == EINPROGRESS || syserr == EWOULDBLOCK || syserr == EINTR;

+ 36 - 36
library/cpp/messagebus/locator.cpp

@@ -1,9 +1,9 @@
-//////////////////////////////////////////////////////////////////////////// 
+////////////////////////////////////////////////////////////////////////////
 /// \file
-/// \brief Implementation of locator service 
- 
+/// \brief Implementation of locator service
+
 #include "locator.h"
- 
+
 #include "ybus.h"
 
 #include <util/generic/hash_set.h>
@@ -11,7 +11,7 @@
 
 namespace NBus {
     using namespace NAddr;
- 
+
     static TIpPort GetAddrPort(const IRemoteAddr& addr) {
         switch (addr.Addr()->sa_family) {
             case AF_INET: {
@@ -84,8 +84,8 @@ namespace NBus {
 
     static const sockaddr_in6* SockAddrIpV6(const IRemoteAddr& a) {
         return (const sockaddr_in6*)a.Addr();
-    } 
- 
+    }
+
     static bool IsAddressEqual(const IRemoteAddr& a1, const IRemoteAddr& a2) {
         if (a1.Addr()->sa_family == a2.Addr()->sa_family) {
             if (a1.Addr()->sa_family == AF_INET) {
@@ -95,13 +95,13 @@ namespace NBus {
             }
         }
         return false;
-    } 
+    }
 
     TBusLocator::TBusLocator()
         : MyInterfaces(GetNetworkInterfaces())
     {
     }
- 
+
     bool TBusLocator::TItem::operator<(const TItem& y) const {
         const TItem& x = *this;
 
@@ -114,7 +114,7 @@ namespace NBus {
     bool TBusLocator::TItem::operator==(const TItem& y) const {
         return ServiceId == y.ServiceId && Start == y.Start && End == y.End && Addr == y.Addr;
     }
- 
+
     TBusLocator::TItem::TItem(TServiceId serviceId, TBusKey start, TBusKey end, const TNetAddr& addr)
         : ServiceId(serviceId)
         , Start(start)
@@ -122,7 +122,7 @@ namespace NBus {
         , Addr(addr)
     {
     }
- 
+
     bool TBusLocator::IsLocal(const TNetAddr& addr) {
         for (const auto& myInterface : MyInterfaces) {
             if (IsAddressEqual(addr, *myInterface.Address)) {
@@ -132,7 +132,7 @@ namespace NBus {
 
         return false;
     }
- 
+
     TBusLocator::TServiceId TBusLocator::GetServiceId(const char* name) {
         const char* c = ServiceIdSet.insert(name).first->c_str();
         return (ui64)c;
@@ -140,7 +140,7 @@ namespace NBus {
 
     int TBusLocator::RegisterBreak(TBusService service, const TVector<TBusKey>& starts, const TNetAddr& addr) {
         TGuard<TMutex> G(Lock);
- 
+
         TServiceId serviceId = GetServiceId(service);
         for (size_t i = 0; i < starts.size(); ++i) {
             RegisterBreak(serviceId, starts[i], addr);
@@ -152,7 +152,7 @@ namespace NBus {
         TItems::const_iterator it = Items.lower_bound(TItem(serviceId, 0, start, addr));
         TItems::const_iterator service_it =
             Items.lower_bound(TItem(serviceId, 0, 0, TNetAddr()));
- 
+
         THolder<TItem> left;
         THolder<TItem> right;
         if ((it != Items.end() || Items.begin() != Items.end()) && service_it != Items.end() && service_it->ServiceId == serviceId) {
@@ -174,11 +174,11 @@ namespace NBus {
         Items.insert(*right);
         NormalizeBreaks(serviceId);
         return 0;
-    } 
- 
+    }
+
     int TBusLocator::UnregisterBreak(TBusService service, const TNetAddr& addr) {
         TGuard<TMutex> G(Lock);
- 
+
         TServiceId serviceId = GetServiceId(service);
         return UnregisterBreak(serviceId, addr);
     }
@@ -225,7 +225,7 @@ namespace NBus {
             TItem item(serviceId, YBUS_KEYMIN, first->Start - 1, first->Addr);
             Items.insert(item);
         }
- 
+
         NormalizeBreaks(serviceId);
         return deleted;
     }
@@ -238,7 +238,7 @@ namespace NBus {
             if (serviceId != Max<TServiceId>()) {
                 last = Items.lower_bound(TItem(serviceId + 1, YBUS_KEYMIN, YBUS_KEYMIN, TNetAddr()));
             }
- 
+
             --last;
             Y_ASSERT(Items.end() != last);
             Y_ASSERT(last->ServiceId == serviceId);
@@ -251,10 +251,10 @@ namespace NBus {
     int TBusLocator::LocateAll(TBusService service, TBusKey key, TVector<TNetAddr>& addrs) {
         TGuard<TMutex> G(Lock);
         Y_VERIFY(addrs.empty(), "Non emtpy addresses");
- 
+
         TServiceId serviceId = GetServiceId(service);
         TItems::const_iterator it;
- 
+
         for (it = Items.lower_bound(TItem(serviceId, 0, key, TNetAddr()));
              it != Items.end() && it->ServiceId == serviceId && it->Start <= key && key <= it->End;
              ++it) {
@@ -273,27 +273,27 @@ namespace NBus {
 
         TServiceId serviceId = GetServiceId(service);
         TItems::const_iterator it;
- 
+
         it = Items.lower_bound(TItem(serviceId, 0, key, TNetAddr()));
- 
+
         if (it != Items.end()) {
             const TItem& item = *it;
             if (item.ServiceId == serviceId && item.Start <= key && key < item.End) {
                 *addr = item.Addr;
- 
+
                 return 0;
             }
         }
 
         return -1;
-    } 
- 
+    }
+
     int TBusLocator::GetLocalPort(TBusService service) {
         TGuard<TMutex> G(Lock);
         TServiceId serviceId = GetServiceId(service);
         TItems::const_iterator it;
         int port = 0;
- 
+
         for (it = Items.lower_bound(TItem(serviceId, 0, 0, TNetAddr())); it != Items.end(); ++it) {
             const TItem& item = *it;
             if (item.ServiceId != serviceId) {
@@ -357,8 +357,8 @@ namespace NBus {
             *isLocal = IsLocal(addr);
         }
         return 0;
-    } 
- 
+    }
+
     int TBusLocator::LocateKeys(TBusService service, TBusKeyVec& keys, bool onlyLocal) {
         TGuard<TMutex> G(Lock);
         Y_VERIFY(keys.empty(), "Non empty keys");
@@ -376,8 +376,8 @@ namespace NBus {
             keys.push_back(std::make_pair(item.Start, item.End));
         }
         return (int)keys.size();
-    } 
- 
+    }
+
     int TBusLocator::Register(TBusService service, const char* hostName, int port, TBusKey start /*= YBUS_KEYMIN*/, TBusKey end /*= YBUS_KEYMAX*/, EIpVersion requireVersion /*= EIP_VERSION_4*/, EIpVersion preferVersion /*= EIP_VERSION_ANY*/) {
         TNetAddr addr(hostName, port, requireVersion, preferVersion); // throws
         {
@@ -387,13 +387,13 @@ namespace NBus {
         Register(service, start, end, addr);
         return 0;
     }
- 
+
     int TBusLocator::Register(TBusService service, TBusKey start, TBusKey end, const TNetworkAddress& na, EIpVersion requireVersion /*= EIP_VERSION_4*/, EIpVersion preferVersion /*= EIP_VERSION_ANY*/) {
         TNetAddr addr(na, requireVersion, preferVersion); // throws
         Register(service, start, end, addr);
         return 0;
-    } 
- 
+    }
+
     int TBusLocator::Register(TBusService service, TBusKey start, TBusKey end, const TNetAddr& addr) {
         TGuard<TMutex> G(Lock);
 
@@ -412,7 +412,7 @@ namespace NBus {
                 Y_FAIL("Overlap in registered keys with non-identical range");
             }
         }
- 
+
         Items.insert(itemToReg);
         return 0;
     }
@@ -424,4 +424,4 @@ namespace NBus {
         return 0;
     }
 
-} 
+}

+ 10 - 10
library/cpp/messagebus/message.cpp

@@ -3,15 +3,15 @@
 
 #include <util/random/random.h>
 #include <util/string/printf.h>
-#include <util/system/atomic.h> 
- 
+#include <util/system/atomic.h>
+
 #include <string.h>
- 
+
 using namespace NBus;
 
 namespace NBus {
     using namespace NBus::NPrivate;
- 
+
     TBusIdentity::TBusIdentity()
         : MessageId(0)
         , Size(0)
@@ -73,7 +73,7 @@ namespace NBus {
         GetHeader()->Type = type;
         DoReset();
     }
- 
+
     TBusMessage::TBusMessage(ECreateUninitialized)
         //: TCtr("BusMessage")
         : TRefCounted<TBusMessage, TAtomicCounter, TDelete>(1)
@@ -81,11 +81,11 @@ namespace NBus {
         , Data(nullptr)
     {
     }
- 
+
     TString TBusMessage::Describe() const {
         return Sprintf("object type: %s, message type: %d", TypeName(*this).data(), int(GetHeader()->Type));
     }
- 
+
     TBusMessage::~TBusMessage() {
 #ifndef NDEBUG
         Y_VERIFY(GetHeader()->Id != YBUS_KEYINVALID, "must not be invalid key, message type: %d, ", int(Type));
@@ -94,7 +94,7 @@ namespace NBus {
         CheckClean();
 #endif
     }
- 
+
     void TBusMessage::DoReset() {
         GetHeader()->SendTime = 0;
         GetHeader()->Size = 0;
@@ -125,7 +125,7 @@ namespace NBus {
         memcpy(this, data.data(), sizeof(TBusHeader));
         return sizeof(TBusHeader);
     }
- 
+
     ///////////////////////////////////////////////////////
     /// \brief Packs header to network order
 
@@ -140,7 +140,7 @@ namespace NBus {
         data.Flags = GetHeader()->FlagsInternal;
         //data.LocalFlags = LocalFlags;
     }
- 
+
     ////////////////////////////////////////////////////////////
     /// \brief set message identity from serialized form
 

+ 15 - 15
library/cpp/messagebus/messqueue.cpp

@@ -3,13 +3,13 @@
 #include "remote_client_session.h"
 #include "remote_server_session.h"
 #include "ybus.h"
- 
+
 #include <util/generic/singleton.h>
 
 using namespace NBus;
 using namespace NBus::NPrivate;
 using namespace NActor;
- 
+
 TBusMessageQueuePtr NBus::CreateMessageQueue(const TBusQueueConfig& config, TExecutorPtr executor, TBusLocator* locator, const char* name) {
     return new TBusMessageQueue(config, executor, locator, name);
 }
@@ -20,8 +20,8 @@ TBusMessageQueuePtr NBus::CreateMessageQueue(const TBusQueueConfig& config, TBus
     executorConfig.Name = name;
     TExecutorPtr executor = new TExecutor(executorConfig);
     return CreateMessageQueue(config, executor, locator, name);
-} 
- 
+}
+
 TBusMessageQueuePtr NBus::CreateMessageQueue(const TBusQueueConfig& config, const char* name) {
     return CreateMessageQueue(config, new TBusLocator, name);
 }
@@ -50,15 +50,15 @@ TBusMessageQueue::TBusMessageQueue(const TBusQueueConfig& config, TExecutorPtr e
     , Locator(locator)
     , WorkQueue(executor)
     , Running(1)
-{ 
+{
     InitBusLwtrace();
     InitNetworkSubSystem();
-} 
- 
-TBusMessageQueue::~TBusMessageQueue() { 
+}
+
+TBusMessageQueue::~TBusMessageQueue() {
     Stop();
-} 
- 
+}
+
 void TBusMessageQueue::Stop() {
     if (!AtomicCas(&Running, 0, 1)) {
         ShutdownComplete.WaitI();
@@ -128,8 +128,8 @@ TBusClientSessionPtr TBusMessageQueue::CreateSource(TBusProtocol* proto, IBusCli
     TRemoteClientSessionPtr session(new TRemoteClientSession(this, proto, handler, config, name));
     Add(session.Get());
     return session.Get();
-} 
- 
+}
+
 TBusServerSessionPtr TBusMessageQueue::CreateDestination(TBusProtocol* proto, IBusServerHandler* handler, const TBusClientSessionConfig& config, const TString& name) {
     TRemoteServerSessionPtr session(new TRemoteServerSession(this, proto, handler, config, name));
     try {
@@ -140,7 +140,7 @@ TBusServerSessionPtr TBusMessageQueue::CreateDestination(TBusProtocol* proto, IB
         if (port == 0) {
             port = proto->GetPort();
         }
- 
+
         session->Listen(port, this);
 
         Add(session.Get());
@@ -164,8 +164,8 @@ TBusServerSessionPtr TBusMessageQueue::CreateDestination(TBusProtocol* proto, IB
 void TBusMessageQueue::Add(TIntrusivePtr<TBusSessionImpl> session) {
     TGuard<TMutex> scope(Lock);
     Sessions.push_back(session);
-} 
- 
+}
+
 void TBusMessageQueue::Remove(TBusSession* session) {
     TGuard<TMutex> scope(Lock);
     TList<TIntrusivePtr<TBusSessionImpl>>::iterator it = std::find(Sessions.begin(), Sessions.end(), session);

+ 28 - 28
library/cpp/messagebus/oldmodule/module.cpp

@@ -1,5 +1,5 @@
 #include "module.h"
- 
+
 #include <library/cpp/messagebus/scheduler_actor.h>
 #include <library/cpp/messagebus/thread_extra.h>
 #include <library/cpp/messagebus/actor/actor.h>
@@ -64,7 +64,7 @@ namespace NBus {
     namespace NPrivate {
         class TJobStorage {
         };
- 
+
         struct TModuleClientHandler
            : public IBusClientHandler {
             TModuleClientHandler(TBusModuleImpl* module)
@@ -334,16 +334,16 @@ namespace NBus {
 
         ClearAllMessageStates();
     }
- 
+
     TNetAddr TBusJob::GetPeerAddrNetAddr() const {
         Y_VERIFY(!!OnMessageContext);
         return OnMessageContext.GetPeerAddrNetAddr();
     }
- 
+
     void TBusJob::CheckThreadCurrentJob() {
         Y_ASSERT(ThreadCurrentJob == this);
     }
- 
+
     /////////////////////////////////////////////////////////
     /// \brief Send messages in pending list
 
@@ -405,17 +405,17 @@ namespace NBus {
         }
         return Pending.size() > 0;
     }
- 
+
     bool TBusJob::AnyPendingToSend() {
         for (unsigned i = 0; i < Pending.size(); ++i) {
             if (Pending[i].Status == MESSAGE_DONT_ASK) {
                 return true;
             }
-        } 
+        }
 
         return false;
-    } 
- 
+    }
+
     bool TBusJob::IsDone() {
         bool r = (SleepUntil == 0 && Pending.size() == 0 && (Handler == nullptr || Status != MESSAGE_OK));
         return r;
@@ -427,7 +427,7 @@ namespace NBus {
 
         Handler = Handler(ModuleImpl->Module, this, Message);
     }
- 
+
     bool TBusJob::CallJobHandler() {
         /// go on as far as we can go without waiting
         while (!IsDone()) {
@@ -465,9 +465,9 @@ namespace NBus {
 
             TThreadCurrentJobGuard threadCurrentJobGuard(this);
             (Module->*(call.Handler))(this, call.Status, call.Message, call.Reply);
-        } 
+        }
     }
- 
+
     int TBusJob::CallReplyHandler(EMessageStatus status, TBusMessage* mess, TBusMessage* reply) {
         /// find handler for given message and update it's status
         size_t i = 0;
@@ -476,34 +476,34 @@ namespace NBus {
             if (call.Message == mess) {
                 break;
             }
-        } 
+        }
 
         /// if not found, report error
         if (i == Pending.size()) {
             Y_FAIL("must not happen");
         }
- 
+
         /// fill in response into job state
         TJobState& call = Pending[i];
         call.Status = status;
         Y_ASSERT(call.Message == mess);
         call.Reply = reply;
- 
+
         if ((status == MESSAGE_TIMEOUT || status == MESSAGE_DELIVERY_FAILED) && call.NumRetries < call.MaxRetries) {
             call.NumRetries++;
             call.Status = MESSAGE_DONT_ASK;
             call.Message->Reset(); // generate new Id
             DoCallReplyHandler(call);
             return 0;
-        } 
+        }
 
         /// call the handler if provided
         DoCallReplyHandler(call);
- 
+
         /// move job state into the finished stack
         Finished.push_back(Pending[i]);
         Pending.erase(Pending.begin() + i);
- 
+
         return 0;
     }
 
@@ -511,21 +511,21 @@ namespace NBus {
     /// send message to any other session or application
     void TBusJob::Send(TBusMessageAutoPtr mess, TBusClientSession* session, TReplyHandler rhandler, size_t maxRetries) {
         CheckThreadCurrentJob();
- 
+
         SetJob(mess.Get(), Runner);
         Pending.push_back(TJobState(rhandler, MESSAGE_DONT_ASK, mess.Release(), session, nullptr, maxRetries, nullptr, false));
     }
 
     void TBusJob::Send(TBusMessageAutoPtr mess, TBusClientSession* session, TReplyHandler rhandler, size_t maxRetries, const TNetAddr& addr) {
         CheckThreadCurrentJob();
- 
+
         SetJob(mess.Get(), Runner);
         Pending.push_back(TJobState(rhandler, MESSAGE_DONT_ASK, mess.Release(), session, nullptr, maxRetries, &addr, false));
     }
 
     void TBusJob::SendOneWayTo(TBusMessageAutoPtr req, TBusClientSession* session, const TNetAddr& addr) {
         CheckThreadCurrentJob();
- 
+
         SetJob(req.Get(), Runner);
         Pending.push_back(TJobState(nullptr, MESSAGE_DONT_ASK, req.Release(), session, nullptr, 0, &addr, true));
     }
@@ -559,7 +559,7 @@ namespace NBus {
 
         Status = status;
     }
- 
+
     void TBusJob::ClearState(TJobState& call) {
         TJobStateVec::iterator it;
         for (it = Finished.begin(); it != Finished.end(); ++it) {
@@ -569,10 +569,10 @@ namespace NBus {
                 Finished.erase(it);
                 return;
             }
-        } 
+        }
         Y_ASSERT(0);
-    } 
- 
+    }
+
     void TBusJob::ClearAllMessageStates() {
         ClearJobStateVector(&Finished);
         ClearJobStateVector(&Pending);
@@ -586,7 +586,7 @@ namespace NBus {
 
         SleepUntil = Now() + milliSeconds;
     }
- 
+
     TString TBusJob::GetStatus(unsigned flags) {
         TString strReturn;
         strReturn += Sprintf("  job=%016" PRIx64 " type=%d sent=%d pending=%d (%d) %s\n",
@@ -624,8 +624,8 @@ namespace NBus {
         if (job) {
             job->Cancel(status);
         }
-    } 
- 
+    }
+
     TString TBusModuleImpl::GetStatus(unsigned flags) {
         Y_UNUSED(flags);
         TWhatThreadDoesAcquireGuard<TMutex> G(Lock, "modules: acquiring lock for GetStatus");

+ 51 - 51
library/cpp/messagebus/oldmodule/module.h

@@ -1,55 +1,55 @@
 #pragma once
 
-/////////////////////////////////////////////////////////////////////////// 
-/// \file 
-/// \brief Application interface for modules 
+///////////////////////////////////////////////////////////////////////////
+/// \file
+/// \brief Application interface for modules
 
-/// NBus::TBusModule provides foundation for implementation of asynchnous 
-/// modules that communicate with multiple external or local sessions 
+/// NBus::TBusModule provides foundation for implementation of asynchnous
+/// modules that communicate with multiple external or local sessions
 /// NBus::TBusSession.
 
 /// To implement the module some virtual functions needs to be overridden:
 
 /// NBus::TBusModule::CreateExtSession() creates and registers an
-/// external session that receives incoming messages as input for module 
+/// external session that receives incoming messages as input for module
 /// processing.
 
 /// When new incoming message arrives the new NBus::TBusJob is created.
-/// NBus::TBusJob is somewhat similar to a thread, it maintains all the state 
+/// NBus::TBusJob is somewhat similar to a thread, it maintains all the state
 /// during processing of one incoming message. Default implementation of
 /// NBus::TBusJob will maintain all send and received messages during
 /// lifetime of this job. Each message, status and reply can be found
-/// within NBus::TJobState using NBus::TBusJob::GetState(). If your module 
-/// needs to maintain an additional information during lifetime of the job 
+/// within NBus::TJobState using NBus::TBusJob::GetState(). If your module
+/// needs to maintain an additional information during lifetime of the job
 /// you can derive your own class from NBus::TBusJob and override job
 /// factory method NBus::IJobFactory::CreateJobInstance() to create your instances.
 
 /// Processing of a given message starts with a call to NBus::TBusModule::Start()
 /// handler that should be overridden in the module implementation. Within
-/// the callback handler module can perform any computation and access any 
-/// datastore tables that it needs. The handler can also access any module 
+/// the callback handler module can perform any computation and access any
+/// datastore tables that it needs. The handler can also access any module
 /// variables. However, same handler can be called from multiple threads so,
 /// it is recommended that handler only access read-only module level variables.
 
 /// Handler should use NBus::TBusJob::Send() to send messages to other client
-/// sessions and it can use NBus::TBusJob::Reply() to send reply to the main 
+/// sessions and it can use NBus::TBusJob::Reply() to send reply to the main
 /// job message. When handler is done, it returns the pointer to the next handler to call
-/// when all pending messages have cleared. If handler 
-/// returns pointer to itself the module will reschedule execution of this handler 
-/// for a later time. This should be done in case NBus::TBusJob::Send() returns 
-/// error (not MESSAGE_OK) 
- 
+/// when all pending messages have cleared. If handler
+/// returns pointer to itself the module will reschedule execution of this handler
+/// for a later time. This should be done in case NBus::TBusJob::Send() returns
+/// error (not MESSAGE_OK)
+
 #include "startsession.h"
 
 #include <library/cpp/messagebus/ybus.h>
- 
+
 #include <util/generic/noncopyable.h>
 #include <util/generic/object_counter.h>
 
-namespace NBus { 
+namespace NBus {
     class TBusJob;
     class TBusModule;
- 
+
     namespace NPrivate {
         struct TCallJobHandlerWorkItem;
         struct TBusModuleImpl;
@@ -57,7 +57,7 @@ namespace NBus {
         struct TModuleClientHandler;
         struct TJobRunner;
     }
- 
+
     class TJobHandler {
     protected:
         typedef TJobHandler (TBusModule::*TBusHandlerPtr)(TBusJob* job, TBusMessage* mess);
@@ -86,12 +86,12 @@ namespace NBus {
             return (b->*MyPtr)(job, mess);
         }
     };
- 
+
     typedef void (TBusModule::*TReplyHandler)(TBusJob* job, EMessageStatus status, TBusMessage* mess, TBusMessage* reply);
- 
+
     ////////////////////////////////////////////////////
     /// \brief Pending message state
- 
+
     struct TJobState {
         friend class TBusJob;
         friend class ::TCrawlerModule;
@@ -131,9 +131,9 @@ namespace NBus {
     public:
         TString GetStatus(unsigned flags);
     };
- 
+
     using TJobStateVec = TVector<TJobState>;
- 
+
     /////////////////////////////////////////////////////////
     /// \brief Execution item = thread
 
@@ -147,10 +147,10 @@ namespace NBus {
     public:
         /// given a module and starter message
         TBusJob(TBusModule* module, TBusMessage* message);
- 
+
         /// destructor will free all the message that were send and received
         virtual ~TBusJob();
- 
+
         TBusMessage* GetMessage() const {
             return Message;
         }
@@ -161,13 +161,13 @@ namespace NBus {
         /// If addr is set then use it as destination.
         void Send(TBusMessageAutoPtr mess, TBusClientSession* session, TReplyHandler rhandler, size_t maxRetries, const TNetAddr& addr);
         void Send(TBusMessageAutoPtr mess, TBusClientSession* session, TReplyHandler rhandler = nullptr, size_t maxRetries = 0);
- 
+
         void SendOneWayTo(TBusMessageAutoPtr req, TBusClientSession* session, const TNetAddr& addr);
         void SendOneWayWithLocator(TBusMessageAutoPtr req, TBusClientSession* session);
 
         /// send reply to the starter message
         virtual void SendReply(TBusMessageAutoPtr reply);
- 
+
         /// set the flag to terminate job at the earliest convenience
         void Cancel(EMessageStatus status);
 
@@ -179,7 +179,7 @@ namespace NBus {
         void PutState(const TJobState& state) {
             Finished.push_back(state);
         }
- 
+
     public:
         /// retrieve all pending messages
         void GetPending(TJobStateVec* stateVec) {
@@ -225,10 +225,10 @@ namespace NBus {
                     }
                     return static_cast<MessageType*>(call.Message);
                 }
-            } 
+            }
             return nullptr;
-        } 
- 
+        }
+
         /// helper function to find status for previously sent message
         template <class MessageType>
         EMessageStatus GetStatus(int* startFrom = nullptr) {
@@ -240,10 +240,10 @@ namespace NBus {
                     }
                     return call.Status;
                 }
-            } 
+            }
             return MESSAGE_UNKNOWN;
-        } 
- 
+        }
+
         /// helper function to clear state of previosly sent messages
         template <class MessageType>
         void Clear() {
@@ -256,24 +256,24 @@ namespace NBus {
                 } else {
                     ++i;
                 }
-            } 
-        } 
- 
+            }
+        }
+
         /// helper function to clear state in order to try again
         void ClearState(TJobState& state);
- 
+
         /// clears all message states
         void ClearAllMessageStates();
 
         /// returns true if job is done
         bool IsDone();
- 
+
         /// return human reabable status of this job
         virtual TString GetStatus(unsigned flags);
 
         /// set sleep time for job
         void Sleep(int milliSeconds);
- 
+
         void CallJobHandlerOnly();
 
     private:
@@ -297,7 +297,7 @@ namespace NBus {
         TOnMessageContext OnMessageContext; // starter
     public:
         bool ReplySent;
- 
+
     private:
         friend class TBusModule;
         friend struct NPrivate::TBusModuleImpl;
@@ -312,7 +312,7 @@ namespace NBus {
         NPrivate::TBusModuleImpl* ModuleImpl; ///< module which created the job
         TBusInstant SleepUntil;               ///< time to wakeup, 0 if no sleep
     };
- 
+
     ////////////////////////////////////////////////////////////////////
     /// \brief Classes to implement basic module functionality
 
@@ -358,18 +358,18 @@ namespace NBus {
         friend class TBusJob;
 
         TObjectCounter<TBusModule> ObjectCounter;
- 
+
         TIntrusivePtr<NPrivate::TBusModuleImpl> Impl;
- 
+
     public:
         /// Each module should have a name which is used as protocol service
         TBusModule(const char* name);
         ~TBusModule() override;
 
         const char* GetName() const;
- 
+
         void SetConfig(const TBusModuleConfig& config);
- 
+
         /// get status of all jobs in flight
         TString GetStatus(unsigned flags = 0);
 
@@ -380,7 +380,7 @@ namespace NBus {
 
         // this default implementation just creates TBusJob object
         TBusJob* CreateJobInstance(TBusMessage* message) override;
- 
+
         EMessageStatus StartJob(TAutoPtr<TBusMessage> message);
 
         /// creates private sessions, calls CreateExtSession(), should be called before StartInput()
@@ -391,7 +391,7 @@ namespace NBus {
     public:
         /// entry point into module, first function to call
         virtual TJobHandler Start(TBusJob* job, TBusMessage* mess) = 0;
- 
+
     protected:
         /// override this function to create destination session
         virtual TBusServerSessionPtr CreateExtSession(TBusMessageQueue& queue) = 0;

+ 20 - 20
library/cpp/messagebus/oldmodule/startsession.cpp

@@ -1,19 +1,19 @@
-/////////////////////////////////////////////////////////// 
+///////////////////////////////////////////////////////////
 /// \file
-/// \brief Starter session implementation 
+/// \brief Starter session implementation
 
-/// Starter session will generate emtpy message to insert 
-/// into local session that are registered under same protocol 
+/// Starter session will generate emtpy message to insert
+/// into local session that are registered under same protocol
 
-/// Starter (will one day) automatically adjust number 
-/// of message inflight to make sure that at least one of source 
-/// sessions within message queue is at the limit (bottle neck) 
+/// Starter (will one day) automatically adjust number
+/// of message inflight to make sure that at least one of source
+/// sessions within message queue is at the limit (bottle neck)
+
+/// Maximum number of messages that starter will instert into
+/// the pipeline is configured by NBus::TBusSessionConfig::MaxInFlight
+
+#include "startsession.h"
 
-/// Maximum number of messages that starter will instert into 
-/// the pipeline is configured by NBus::TBusSessionConfig::MaxInFlight 
- 
-#include "startsession.h" 
- 
 #include "module.h"
 
 #include <library/cpp/messagebus/ybus.h>
@@ -24,7 +24,7 @@ namespace NBus {
         pThis->Starter();
         return nullptr;
     }
- 
+
     TBusStarter::TBusStarter(TBusModule* module, const TBusSessionConfig& config)
         : Module(module)
         , Config(config)
@@ -33,11 +33,11 @@ namespace NBus {
     {
         StartThread.Start();
     }
- 
+
     TBusStarter::~TBusStarter() {
         Shutdown();
     }
- 
+
     void TBusStarter::Shutdown() {
         {
             TGuard<TMutex> g(ExitLock);
@@ -46,20 +46,20 @@ namespace NBus {
         }
         StartThread.Join();
     }
- 
+
     void TBusStarter::Starter() {
         TGuard<TMutex> g(ExitLock);
         while (!Exiting) {
             TAutoPtr<TBusMessage> empty(new TBusMessage(0));
- 
+
             EMessageStatus status = Module->StartJob(empty);
- 
+
             if (Config.SendTimeout > 0) {
                 ExitSignal.WaitT(ExitLock, TDuration::MilliSeconds(Config.SendTimeout));
             } else {
                 ExitSignal.WaitT(ExitLock, (status == MESSAGE_BUSY) ? TDuration::MilliSeconds(1) : TDuration::Zero());
             }
-        } 
-    } 
+        }
+    }
 
 }

+ 7 - 7
library/cpp/messagebus/oldmodule/startsession.h

@@ -1,12 +1,12 @@
 #pragma once
- 
+
 #include <library/cpp/messagebus/ybus.h>
 
-#include <util/system/thread.h> 
- 
-namespace NBus { 
+#include <util/system/thread.h>
+
+namespace NBus {
     class TBusModule;
- 
+
     class TBusStarter {
     private:
         TBusModule* Module;
@@ -23,12 +23,12 @@ namespace NBus {
         TString GetStatus(ui16 /*flags=YBUS_STATUS_CONNS*/) {
             return "";
         }
- 
+
     public:
         TBusStarter(TBusModule* module, const TBusSessionConfig& config);
         ~TBusStarter();
 
         void Shutdown();
     };
- 
+
 }

+ 16 - 16
library/cpp/messagebus/protobuf/ybusbuf.h

@@ -1,24 +1,24 @@
 #pragma once
- 
+
 #include <library/cpp/messagebus/ybus.h>
- 
+
 #include <google/protobuf/descriptor.h>
 #include <google/protobuf/message.h>
- 
+
 #include <util/generic/cast.h>
 #include <util/generic/vector.h>
 #include <util/stream/mem.h>
- 
+
 #include <array>
 
-namespace NBus { 
+namespace NBus {
     using TBusBufferRecord = ::google::protobuf::Message;
- 
+
     template <class TBufferMessage>
     class TBusBufferMessagePtr;
     template <class TBufferMessage>
     class TBusBufferMessageAutoPtr;
- 
+
     class TBusBufferBase: public TBusMessage {
     public:
         TBusBufferBase(int type)
@@ -43,7 +43,7 @@ namespace NBus {
 
     /// @param TBufferRecord is record described in .proto file with namespace
     /// @param MessageFile is offset for .proto file message ids
- 
+
     /// \attention If you want one protocol NBus::TBusBufferProtocol to handle
     /// messageges described in different .proto files, make sure that they have
     /// unique values for MessageFile
@@ -52,7 +52,7 @@ namespace NBus {
     class TBusBufferMessage: public TBusBufferBase {
     public:
         static const int MessageType = MType;
- 
+
         typedef TBusBufferMessagePtr<TBusBufferMessage<TBufferRecord, MType>> TPtr;
         typedef TBusBufferMessageAutoPtr<TBusBufferMessage<TBufferRecord, MType>> TAutoPtr;
 
@@ -88,7 +88,7 @@ namespace NBus {
             return new TBusBufferMessage<TBufferRecord, MessageType>();
         }
     };
- 
+
     template <class TSelf, class TBufferMessage>
     class TBusBufferMessagePtrBase {
     public:
@@ -101,7 +101,7 @@ namespace NBus {
         const TSelf* GetSelf() const {
             return static_cast<const TSelf*>(this);
         }
- 
+
     public:
         RecordType* operator->() {
             Y_ASSERT(GetSelf()->Get());
@@ -119,7 +119,7 @@ namespace NBus {
             Y_ASSERT(GetSelf()->Get());
             return GetSelf()->Get()->Record;
         }
- 
+
         TBusHeader* GetHeader() {
             return GetSelf()->Get()->GetHeader();
         }
@@ -127,7 +127,7 @@ namespace NBus {
             return GetSelf()->Get()->GetHeader();
         }
     };
- 
+
     template <class TBufferMessage>
     class TBusBufferMessagePtr: public TBusBufferMessagePtrBase<TBusBufferMessagePtr<TBufferMessage>, TBufferMessage> {
     protected:
@@ -179,7 +179,7 @@ namespace NBus {
             : AutoPtr(message)
         {
         }
- 
+
         TBufferMessage* Get() {
             return AutoPtr.Get();
         }
@@ -198,7 +198,7 @@ namespace NBus {
             return AutoPtr.Release();
         }
     };
- 
+
     /////////////////////////////////////////////
     /// \brief Generic protocol object for messages descibed with protobuf
 
@@ -223,7 +223,7 @@ namespace NBus {
         void RegisterType(TAutoPtr<TBusBufferBase> mess);
 
         TArrayRef<TBusBufferBase* const> GetTypes() const;
- 
+
         /// serialized protocol specific data into TBusData
         void Serialize(const TBusMessage* mess, TBuffer& data) override;
 

Some files were not shown because too many files changed in this diff