TCPServerDispatcher.cpp 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256
  1. //
  2. // TCPServerDispatcher.cpp
  3. //
  4. // Library: Net
  5. // Package: TCPServer
  6. // Module: TCPServerDispatcher
  7. //
  8. // Copyright (c) 2005-2007, Applied Informatics Software Engineering GmbH.
  9. // and Contributors.
  10. //
  11. // SPDX-License-Identifier: BSL-1.0
  12. //
  13. #include "Poco/Net/TCPServerDispatcher.h"
  14. #include "Poco/Net/TCPServerConnectionFactory.h"
  15. #include "Poco/Notification.h"
  16. #include "Poco/AutoPtr.h"
  17. #include <memory>
  18. using Poco::Notification;
  19. using Poco::FastMutex;
  20. using Poco::AutoPtr;
  21. namespace Poco {
  22. namespace Net {
  23. class TCPConnectionNotification: public Notification
  24. {
  25. public:
  26. TCPConnectionNotification(const StreamSocket& socket):
  27. _socket(socket)
  28. {
  29. }
  30. ~TCPConnectionNotification()
  31. {
  32. }
  33. const StreamSocket& socket() const
  34. {
  35. return _socket;
  36. }
  37. private:
  38. StreamSocket _socket;
  39. };
  40. TCPServerDispatcher::TCPServerDispatcher(TCPServerConnectionFactory::Ptr pFactory, Poco::ThreadPool& threadPool, TCPServerParams::Ptr pParams):
  41. _rc(1),
  42. _pParams(pParams),
  43. _currentThreads(0),
  44. _totalConnections(0),
  45. _currentConnections(0),
  46. _maxConcurrentConnections(0),
  47. _refusedConnections(0),
  48. _stopped(false),
  49. _pConnectionFactory(pFactory),
  50. _threadPool(threadPool)
  51. {
  52. poco_check_ptr (pFactory);
  53. if (!_pParams)
  54. _pParams = new TCPServerParams;
  55. if (_pParams->getMaxThreads() == 0)
  56. _pParams->setMaxThreads(threadPool.capacity());
  57. }
  58. TCPServerDispatcher::~TCPServerDispatcher()
  59. {
  60. }
  61. void TCPServerDispatcher::duplicate()
  62. {
  63. _mutex.lock();
  64. ++_rc;
  65. _mutex.unlock();
  66. }
  67. void TCPServerDispatcher::release()
  68. {
  69. _mutex.lock();
  70. int rc = --_rc;
  71. _mutex.unlock();
  72. if (rc == 0) delete this;
  73. }
  74. void TCPServerDispatcher::run()
  75. {
  76. try
  77. {
  78. AutoPtr<TCPServerDispatcher> guard(this, true); // ensure object stays alive
  79. int idleTime = (int) _pParams->getThreadIdleTime().totalMilliseconds();
  80. for (;;)
  81. {
  82. AutoPtr<Notification> pNf = _queue.waitDequeueNotification(idleTime);
  83. if (pNf)
  84. {
  85. TCPConnectionNotification* pCNf = dynamic_cast<TCPConnectionNotification*>(pNf.get());
  86. if (pCNf)
  87. {
  88. #ifndef POCO_ENABLE_CPP11
  89. std::auto_ptr<TCPServerConnection> pConnection(_pConnectionFactory->createConnection(pCNf->socket()));
  90. #else
  91. std::unique_ptr<TCPServerConnection> pConnection(_pConnectionFactory->createConnection(pCNf->socket()));
  92. #endif // POCO_ENABLE_CPP11
  93. poco_check_ptr(pConnection.get());
  94. beginConnection();
  95. pConnection->start();
  96. endConnection();
  97. }
  98. }
  99. FastMutex::ScopedLock lock(_mutex);
  100. if (_stopped || (_currentThreads > 1 && _queue.empty()))
  101. {
  102. --_currentThreads;
  103. break;
  104. }
  105. }
  106. }
  107. catch (...)
  108. {
  109. FastMutex::ScopedLock lock(_mutex);
  110. --_currentThreads;
  111. throw;
  112. }
  113. }
  114. namespace
  115. {
  116. static const std::string threadName("TCPServerConnection");
  117. }
  118. void TCPServerDispatcher::enqueue(const StreamSocket& socket)
  119. {
  120. FastMutex::ScopedLock lock(_mutex);
  121. if (_queue.size() < _pParams->getMaxQueued())
  122. {
  123. if (!_queue.hasIdleThreads() && _currentThreads < _pParams->getMaxThreads())
  124. {
  125. try
  126. {
  127. _threadPool.startWithPriority(_pParams->getThreadPriority(), *this, threadName);
  128. ++_currentThreads;
  129. }
  130. catch (Poco::Exception&)
  131. {
  132. ++_refusedConnections;
  133. return;
  134. }
  135. }
  136. _queue.enqueueNotification(new TCPConnectionNotification(socket));
  137. }
  138. else
  139. {
  140. ++_refusedConnections;
  141. }
  142. }
  143. void TCPServerDispatcher::stop()
  144. {
  145. _stopped = true;
  146. _queue.clear();
  147. _queue.wakeUpAll();
  148. }
  149. int TCPServerDispatcher::currentThreads() const
  150. {
  151. FastMutex::ScopedLock lock(_mutex);
  152. return _currentThreads;
  153. }
  154. int TCPServerDispatcher::maxThreads() const
  155. {
  156. FastMutex::ScopedLock lock(_mutex);
  157. return _threadPool.capacity();
  158. }
  159. int TCPServerDispatcher::totalConnections() const
  160. {
  161. FastMutex::ScopedLock lock(_mutex);
  162. return _totalConnections;
  163. }
  164. int TCPServerDispatcher::currentConnections() const
  165. {
  166. FastMutex::ScopedLock lock(_mutex);
  167. return _currentConnections;
  168. }
  169. int TCPServerDispatcher::maxConcurrentConnections() const
  170. {
  171. FastMutex::ScopedLock lock(_mutex);
  172. return _maxConcurrentConnections;
  173. }
  174. int TCPServerDispatcher::queuedConnections() const
  175. {
  176. return _queue.size();
  177. }
  178. int TCPServerDispatcher::refusedConnections() const
  179. {
  180. FastMutex::ScopedLock lock(_mutex);
  181. return _refusedConnections;
  182. }
  183. void TCPServerDispatcher::beginConnection()
  184. {
  185. FastMutex::ScopedLock lock(_mutex);
  186. ++_totalConnections;
  187. ++_currentConnections;
  188. if (_currentConnections > _maxConcurrentConnections)
  189. _maxConcurrentConnections = _currentConnections;
  190. }
  191. void TCPServerDispatcher::endConnection()
  192. {
  193. FastMutex::ScopedLock lock(_mutex);
  194. --_currentConnections;
  195. }
  196. } } // namespace Poco::Net