#include "ybus.h"

// NOTE(review): the header name of this #include was lost in the source text.
// <util/string/cast.h> is chosen because the code below uses FromString and
// TFromStringException - confirm against the original file.
#include <util/string/cast.h>

using namespace NBus;

namespace NBus {
    TBusSession::TBusSession() {
    }

    ////////////////////////////////////////////////////////////////////
    /// \brief Three-way comparison of two remote addresses by host part only.
    ///
    /// Addresses of different families are ordered by the numeric value of
    /// their family. For AF_INET and AF_INET6 only the address bytes are
    /// compared, so the port does not take part in the comparison. For any
    /// other family the leading Min(l.Len(), r.Len()) bytes of the raw
    /// address structures are compared.
    ///
    /// \return negative, zero or positive value, memcmp-style.
    int CompareByHost(const IRemoteAddr& l, const IRemoteAddr& r) noexcept {
        const auto* const lhs = l.Addr();
        const auto* const rhs = r.Addr();

        if (lhs->sa_family != rhs->sa_family) {
            return lhs->sa_family < rhs->sa_family ? -1 : +1;
        }

        switch (lhs->sa_family) {
            case AF_INET: {
                const auto* const l4 = reinterpret_cast<const sockaddr_in*>(lhs);
                const auto* const r4 = reinterpret_cast<const sockaddr_in*>(rhs);
                return memcmp(&l4->sin_addr, &r4->sin_addr, sizeof(in_addr));
            }

            case AF_INET6: {
                const auto* const l6 = reinterpret_cast<const sockaddr_in6*>(lhs);
                const auto* const r6 = reinterpret_cast<const sockaddr_in6*>(rhs);
                return memcmp(&l6->sin6_addr, &r6->sin6_addr, sizeof(in6_addr));
            }
        }

        // Unknown family: fall back to comparing the common prefix of the raw structures.
        return memcmp(lhs, rhs, Min(l.Len(), r.Len()));
    }

    /// Orders network addresses by host only (see CompareByHost).
    bool operator<(const TNetAddr& a1, const TNetAddr& a2) {
        return CompareByHost(a1, a2) < 0;
    }

    /// Single-address convenience wrapper over GetInFlightBulk().
    size_t TBusSession::GetInFlight(const TNetAddr& addr) const {
        // Zero-initialized so the result is defined even if the bulk call leaves it untouched.
        size_t r = 0;
        GetInFlightBulk({addr}, MakeArrayRef(&r, 1));
        return r;
    }

    /// Single-address convenience wrapper over GetConnectSyscallsNumBulkForTest().
    size_t TBusSession::GetConnectSyscallsNumForTest(const TNetAddr& addr) const {
        // Zero-initialized so the result is defined even if the bulk call leaves it untouched.
        size_t r = 0;
        GetConnectSyscallsNumBulkForTest({addr}, MakeArrayRef(&r, 1));
        return r;
    }

    // Split 'host' into name and port taking into account that host can be specified
    // as ipv6 address ('[<ipv6 address>]:port' notation).
    //
    // Both output strings are cleared first. On success returns true with
    // *hostName set (brackets stripped for the ipv6 form) and *portNum set to
    // the text after ':' (possibly empty). On failure returns false and
    // *portNum is left empty.
    bool SplitHost(const TString& host, TString* hostName, TString* portNum) {
        hostName->clear();
        portNum->clear();

        // Simple check that we have to deal with ipv6 address specification or
        // just host name or ipv4 address.
        if (!host.empty() && (host[0] == '[')) {
            size_t pos = host.find(']');
            if (pos < 2 || pos == TString::npos) {
                // '[]' and '[<address>' (no closing bracket) are errors.
                return false;
            }

            *hostName = host.substr(1, pos - 1);

            pos++;
            if (pos != host.length()) {
                if (host[pos] != ':') {
                    // Do not allow '[...]a' but '[...]:' is ok (as for ipv4 before).
                    return false;
                }
                *portNum = host.substr(pos + 1);
            }
        } else {
            size_t pos = host.find(':');
            if (pos != TString::npos) {
                if (pos == 0) {
                    // Treat ':<port>' (empty host) as an error but allow
                    // '<host>:' (empty port) for compatibility.
                    return false;
                }
                *portNum = host.substr(pos + 1);
            }

            // With pos == npos this takes the whole string.
            *hostName = host.substr(0, pos);
        }

        return true;
    }

    /// registers external session on host:port with locator service
    ///
    /// 'host' may be '<host>', '<host>:<port>' or '[<ipv6 address>]:<port>'.
    /// If it cannot be split, the whole string is used as the host name.
    /// When no port is given, the protocol's default port is used.
    ///
    /// \return -1 if the port part is not a valid integer, otherwise the
    ///         result of the locator's Register() call.
    int TBusSession::RegisterService(const char* host, TBusKey start /*= YBUS_KEYMIN*/, TBusKey end /*= YBUS_KEYMAX*/, EIpVersion ipVersion) {
        TString hostName;
        TString port;
        int portNum = 0;

        if (!SplitHost(host, &hostName, &port)) {
            // SplitHost leaves 'port' empty on failure, so the default port is used below.
            hostName = host;
        }

        if (port.empty()) {
            portNum = GetProto()->GetPort();
        } else {
            try {
                portNum = FromString<int>(port);
            } catch (const TFromStringException&) {
                return -1;
            }
        }

        TBusService service = GetProto()->GetService();
        return GetQueue()->GetLocator()->Register(service, hostName.data(), portNum, start, end, ipVersion);
    }

    TBusSession::~TBusSession() {
    }

}

/// Creates a client session by delegating to queue->CreateSource().
TBusClientSessionPtr TBusClientSession::Create(TBusProtocol* proto, IBusClientHandler* handler, const TBusClientSessionConfig& config, TBusMessageQueuePtr queue) {
    return queue->CreateSource(proto, handler, config);
}

/// Creates a server session by delegating to queue->CreateDestination().
TBusServerSessionPtr TBusServerSession::Create(TBusProtocol* proto, IBusServerHandler* handler, const TBusServerSessionConfig& config, TBusMessageQueuePtr queue) {
    return queue->CreateDestination(proto, handler, config);
}

/// Creates a server session by delegating to queue->CreateDestination(),
/// forwarding 'bindTo' unchanged.
// NOTE(review): the element type of 'bindTo' was lost in the source text;
// TBindResult is a reconstruction - confirm it matches the declaration in the header.
TBusServerSessionPtr TBusServerSession::Create(TBusProtocol* proto, IBusServerHandler* handler, const TBusServerSessionConfig& config, TBusMessageQueuePtr queue, const TVector<TBindResult>& bindTo) {
    return queue->CreateDestination(proto, handler, config, bindTo);
}