123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256 |
- //
- // TCPServerDispatcher.cpp
- //
- // Library: Net
- // Package: TCPServer
- // Module: TCPServerDispatcher
- //
- // Copyright (c) 2005-2007, Applied Informatics Software Engineering GmbH.
- // and Contributors.
- //
- // SPDX-License-Identifier: BSL-1.0
- //
- #include "Poco/Net/TCPServerDispatcher.h"
- #include "Poco/Net/TCPServerConnectionFactory.h"
- #include "Poco/Notification.h"
- #include "Poco/AutoPtr.h"
- #include <memory>
- using Poco::Notification;
- using Poco::FastMutex;
- using Poco::AutoPtr;
- namespace Poco {
- namespace Net {
- class TCPConnectionNotification: public Notification
- {
- public:
- TCPConnectionNotification(const StreamSocket& socket):
- _socket(socket)
- {
- }
-
- ~TCPConnectionNotification()
- {
- }
-
- const StreamSocket& socket() const
- {
- return _socket;
- }
- private:
- StreamSocket _socket;
- };
- TCPServerDispatcher::TCPServerDispatcher(TCPServerConnectionFactory::Ptr pFactory, Poco::ThreadPool& threadPool, TCPServerParams::Ptr pParams):
- _rc(1),
- _pParams(pParams),
- _currentThreads(0),
- _totalConnections(0),
- _currentConnections(0),
- _maxConcurrentConnections(0),
- _refusedConnections(0),
- _stopped(false),
- _pConnectionFactory(pFactory),
- _threadPool(threadPool)
- {
- poco_check_ptr (pFactory);
- if (!_pParams)
- _pParams = new TCPServerParams;
-
- if (_pParams->getMaxThreads() == 0)
- _pParams->setMaxThreads(threadPool.capacity());
- }
- TCPServerDispatcher::~TCPServerDispatcher()
- {
- }
- void TCPServerDispatcher::duplicate()
- {
- _mutex.lock();
- ++_rc;
- _mutex.unlock();
- }
- void TCPServerDispatcher::release()
- {
- _mutex.lock();
- int rc = --_rc;
- _mutex.unlock();
- if (rc == 0) delete this;
- }
- void TCPServerDispatcher::run()
- {
- try
- {
- AutoPtr<TCPServerDispatcher> guard(this, true); // ensure object stays alive
- int idleTime = (int) _pParams->getThreadIdleTime().totalMilliseconds();
- for (;;)
- {
- AutoPtr<Notification> pNf = _queue.waitDequeueNotification(idleTime);
- if (pNf)
- {
- TCPConnectionNotification* pCNf = dynamic_cast<TCPConnectionNotification*>(pNf.get());
- if (pCNf)
- {
- #ifndef POCO_ENABLE_CPP11
- std::auto_ptr<TCPServerConnection> pConnection(_pConnectionFactory->createConnection(pCNf->socket()));
- #else
- std::unique_ptr<TCPServerConnection> pConnection(_pConnectionFactory->createConnection(pCNf->socket()));
- #endif // POCO_ENABLE_CPP11
- poco_check_ptr(pConnection.get());
- beginConnection();
- pConnection->start();
- endConnection();
- }
- }
-
- FastMutex::ScopedLock lock(_mutex);
- if (_stopped || (_currentThreads > 1 && _queue.empty()))
- {
- --_currentThreads;
- break;
- }
- }
- }
- catch (...)
- {
- FastMutex::ScopedLock lock(_mutex);
- --_currentThreads;
- throw;
- }
- }
namespace
{
	// Name passed to the thread pool for every worker thread started by
	// TCPServerDispatcher::enqueue(). The unnamed namespace already gives
	// it internal linkage, so no "static" is needed.
	const std::string threadName("TCPServerConnection");
}
-
- void TCPServerDispatcher::enqueue(const StreamSocket& socket)
- {
- FastMutex::ScopedLock lock(_mutex);
- if (_queue.size() < _pParams->getMaxQueued())
- {
- if (!_queue.hasIdleThreads() && _currentThreads < _pParams->getMaxThreads())
- {
- try
- {
- _threadPool.startWithPriority(_pParams->getThreadPriority(), *this, threadName);
- ++_currentThreads;
- }
- catch (Poco::Exception&)
- {
- ++_refusedConnections;
- return;
- }
- }
- _queue.enqueueNotification(new TCPConnectionNotification(socket));
- }
- else
- {
- ++_refusedConnections;
- }
- }
- void TCPServerDispatcher::stop()
- {
- _stopped = true;
- _queue.clear();
- _queue.wakeUpAll();
- }
- int TCPServerDispatcher::currentThreads() const
- {
- FastMutex::ScopedLock lock(_mutex);
-
- return _currentThreads;
- }
- int TCPServerDispatcher::maxThreads() const
- {
- FastMutex::ScopedLock lock(_mutex);
-
- return _threadPool.capacity();
- }
- int TCPServerDispatcher::totalConnections() const
- {
- FastMutex::ScopedLock lock(_mutex);
-
- return _totalConnections;
- }
- int TCPServerDispatcher::currentConnections() const
- {
- FastMutex::ScopedLock lock(_mutex);
-
- return _currentConnections;
- }
- int TCPServerDispatcher::maxConcurrentConnections() const
- {
- FastMutex::ScopedLock lock(_mutex);
-
- return _maxConcurrentConnections;
- }
- int TCPServerDispatcher::queuedConnections() const
- {
- return _queue.size();
- }
- int TCPServerDispatcher::refusedConnections() const
- {
- FastMutex::ScopedLock lock(_mutex);
-
- return _refusedConnections;
- }
- void TCPServerDispatcher::beginConnection()
- {
- FastMutex::ScopedLock lock(_mutex);
-
- ++_totalConnections;
- ++_currentConnections;
- if (_currentConnections > _maxConcurrentConnections)
- _maxConcurrentConnections = _currentConnections;
- }
- void TCPServerDispatcher::endConnection()
- {
- FastMutex::ScopedLock lock(_mutex);
- --_currentConnections;
- }
- } } // namespace Poco::Net
|