locator.cpp 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427
  1. ////////////////////////////////////////////////////////////////////////////
  2. /// \file
  3. /// \brief Implementation of locator service
  4. #include "locator.h"
  5. #include "ybus.h"
  6. #include <util/generic/hash_set.h>
  7. #include <util/system/hostname.h>
  8. namespace NBus {
  9. using namespace NAddr;
  10. static TIpPort GetAddrPort(const IRemoteAddr& addr) {
  11. switch (addr.Addr()->sa_family) {
  12. case AF_INET: {
  13. return ntohs(((const sockaddr_in*)addr.Addr())->sin_port);
  14. }
  15. case AF_INET6: {
  16. return ntohs(((const sockaddr_in6*)addr.Addr())->sin6_port);
  17. }
  18. default: {
  19. ythrow yexception() << "not implemented";
  20. break;
  21. }
  22. }
  23. }
  24. static inline bool GetIp6AddressFromVector(const TVector<TNetAddr>& addrs, TNetAddr* addr) {
  25. for (size_t i = 1; i < addrs.size(); ++i) {
  26. if (addrs[i - 1].Addr()->sa_family == addrs[i].Addr()->sa_family) {
  27. return false;
  28. }
  29. if (GetAddrPort(addrs[i - 1]) != GetAddrPort(addrs[i])) {
  30. return false;
  31. }
  32. }
  33. for (size_t i = 0; i < addrs.size(); ++i) {
  34. if (addrs[i].Addr()->sa_family == AF_INET6) {
  35. *addr = addrs[i];
  36. return true;
  37. }
  38. }
  39. return false;
  40. }
  41. EMessageStatus TBusProtocol::GetDestination(const TBusClientSession*, TBusMessage* mess, TBusLocator* locator, TNetAddr* addr) {
  42. TBusService service = GetService();
  43. TBusKey key = GetKey(mess);
  44. TVector<TNetAddr> addrs;
  45. /// check for special local key
  46. if (key == YBUS_KEYLOCAL) {
  47. locator->GetLocalAddresses(service, addrs);
  48. } else {
  49. /// lookup address/port in the locator table
  50. locator->LocateAll(service, key, addrs);
  51. }
  52. if (addrs.size() == 0) {
  53. return MESSAGE_SERVICE_UNKNOWN;
  54. } else if (addrs.size() == 1) {
  55. *addr = addrs[0];
  56. } else {
  57. if (!GetIp6AddressFromVector(addrs, addr)) {
  58. /// default policy can't make choice for you here, overide GetDestination() function
  59. /// to implement custom routing strategy for your service.
  60. return MESSAGE_SERVICE_TOOMANY;
  61. }
  62. }
  63. return MESSAGE_OK;
  64. }
  65. static const sockaddr_in* SockAddrIpV4(const IRemoteAddr& a) {
  66. return (const sockaddr_in*)a.Addr();
  67. }
  68. static const sockaddr_in6* SockAddrIpV6(const IRemoteAddr& a) {
  69. return (const sockaddr_in6*)a.Addr();
  70. }
  71. static bool IsAddressEqual(const IRemoteAddr& a1, const IRemoteAddr& a2) {
  72. if (a1.Addr()->sa_family == a2.Addr()->sa_family) {
  73. if (a1.Addr()->sa_family == AF_INET) {
  74. return memcmp(&SockAddrIpV4(a1)->sin_addr, &SockAddrIpV4(a2)->sin_addr, sizeof(in_addr)) == 0;
  75. } else {
  76. return memcmp(&SockAddrIpV6(a1)->sin6_addr, &SockAddrIpV6(a2)->sin6_addr, sizeof(in6_addr)) == 0;
  77. }
  78. }
  79. return false;
  80. }
  81. TBusLocator::TBusLocator()
  82. : MyInterfaces(GetNetworkInterfaces())
  83. {
  84. }
  85. bool TBusLocator::TItem::operator<(const TItem& y) const {
  86. const TItem& x = *this;
  87. if (x.ServiceId == y.ServiceId) {
  88. return (x.End < y.End) || ((x.End == y.End) && CompareByHost(x.Addr, y.Addr) < 0);
  89. }
  90. return x.ServiceId < y.ServiceId;
  91. }
  92. bool TBusLocator::TItem::operator==(const TItem& y) const {
  93. return ServiceId == y.ServiceId && Start == y.Start && End == y.End && Addr == y.Addr;
  94. }
  95. TBusLocator::TItem::TItem(TServiceId serviceId, TBusKey start, TBusKey end, const TNetAddr& addr)
  96. : ServiceId(serviceId)
  97. , Start(start)
  98. , End(end)
  99. , Addr(addr)
  100. {
  101. }
  102. bool TBusLocator::IsLocal(const TNetAddr& addr) {
  103. for (const auto& myInterface : MyInterfaces) {
  104. if (IsAddressEqual(addr, *myInterface.Address)) {
  105. return true;
  106. }
  107. }
  108. return false;
  109. }
  110. TBusLocator::TServiceId TBusLocator::GetServiceId(const char* name) {
  111. const char* c = ServiceIdSet.insert(name).first->c_str();
  112. return (ui64)c;
  113. }
  114. int TBusLocator::RegisterBreak(TBusService service, const TVector<TBusKey>& starts, const TNetAddr& addr) {
  115. TGuard<TMutex> G(Lock);
  116. TServiceId serviceId = GetServiceId(service);
  117. for (size_t i = 0; i < starts.size(); ++i) {
  118. RegisterBreak(serviceId, starts[i], addr);
  119. }
  120. return 0;
  121. }
  122. int TBusLocator::RegisterBreak(TServiceId serviceId, const TBusKey start, const TNetAddr& addr) {
  123. TItems::const_iterator it = Items.lower_bound(TItem(serviceId, 0, start, addr));
  124. TItems::const_iterator service_it =
  125. Items.lower_bound(TItem(serviceId, 0, 0, TNetAddr()));
  126. THolder<TItem> left;
  127. THolder<TItem> right;
  128. if ((it != Items.end() || Items.begin() != Items.end()) && service_it != Items.end() && service_it->ServiceId == serviceId) {
  129. if (it == Items.end()) {
  130. --it;
  131. }
  132. const TItem& item = *it;
  133. left.Reset(new TItem(serviceId, item.Start,
  134. Max<TBusKey>(item.Start, start - 1), item.Addr));
  135. right.Reset(new TItem(serviceId, start, item.End, addr));
  136. Items.erase(*it);
  137. } else {
  138. left.Reset(new TItem(serviceId, YBUS_KEYMIN, start, addr));
  139. if (start < YBUS_KEYMAX) {
  140. right.Reset(new TItem(serviceId, start + 1, YBUS_KEYMAX, addr));
  141. }
  142. }
  143. Items.insert(*left);
  144. Items.insert(*right);
  145. NormalizeBreaks(serviceId);
  146. return 0;
  147. }
  148. int TBusLocator::UnregisterBreak(TBusService service, const TNetAddr& addr) {
  149. TGuard<TMutex> G(Lock);
  150. TServiceId serviceId = GetServiceId(service);
  151. return UnregisterBreak(serviceId, addr);
  152. }
  153. int TBusLocator::UnregisterBreak(TServiceId serviceId, const TNetAddr& addr) {
  154. int deleted = 0;
  155. TItems::iterator it = Items.begin();
  156. while (it != Items.end()) {
  157. const TItem& item = *it;
  158. if (item.ServiceId != serviceId) {
  159. ++it;
  160. continue;
  161. }
  162. TItems::iterator itErase = it++;
  163. if (item.ServiceId == serviceId && item.Addr == addr) {
  164. Items.erase(itErase);
  165. deleted += 1;
  166. }
  167. }
  168. if (Items.begin() == Items.end()) {
  169. return deleted;
  170. }
  171. TBusKey keyItem = YBUS_KEYMAX;
  172. it = Items.end();
  173. TItems::iterator first = it;
  174. do {
  175. --it;
  176. // item.Start is not used in set comparison function
  177. // so you can't violate set sort order by changing it
  178. // hence const_cast()
  179. TItem& item = const_cast<TItem&>(*it);
  180. if (item.ServiceId != serviceId) {
  181. continue;
  182. }
  183. first = it;
  184. if (item.End < keyItem) {
  185. item.End = keyItem;
  186. }
  187. keyItem = item.Start - 1;
  188. } while (it != Items.begin());
  189. if (first != Items.end() && first->Start != 0) {
  190. TItem item(serviceId, YBUS_KEYMIN, first->Start - 1, first->Addr);
  191. Items.insert(item);
  192. }
  193. NormalizeBreaks(serviceId);
  194. return deleted;
  195. }
  196. void TBusLocator::NormalizeBreaks(TServiceId serviceId) {
  197. TItems::const_iterator first = Items.lower_bound(TItem(serviceId, YBUS_KEYMIN, YBUS_KEYMIN, TNetAddr()));
  198. TItems::const_iterator last = Items.end();
  199. if ((Items.end() != first) && (first->ServiceId == serviceId)) {
  200. if (serviceId != Max<TServiceId>()) {
  201. last = Items.lower_bound(TItem(serviceId + 1, YBUS_KEYMIN, YBUS_KEYMIN, TNetAddr()));
  202. }
  203. --last;
  204. Y_ASSERT(Items.end() != last);
  205. Y_ASSERT(last->ServiceId == serviceId);
  206. TItem& beg = const_cast<TItem&>(*first);
  207. beg.Addr = last->Addr;
  208. }
  209. }
  210. int TBusLocator::LocateAll(TBusService service, TBusKey key, TVector<TNetAddr>& addrs) {
  211. TGuard<TMutex> G(Lock);
  212. Y_ABORT_UNLESS(addrs.empty(), "Non emtpy addresses");
  213. TServiceId serviceId = GetServiceId(service);
  214. TItems::const_iterator it;
  215. for (it = Items.lower_bound(TItem(serviceId, 0, key, TNetAddr()));
  216. it != Items.end() && it->ServiceId == serviceId && it->Start <= key && key <= it->End;
  217. ++it) {
  218. const TItem& item = *it;
  219. addrs.push_back(item.Addr);
  220. }
  221. if (addrs.size() == 0) {
  222. return -1;
  223. }
  224. return (int)addrs.size();
  225. }
  226. int TBusLocator::Locate(TBusService service, TBusKey key, TNetAddr* addr) {
  227. TGuard<TMutex> G(Lock);
  228. TServiceId serviceId = GetServiceId(service);
  229. TItems::const_iterator it;
  230. it = Items.lower_bound(TItem(serviceId, 0, key, TNetAddr()));
  231. if (it != Items.end()) {
  232. const TItem& item = *it;
  233. if (item.ServiceId == serviceId && item.Start <= key && key < item.End) {
  234. *addr = item.Addr;
  235. return 0;
  236. }
  237. }
  238. return -1;
  239. }
  240. int TBusLocator::GetLocalPort(TBusService service) {
  241. TGuard<TMutex> G(Lock);
  242. TServiceId serviceId = GetServiceId(service);
  243. TItems::const_iterator it;
  244. int port = 0;
  245. for (it = Items.lower_bound(TItem(serviceId, 0, 0, TNetAddr())); it != Items.end(); ++it) {
  246. const TItem& item = *it;
  247. if (item.ServiceId != serviceId) {
  248. break;
  249. }
  250. if (IsLocal(item.Addr)) {
  251. if (port != 0 && port != GetAddrPort(item.Addr)) {
  252. Y_ASSERT(0 && "Can't decide which port to use.");
  253. return 0;
  254. }
  255. port = GetAddrPort(item.Addr);
  256. }
  257. }
  258. return port;
  259. }
  260. int TBusLocator::GetLocalAddresses(TBusService service, TVector<TNetAddr>& addrs) {
  261. TGuard<TMutex> G(Lock);
  262. TServiceId serviceId = GetServiceId(service);
  263. TItems::const_iterator it;
  264. for (it = Items.lower_bound(TItem(serviceId, 0, 0, TNetAddr())); it != Items.end(); ++it) {
  265. const TItem& item = *it;
  266. if (item.ServiceId != serviceId) {
  267. break;
  268. }
  269. if (IsLocal(item.Addr)) {
  270. addrs.push_back(item.Addr);
  271. }
  272. }
  273. if (addrs.size() == 0) {
  274. return -1;
  275. }
  276. return (int)addrs.size();
  277. }
  278. int TBusLocator::LocateHost(TBusService service, TBusKey key, TString* host, int* port, bool* isLocal) {
  279. int ret;
  280. TNetAddr addr;
  281. ret = Locate(service, key, &addr);
  282. if (ret != 0) {
  283. return ret;
  284. }
  285. {
  286. TGuard<TMutex> G(Lock);
  287. THostAddrMap::const_iterator it = HostAddrMap.find(addr);
  288. if (it == HostAddrMap.end()) {
  289. return -1;
  290. }
  291. *host = it->second;
  292. }
  293. *port = GetAddrPort(addr);
  294. if (isLocal != nullptr) {
  295. *isLocal = IsLocal(addr);
  296. }
  297. return 0;
  298. }
  299. int TBusLocator::LocateKeys(TBusService service, TBusKeyVec& keys, bool onlyLocal) {
  300. TGuard<TMutex> G(Lock);
  301. Y_ABORT_UNLESS(keys.empty(), "Non empty keys");
  302. TServiceId serviceId = GetServiceId(service);
  303. TItems::const_iterator it;
  304. for (it = Items.begin(); it != Items.end(); ++it) {
  305. const TItem& item = *it;
  306. if (item.ServiceId != serviceId) {
  307. continue;
  308. }
  309. if (onlyLocal && !IsLocal(item.Addr)) {
  310. continue;
  311. }
  312. keys.push_back(std::make_pair(item.Start, item.End));
  313. }
  314. return (int)keys.size();
  315. }
  316. int TBusLocator::Register(TBusService service, const char* hostName, int port, TBusKey start /*= YBUS_KEYMIN*/, TBusKey end /*= YBUS_KEYMAX*/, EIpVersion requireVersion /*= EIP_VERSION_4*/, EIpVersion preferVersion /*= EIP_VERSION_ANY*/) {
  317. TNetAddr addr(hostName, port, requireVersion, preferVersion); // throws
  318. {
  319. TGuard<TMutex> G(Lock);
  320. HostAddrMap[addr] = hostName;
  321. }
  322. Register(service, start, end, addr);
  323. return 0;
  324. }
  325. int TBusLocator::Register(TBusService service, TBusKey start, TBusKey end, const TNetworkAddress& na, EIpVersion requireVersion /*= EIP_VERSION_4*/, EIpVersion preferVersion /*= EIP_VERSION_ANY*/) {
  326. TNetAddr addr(na, requireVersion, preferVersion); // throws
  327. Register(service, start, end, addr);
  328. return 0;
  329. }
  330. int TBusLocator::Register(TBusService service, TBusKey start, TBusKey end, const TNetAddr& addr) {
  331. TGuard<TMutex> G(Lock);
  332. TServiceId serviceId = GetServiceId(service);
  333. TItems::const_iterator it;
  334. TItem itemToReg(serviceId, start, end, addr);
  335. for (it = Items.lower_bound(TItem(serviceId, 0, start, TNetAddr()));
  336. it != Items.end() && it->ServiceId == serviceId;
  337. ++it) {
  338. const TItem& item = *it;
  339. if (item == itemToReg) {
  340. return 0;
  341. }
  342. if ((item.Start < start && start < item.End) || (item.Start < end && end < item.End)) {
  343. Y_ABORT("Overlap in registered keys with non-identical range");
  344. }
  345. }
  346. Items.insert(itemToReg);
  347. return 0;
  348. }
  349. int TBusLocator::Unregister(TBusService service, TBusKey start, TBusKey end) {
  350. TGuard<TMutex> G(Lock);
  351. TServiceId serviceId = GetServiceId(service);
  352. Items.erase(TItem(serviceId, start, end, TNetAddr()));
  353. return 0;
  354. }
  355. }