session.cpp 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. #include "ybus.h"
  2. #include <util/generic/cast.h>
  3. using namespace NBus;
  4. namespace NBus {
  5. TBusSession::TBusSession() {
  6. }
  7. ////////////////////////////////////////////////////////////////////
  8. /// \brief Adds peer of connection into connection list
  9. int CompareByHost(const IRemoteAddr& l, const IRemoteAddr& r) noexcept {
  10. if (l.Addr()->sa_family != r.Addr()->sa_family) {
  11. return l.Addr()->sa_family < r.Addr()->sa_family ? -1 : +1;
  12. }
  13. switch (l.Addr()->sa_family) {
  14. case AF_INET: {
  15. return memcmp(&(((const sockaddr_in*)l.Addr())->sin_addr), &(((const sockaddr_in*)r.Addr())->sin_addr), sizeof(in_addr));
  16. }
  17. case AF_INET6: {
  18. return memcmp(&(((const sockaddr_in6*)l.Addr())->sin6_addr), &(((const sockaddr_in6*)r.Addr())->sin6_addr), sizeof(in6_addr));
  19. }
  20. }
  21. return memcmp(l.Addr(), r.Addr(), Min<size_t>(l.Len(), r.Len()));
  22. }
  23. bool operator<(const TNetAddr& a1, const TNetAddr& a2) {
  24. return CompareByHost(a1, a2) < 0;
  25. }
  26. size_t TBusSession::GetInFlight(const TNetAddr& addr) const {
  27. size_t r;
  28. GetInFlightBulk({addr}, MakeArrayRef(&r, 1));
  29. return r;
  30. }
  31. size_t TBusSession::GetConnectSyscallsNumForTest(const TNetAddr& addr) const {
  32. size_t r;
  33. GetConnectSyscallsNumBulkForTest({addr}, MakeArrayRef(&r, 1));
  34. return r;
  35. }
  36. // Split 'host' into name and port taking into account that host can be specified
  37. // as ipv6 address ('[<ipv6 address]:port' notion).
  38. bool SplitHost(const TString& host, TString* hostName, TString* portNum) {
  39. hostName->clear();
  40. portNum->clear();
  41. // Simple check that we have to deal with ipv6 address specification or
  42. // just host name or ipv4 address.
  43. if (!host.empty() && (host[0] == '[')) {
  44. size_t pos = host.find(']');
  45. if (pos < 2 || pos == TString::npos) {
  46. // '[]' and '[<address>' are errors.
  47. return false;
  48. }
  49. *hostName = host.substr(1, pos - 1);
  50. pos++;
  51. if (pos != host.length()) {
  52. if (host[pos] != ':') {
  53. // Do not allow '[...]a' but '[...]:' is ok (as for ipv4 before
  54. return false;
  55. }
  56. *portNum = host.substr(pos + 1);
  57. }
  58. } else {
  59. size_t pos = host.find(':');
  60. if (pos != TString::npos) {
  61. if (pos == 0) {
  62. // Treat ':<port>' as errors but allow or '<host>:' for compatibility.
  63. return false;
  64. }
  65. *portNum = host.substr(pos + 1);
  66. }
  67. *hostName = host.substr(0, pos);
  68. }
  69. return true;
  70. }
  71. /// registers external session on host:port with locator service
  72. int TBusSession::RegisterService(const char* host, TBusKey start /*= YBUS_KEYMIN*/, TBusKey end /*= YBUS_KEYMAX*/, EIpVersion ipVersion) {
  73. TString hostName;
  74. TString port;
  75. int portNum;
  76. if (!SplitHost(host, &hostName, &port)) {
  77. hostName = host;
  78. }
  79. if (port.empty()) {
  80. portNum = GetProto()->GetPort();
  81. } else {
  82. try {
  83. portNum = FromString<int>(port);
  84. } catch (const TFromStringException&) {
  85. return -1;
  86. }
  87. }
  88. TBusService service = GetProto()->GetService();
  89. return GetQueue()->GetLocator()->Register(service, hostName.data(), portNum, start, end, ipVersion);
  90. }
  91. TBusSession::~TBusSession() {
  92. }
  93. }
  94. TBusClientSessionPtr TBusClientSession::Create(TBusProtocol* proto, IBusClientHandler* handler, const TBusClientSessionConfig& config, TBusMessageQueuePtr queue) {
  95. return queue->CreateSource(proto, handler, config);
  96. }
  97. TBusServerSessionPtr TBusServerSession::Create(TBusProtocol* proto, IBusServerHandler* handler, const TBusServerSessionConfig& config, TBusMessageQueuePtr queue) {
  98. return queue->CreateDestination(proto, handler, config);
  99. }
  100. TBusServerSessionPtr TBusServerSession::Create(TBusProtocol* proto, IBusServerHandler* handler, const TBusServerSessionConfig& config, TBusMessageQueuePtr queue, const TVector<TBindResult>& bindTo) {
  101. return queue->CreateDestination(proto, handler, config, bindTo);
  102. }