123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130 |
- #include "ybus.h"
- #include <util/generic/cast.h>
- using namespace NBus;
- namespace NBus {
- TBusSession::TBusSession() {
- }
- ////////////////////////////////////////////////////////////////////
- /// \brief Adds peer of connection into connection list
- int CompareByHost(const IRemoteAddr& l, const IRemoteAddr& r) noexcept {
- if (l.Addr()->sa_family != r.Addr()->sa_family) {
- return l.Addr()->sa_family < r.Addr()->sa_family ? -1 : +1;
- }
- switch (l.Addr()->sa_family) {
- case AF_INET: {
- return memcmp(&(((const sockaddr_in*)l.Addr())->sin_addr), &(((const sockaddr_in*)r.Addr())->sin_addr), sizeof(in_addr));
- }
- case AF_INET6: {
- return memcmp(&(((const sockaddr_in6*)l.Addr())->sin6_addr), &(((const sockaddr_in6*)r.Addr())->sin6_addr), sizeof(in6_addr));
- }
- }
- return memcmp(l.Addr(), r.Addr(), Min<size_t>(l.Len(), r.Len()));
- }
- bool operator<(const TNetAddr& a1, const TNetAddr& a2) {
- return CompareByHost(a1, a2) < 0;
- }
- size_t TBusSession::GetInFlight(const TNetAddr& addr) const {
- size_t r;
- GetInFlightBulk({addr}, MakeArrayRef(&r, 1));
- return r;
- }
- size_t TBusSession::GetConnectSyscallsNumForTest(const TNetAddr& addr) const {
- size_t r;
- GetConnectSyscallsNumBulkForTest({addr}, MakeArrayRef(&r, 1));
- return r;
- }
- // Split 'host' into name and port taking into account that host can be specified
- // as ipv6 address ('[<ipv6 address]:port' notion).
- bool SplitHost(const TString& host, TString* hostName, TString* portNum) {
- hostName->clear();
- portNum->clear();
- // Simple check that we have to deal with ipv6 address specification or
- // just host name or ipv4 address.
- if (!host.empty() && (host[0] == '[')) {
- size_t pos = host.find(']');
- if (pos < 2 || pos == TString::npos) {
- // '[]' and '[<address>' are errors.
- return false;
- }
- *hostName = host.substr(1, pos - 1);
- pos++;
- if (pos != host.length()) {
- if (host[pos] != ':') {
- // Do not allow '[...]a' but '[...]:' is ok (as for ipv4 before
- return false;
- }
- *portNum = host.substr(pos + 1);
- }
- } else {
- size_t pos = host.find(':');
- if (pos != TString::npos) {
- if (pos == 0) {
- // Treat ':<port>' as errors but allow or '<host>:' for compatibility.
- return false;
- }
- *portNum = host.substr(pos + 1);
- }
- *hostName = host.substr(0, pos);
- }
- return true;
- }
- /// registers external session on host:port with locator service
- int TBusSession::RegisterService(const char* host, TBusKey start /*= YBUS_KEYMIN*/, TBusKey end /*= YBUS_KEYMAX*/, EIpVersion ipVersion) {
- TString hostName;
- TString port;
- int portNum;
- if (!SplitHost(host, &hostName, &port)) {
- hostName = host;
- }
- if (port.empty()) {
- portNum = GetProto()->GetPort();
- } else {
- try {
- portNum = FromString<int>(port);
- } catch (const TFromStringException&) {
- return -1;
- }
- }
- TBusService service = GetProto()->GetService();
- return GetQueue()->GetLocator()->Register(service, hostName.data(), portNum, start, end, ipVersion);
- }
- TBusSession::~TBusSession() {
- }
- }
- TBusClientSessionPtr TBusClientSession::Create(TBusProtocol* proto, IBusClientHandler* handler, const TBusClientSessionConfig& config, TBusMessageQueuePtr queue) {
- return queue->CreateSource(proto, handler, config);
- }
- TBusServerSessionPtr TBusServerSession::Create(TBusProtocol* proto, IBusServerHandler* handler, const TBusServerSessionConfig& config, TBusMessageQueuePtr queue) {
- return queue->CreateDestination(proto, handler, config);
- }
- TBusServerSessionPtr TBusServerSession::Create(TBusProtocol* proto, IBusServerHandler* handler, const TBusServerSessionConfig& config, TBusMessageQueuePtr queue, const TVector<TBindResult>& bindTo) {
- return queue->CreateDestination(proto, handler, config, bindTo);
- }
|