/////////////////////////////////////////////////////////// /// \file /// \brief Starter session implementation /// Starter session will generate emtpy message to insert /// into local session that are registered under same protocol /// Starter (will one day) automatically adjust number /// of message inflight to make sure that at least one of source /// sessions within message queue is at the limit (bottle neck) /// Maximum number of messages that starter will instert into /// the pipeline is configured by NBus::TBusSessionConfig::MaxInFlight #include "startsession.h" #include "module.h" #include namespace NBus { void* TBusStarter::_starter(void* data) { TBusStarter* pThis = static_cast(data); pThis->Starter(); return nullptr; } TBusStarter::TBusStarter(TBusModule* module, const TBusSessionConfig& config) : Module(module) , Config(config) , StartThread(_starter, this) , Exiting(false) { StartThread.Start(); } TBusStarter::~TBusStarter() { Shutdown(); } void TBusStarter::Shutdown() { { TGuard g(ExitLock); Exiting = true; ExitSignal.Signal(); } StartThread.Join(); } void TBusStarter::Starter() { TGuard g(ExitLock); while (!Exiting) { TAutoPtr empty(new TBusMessage(0)); EMessageStatus status = Module->StartJob(empty); if (Config.SendTimeout > 0) { ExitSignal.WaitT(ExitLock, TDuration::MilliSeconds(Config.SendTimeout)); } else { ExitSignal.WaitT(ExitLock, (status == MESSAGE_BUSY) ? TDuration::MilliSeconds(1) : TDuration::Zero()); } } } }