startsession.cpp 1.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465
  1. ///////////////////////////////////////////////////////////
  2. /// \file
  3. /// \brief Starter session implementation
  4. /// Starter session will generate emtpy message to insert
  5. /// into local session that are registered under same protocol
  6. /// Starter (will one day) automatically adjust number
  7. /// of message inflight to make sure that at least one of source
  8. /// sessions within message queue is at the limit (bottle neck)
  9. /// Maximum number of messages that starter will instert into
  10. /// the pipeline is configured by NBus::TBusSessionConfig::MaxInFlight
  11. #include "startsession.h"
  12. #include "module.h"
  13. #include <library/cpp/messagebus/ybus.h>
  14. namespace NBus {
  15. void* TBusStarter::_starter(void* data) {
  16. TBusStarter* pThis = static_cast<TBusStarter*>(data);
  17. pThis->Starter();
  18. return nullptr;
  19. }
  20. TBusStarter::TBusStarter(TBusModule* module, const TBusSessionConfig& config)
  21. : Module(module)
  22. , Config(config)
  23. , StartThread(_starter, this)
  24. , Exiting(false)
  25. {
  26. StartThread.Start();
  27. }
  28. TBusStarter::~TBusStarter() {
  29. Shutdown();
  30. }
  31. void TBusStarter::Shutdown() {
  32. {
  33. TGuard<TMutex> g(ExitLock);
  34. Exiting = true;
  35. ExitSignal.Signal();
  36. }
  37. StartThread.Join();
  38. }
  39. void TBusStarter::Starter() {
  40. TGuard<TMutex> g(ExitLock);
  41. while (!Exiting) {
  42. TAutoPtr<TBusMessage> empty(new TBusMessage(0));
  43. EMessageStatus status = Module->StartJob(empty);
  44. if (Config.SendTimeout > 0) {
  45. ExitSignal.WaitT(ExitLock, TDuration::MilliSeconds(Config.SendTimeout));
  46. } else {
  47. ExitSignal.WaitT(ExitLock, (status == MESSAGE_BUSY) ? TDuration::MilliSeconds(1) : TDuration::Zero());
  48. }
  49. }
  50. }
  51. }