configs_dispatcher.cpp 45 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030
  1. #include "config_helpers.h"
  2. #include "configs_dispatcher.h"
  3. #include "console_configs_subscriber.h"
  4. #include "console.h"
  5. #include "http.h"
  6. #include "util.h"
  7. #include <ydb/core/cms/console/util/config_index.h>
  8. #include <ydb/library/yaml_config/util.h>
  9. #include <ydb/library/yaml_config/yaml_config.h>
  10. #include <ydb/core/mind/tenant_pool.h>
  11. #include <ydb/core/mon/mon.h>
  12. #include <ydb/library/actors/core/actor_bootstrapped.h>
  13. #include <ydb/library/actors/core/interconnect.h>
  14. #include <ydb/library/actors/core/mon.h>
  15. #include <ydb/library/actors/interconnect/interconnect.h>
  16. #include <library/cpp/json/json_reader.h>
  17. #include <library/cpp/json/json_writer.h>
  18. #include <util/generic/bitmap.h>
  19. #include <util/generic/ptr.h>
  20. #include <util/string/join.h>
  21. #if defined BLOG_D || defined BLOG_I || defined BLOG_ERROR || defined BLOG_TRACE
  22. #error log macro definition clash
  23. #endif
  24. #define BLOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::CONFIGS_DISPATCHER, stream)
  25. #define BLOG_N(stream) LOG_NOTICE_S(*TlsActivationContext, NKikimrServices::CONFIGS_DISPATCHER, stream)
  26. #define BLOG_I(stream) LOG_INFO_S(*TlsActivationContext, NKikimrServices::CONFIGS_DISPATCHER, stream)
  27. #define BLOG_W(stream) LOG_WARN_S(*TlsActivationContext, NKikimrServices::CONFIGS_DISPATCHER, stream)
  28. #define BLOG_ERROR(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::CONFIGS_DISPATCHER, stream)
  29. #define BLOG_TRACE(stream) LOG_TRACE_S(*TlsActivationContext, NKikimrServices::CONFIGS_DISPATCHER, stream)
  30. namespace NKikimr::NConsole {
  31. const THashSet<ui32> DYNAMIC_KINDS({
  32. (ui32)NKikimrConsole::TConfigItem::ActorSystemConfigItem,
  33. (ui32)NKikimrConsole::TConfigItem::BootstrapConfigItem,
  34. (ui32)NKikimrConsole::TConfigItem::CmsConfigItem,
  35. (ui32)NKikimrConsole::TConfigItem::CompactionConfigItem,
  36. (ui32)NKikimrConsole::TConfigItem::ConfigsDispatcherConfigItem,
  37. (ui32)NKikimrConsole::TConfigItem::FeatureFlagsItem,
  38. (ui32)NKikimrConsole::TConfigItem::HiveConfigItem,
  39. (ui32)NKikimrConsole::TConfigItem::ImmediateControlsConfigItem,
  40. (ui32)NKikimrConsole::TConfigItem::LogConfigItem,
  41. (ui32)NKikimrConsole::TConfigItem::MonitoringConfigItem,
  42. (ui32)NKikimrConsole::TConfigItem::NameserviceConfigItem,
  43. (ui32)NKikimrConsole::TConfigItem::NetClassifierDistributableConfigItem,
  44. (ui32)NKikimrConsole::TConfigItem::NodeBrokerConfigItem,
  45. (ui32)NKikimrConsole::TConfigItem::QueryServiceConfigItem,
  46. (ui32)NKikimrConsole::TConfigItem::SchemeShardConfigItem,
  47. (ui32)NKikimrConsole::TConfigItem::SharedCacheConfigItem,
  48. (ui32)NKikimrConsole::TConfigItem::TableProfilesConfigItem,
  49. (ui32)NKikimrConsole::TConfigItem::TableServiceConfigItem,
  50. (ui32)NKikimrConsole::TConfigItem::TenantPoolConfigItem,
  51. (ui32)NKikimrConsole::TConfigItem::TenantSlotBrokerConfigItem,
  52. (ui32)NKikimrConsole::TConfigItem::AllowEditYamlInUiItem,
  53. (ui32)NKikimrConsole::TConfigItem::BackgroundCleaningConfigItem,
  54. (ui32)NKikimrConsole::TConfigItem::TracingConfigItem,
  55. });
  56. const THashSet<ui32> NON_YAML_KINDS({
  57. (ui32)NKikimrConsole::TConfigItem::NameserviceConfigItem,
  58. (ui32)NKikimrConsole::TConfigItem::NetClassifierDistributableConfigItem,
  59. });
  60. class TConfigsDispatcher : public TActorBootstrapped<TConfigsDispatcher> {
  61. private:
  62. using TBase = TActorBootstrapped<TConfigsDispatcher>;
  63. struct TConfig {
  64. NKikimrConfig::TConfigVersion Version;
  65. NKikimrConfig::TAppConfig Config;
  66. };
  67. struct TYamlVersion {
  68. ui64 Version;
  69. TMap<ui64, ui64> VolatileVersions;
  70. };
  71. /**
  72. * Structure to describe configs subscription shared by multiple
  73. * dispatcher subscribers.
  74. */
  75. struct TSubscription : public TThrRefBase {
  76. using TPtr = TIntrusivePtr<TSubscription>;
  77. TDynBitMap Kinds;
  78. THashMap<TActorId, ui64> Subscribers;
  79. // Set to true for all yaml kinds.
  80. // Some 'legacy' kinds, which is usually managed by some automation e.g. NetClassifierDistributableConfigItem
  81. // Left this field false and consume old console configs
  82. bool Yaml = false;
  83. // Set to true if there were no config update notifications
  84. // Last config which was delivered to all subscribers.
  85. TConfig CurrentConfig;
  86. // If any yaml config delivered to all subscribers and acknowleged by them
  87. // This field is set to version from this yaml config
  88. std::optional<TYamlVersion> YamlVersion;
  89. // Config update which is currently delivered to subscribers.
  90. THolder<TEvConsole::TEvConfigNotificationRequest> UpdateInProcess = nullptr;
  91. NKikimrConfig::TConfigVersion UpdateInProcessConfigVersion;
  92. ui64 UpdateInProcessCookie;
  93. std::optional<TYamlVersion> UpdateInProcessYamlVersion;
  94. // Subscribers who didn't respond yet to the latest config update.
  95. THashSet<TActorId> SubscribersToUpdate;
  96. };
  97. /**
  98. * Structure to describe subscribers. Subscriber can have multiple
  99. * subscriptions but we allow only single subscription per external
  100. * client. The only client who can have multiple subscriptions is
  101. * dispatcher itself.
  102. */
  103. struct TSubscriber : public TThrRefBase {
  104. using TPtr = TIntrusivePtr<TSubscriber>;
  105. TActorId Subscriber;
  106. THashSet<TSubscription::TPtr> Subscriptions;
  107. NKikimrConfig::TConfigVersion CurrentConfigVersion;
  108. };
  109. public:
  110. static constexpr NKikimrServices::TActivity::EType ActorActivityType()
  111. {
  112. return NKikimrServices::TActivity::CONFIGS_DISPATCHER_ACTOR;
  113. }
  114. TConfigsDispatcher(const TConfigsDispatcherInitInfo& initInfo);
  115. void Bootstrap();
  116. void EnqueueEvent(TAutoPtr<IEventHandle> &ev);
  117. void ProcessEnqueuedEvents();
  118. void SendUpdateToSubscriber(TSubscription::TPtr subscription, TActorId subscriber);
  119. TSubscription::TPtr FindSubscription(const TActorId &subscriber);
  120. TSubscription::TPtr FindSubscription(const TDynBitMap &kinds);
  121. TSubscriber::TPtr FindSubscriber(TActorId aid);
  122. void UpdateYamlVersion(const TSubscription::TPtr &kinds) const;
  123. struct TCheckKindsResult {
  124. bool HasYamlKinds = false;
  125. bool HasNonYamlKinds = false;
  126. };
  127. TCheckKindsResult CheckKinds(const TVector<ui32>& kinds, const char* errorContext) const;
  128. NKikimrConfig::TAppConfig ParseYamlProtoConfig();
  129. TDynBitMap FilterKinds(const TDynBitMap& in);
  130. void Handle(NMon::TEvHttpInfo::TPtr &ev);
  131. void Handle(TEvInterconnect::TEvNodesInfo::TPtr &ev);
  132. void Handle(TEvConsole::TEvConfigSubscriptionNotification::TPtr &ev);
  133. void Handle(TEvConsole::TEvConfigSubscriptionError::TPtr &ev);
  134. void Handle(TEvConsole::TEvConfigNotificationResponse::TPtr &ev);
  135. void Handle(TEvConfigsDispatcher::TEvGetConfigRequest::TPtr &ev);
  136. void Handle(TEvConfigsDispatcher::TEvSetConfigSubscriptionRequest::TPtr &ev);
  137. void Handle(TEvConfigsDispatcher::TEvRemoveConfigSubscriptionRequest::TPtr &ev);
  138. void Handle(TEvConsole::TEvConfigNotificationRequest::TPtr &ev);
  139. void Handle(TEvConsole::TEvGetNodeLabelsRequest::TPtr &ev);
  140. void ReplyMonJson(TActorId mailbox);
  141. STATEFN(StateInit)
  142. {
  143. TRACE_EVENT(NKikimrServices::CONFIGS_DISPATCHER);
  144. switch (ev->GetTypeRewrite()) {
  145. // Monitoring page
  146. hFuncTraced(NMon::TEvHttpInfo, Handle);
  147. hFuncTraced(TEvInterconnect::TEvNodesInfo, Handle);
  148. // Updates from console
  149. hFuncTraced(TEvConsole::TEvConfigSubscriptionNotification, Handle);
  150. hFuncTraced(TEvConsole::TEvConfigSubscriptionError, Handle);
  151. // Events from clients
  152. hFuncTraced(TEvConfigsDispatcher::TEvSetConfigSubscriptionRequest, Handle);
  153. hFuncTraced(TEvConfigsDispatcher::TEvRemoveConfigSubscriptionRequest, Handle);
  154. // Resolve
  155. hFunc(TEvConsole::TEvGetNodeLabelsRequest, Handle);
  156. default:
  157. EnqueueEvent(ev);
  158. break;
  159. }
  160. }
  161. STATEFN(StateWork)
  162. {
  163. TRACE_EVENT(NKikimrServices::CONFIGS_DISPATCHER);
  164. switch (ev->GetTypeRewrite()) {
  165. // Monitoring page
  166. hFuncTraced(NMon::TEvHttpInfo, Handle);
  167. hFuncTraced(TEvInterconnect::TEvNodesInfo, Handle);
  168. // Updates from console
  169. hFuncTraced(TEvConsole::TEvConfigSubscriptionNotification, Handle);
  170. hFuncTraced(TEvConsole::TEvConfigSubscriptionError, Handle);
  171. // Events from clients
  172. hFuncTraced(TEvConfigsDispatcher::TEvGetConfigRequest, Handle);
  173. hFuncTraced(TEvConfigsDispatcher::TEvSetConfigSubscriptionRequest, Handle);
  174. hFuncTraced(TEvConfigsDispatcher::TEvRemoveConfigSubscriptionRequest, Handle);
  175. hFuncTraced(TEvConsole::TEvConfigNotificationResponse, Handle);
  176. IgnoreFunc(TEvConfigsDispatcher::TEvSetConfigSubscriptionResponse);
  177. // Resolve
  178. hFunc(TEvConsole::TEvGetNodeLabelsRequest, Handle);
  179. // Ignore these console requests until we get rid of persistent subscriptions-related code
  180. IgnoreFunc(TEvConsole::TEvAddConfigSubscriptionResponse);
  181. IgnoreFunc(TEvConsole::TEvGetNodeConfigResponse);
  182. // Pretend we got this
  183. hFuncTraced(TEvConsole::TEvConfigNotificationRequest, Handle);
  184. default:
  185. Y_ABORT("unexpected event type: %" PRIx32 " event: %s",
  186. ev->GetTypeRewrite(), ev->ToString().data());
  187. break;
  188. }
  189. }
  190. private:
  191. const TMap<TString, TString> Labels;
  192. const std::variant<std::monostate, TDenyList, TAllowList> ItemsServeRules;
  193. const NKikimrConfig::TAppConfig BaseConfig;
  194. NKikimrConfig::TAppConfig CurrentConfig;
  195. const std::optional<TDebugInfo> DebugInfo;
  196. ui64 NextRequestCookie;
  197. TVector<TActorId> HttpRequests;
  198. TActorId CommonSubscriptionClient;
  199. TDeque<TAutoPtr<IEventHandle>> EventsQueue;
  200. THashMap<TActorId, TSubscription::TPtr> SubscriptionsBySubscriber;
  201. THashMap<TDynBitMap, TSubscription::TPtr> SubscriptionsByKinds;
  202. THashMap<TActorId, TSubscriber::TPtr> Subscribers;
  203. TString YamlConfig;
  204. TMap<ui64, TString> VolatileYamlConfigs;
  205. TMap<ui64, size_t> VolatileYamlConfigHashes;
  206. TString ResolvedYamlConfig;
  207. TString ResolvedJsonConfig;
  208. NKikimrConfig::TAppConfig YamlProtoConfig;
  209. bool YamlConfigEnabled = false;
  210. };
  211. TConfigsDispatcher::TConfigsDispatcher(const TConfigsDispatcherInitInfo& initInfo)
  212. : Labels(initInfo.Labels)
  213. , ItemsServeRules(initInfo.ItemsServeRules)
  214. , BaseConfig(initInfo.InitialConfig)
  215. , CurrentConfig(initInfo.InitialConfig)
  216. , DebugInfo(initInfo.DebugInfo)
  217. , NextRequestCookie(Now().GetValue())
  218. {}
  219. void TConfigsDispatcher::Bootstrap()
  220. {
  221. BLOG_D("TConfigsDispatcher Bootstrap");
  222. NActors::TMon *mon = AppData()->Mon;
  223. if (mon) {
  224. NMonitoring::TIndexMonPage *actorsMonPage = mon->RegisterIndexPage("actors", "Actors");
  225. mon->RegisterActorPage(actorsMonPage, "configs_dispatcher", "Configs Dispatcher", false, TlsActivationContext->ExecutorThread.ActorSystem, SelfId());
  226. }
  227. auto commonClient = CreateConfigsSubscriber(
  228. SelfId(),
  229. TVector<ui32>(DYNAMIC_KINDS.begin(), DYNAMIC_KINDS.end()),
  230. CurrentConfig,
  231. 0,
  232. true,
  233. 1);
  234. CommonSubscriptionClient = RegisterWithSameMailbox(commonClient);
  235. Become(&TThis::StateInit);
  236. }
  237. void TConfigsDispatcher::EnqueueEvent(TAutoPtr<IEventHandle> &ev)
  238. {
  239. BLOG_D("Enqueue event type: " << ev->GetTypeRewrite());
  240. EventsQueue.push_back(ev);
  241. }
  242. void TConfigsDispatcher::ProcessEnqueuedEvents()
  243. {
  244. while (!EventsQueue.empty()) {
  245. TAutoPtr<IEventHandle> &ev = EventsQueue.front();
  246. BLOG_D("Dequeue event type: " << ev->GetTypeRewrite());
  247. TlsActivationContext->Send(ev.Release());
  248. EventsQueue.pop_front();
  249. }
  250. }
  251. void TConfigsDispatcher::SendUpdateToSubscriber(TSubscription::TPtr subscription, TActorId subscriber)
  252. {
  253. Y_ABORT_UNLESS(subscription->UpdateInProcess);
  254. subscription->SubscribersToUpdate.insert(subscriber);
  255. auto notification = MakeHolder<TEvConsole::TEvConfigNotificationRequest>();
  256. notification->Record.CopyFrom(subscription->UpdateInProcess->Record);
  257. BLOG_TRACE("Send TEvConsole::TEvConfigNotificationRequest to " << subscriber
  258. << ": " << notification->Record.ShortDebugString());
  259. Send(subscriber, notification.Release(), 0, subscription->UpdateInProcessCookie);
  260. }
  261. TConfigsDispatcher::TSubscription::TPtr TConfigsDispatcher::FindSubscription(const TDynBitMap &kinds)
  262. {
  263. if (auto it = SubscriptionsByKinds.find(kinds); it != SubscriptionsByKinds.end())
  264. return it->second;
  265. return nullptr;
  266. }
  267. TConfigsDispatcher::TSubscription::TPtr TConfigsDispatcher::FindSubscription(const TActorId &id)
  268. {
  269. if (auto it = SubscriptionsBySubscriber.find(id); it != SubscriptionsBySubscriber.end())
  270. return it->second;
  271. return nullptr;
  272. }
  273. TConfigsDispatcher::TSubscriber::TPtr TConfigsDispatcher::FindSubscriber(TActorId aid)
  274. {
  275. if (auto it = Subscribers.find(aid); it != Subscribers.end())
  276. return it->second;
  277. return nullptr;
  278. }
  279. NKikimrConfig::TAppConfig TConfigsDispatcher::ParseYamlProtoConfig()
  280. {
  281. NKikimrConfig::TAppConfig newYamlProtoConfig = {};
  282. try {
  283. NYamlConfig::ResolveAndParseYamlConfig(
  284. YamlConfig,
  285. VolatileYamlConfigs,
  286. Labels,
  287. newYamlProtoConfig,
  288. &ResolvedYamlConfig,
  289. &ResolvedJsonConfig);
  290. } catch (const yexception& ex) {
  291. BLOG_ERROR("Got invalid config from console error# " << ex.what());
  292. }
  293. return newYamlProtoConfig;
  294. }
  295. void TConfigsDispatcher::Handle(NMon::TEvHttpInfo::TPtr &ev)
  296. {
  297. if (auto it = ev->Get()->Request.GetHeaders().FindHeader("Content-Type"); it && it->Value() == "application/json") {
  298. ReplyMonJson(ev->Sender);
  299. return;
  300. }
  301. if (HttpRequests.empty())
  302. Send(GetNameserviceActorId(), new TEvInterconnect::TEvListNodes);
  303. HttpRequests.push_back(ev->Sender);
  304. }
  305. void TConfigsDispatcher::ReplyMonJson(TActorId mailbox) {
  306. TStringStream str;
  307. str << NMonitoring::HTTPOKJSON;
  308. NJson::TJsonValue response;
  309. response.SetType(NJson::EJsonValueType::JSON_MAP);
  310. auto& labels = response["labels"];
  311. labels.SetType(NJson::EJsonValueType::JSON_ARRAY);
  312. for (auto &[key, value] : Labels) {
  313. NJson::TJsonValue label;
  314. label.SetType(NJson::EJsonValueType::JSON_MAP);
  315. label.InsertValue("name", key);
  316. label.InsertValue("value", value);
  317. labels.AppendValue(std::move(label));
  318. }
  319. response.InsertValue("yaml_config", YamlConfig);
  320. response.InsertValue("resolved_json_config", NJson::ReadJsonFastTree(ResolvedJsonConfig, true));
  321. response.InsertValue("current_json_config", NJson::ReadJsonFastTree(NProtobufJson::Proto2Json(CurrentConfig, NYamlConfig::GetProto2JsonConfig()), true));
  322. if (DebugInfo) {
  323. response.InsertValue("initial_json_config", NJson::ReadJsonFastTree(NProtobufJson::Proto2Json(DebugInfo->StaticConfig, NYamlConfig::GetProto2JsonConfig()), true));
  324. response.InsertValue("initial_cms_json_config", NJson::ReadJsonFastTree(NProtobufJson::Proto2Json(DebugInfo->OldDynConfig, NYamlConfig::GetProto2JsonConfig()), true));
  325. response.InsertValue("initial_cms_yaml_json_config", NJson::ReadJsonFastTree(NProtobufJson::Proto2Json(DebugInfo->NewDynConfig, NYamlConfig::GetProto2JsonConfig()), true));
  326. }
  327. NJson::WriteJson(&str, &response, {});
  328. Send(mailbox, new NMon::TEvHttpInfoRes(str.Str(), 0, NMon::IEvHttpInfoRes::EContentType::Custom));
  329. }
  330. void TConfigsDispatcher::Handle(TEvConsole::TEvConfigNotificationRequest::TPtr &ev)
  331. {
  332. const auto &rec = ev->Get()->Record;
  333. auto resp = MakeHolder<TEvConsole::TEvConfigNotificationResponse>(rec);
  334. BLOG_TRACE("Send TEvConfigNotificationResponse: " << resp->Record.ShortDebugString());
  335. Send(ev->Sender, resp.Release(), 0, ev->Cookie);
  336. }
  337. TDynBitMap TConfigsDispatcher::FilterKinds(const TDynBitMap& in) {
  338. TDynBitMap out;
  339. if (const auto* denyList = std::get_if<TDenyList>(&ItemsServeRules)) {
  340. Y_FOR_EACH_BIT(kind, in) {
  341. if (!denyList->Items.contains(kind)) {
  342. out.Set(kind);
  343. }
  344. }
  345. } else if (const auto* allowList = std::get_if<TAllowList>(&ItemsServeRules)) {
  346. Y_FOR_EACH_BIT(kind, in) {
  347. if (allowList->Items.contains(kind)) {
  348. out.Set(kind);
  349. }
  350. }
  351. } else {
  352. out = in;
  353. }
  354. return out;
  355. }
  356. void TConfigsDispatcher::Handle(TEvInterconnect::TEvNodesInfo::TPtr &ev)
  357. {
  358. Y_UNUSED(ev);
  359. TStringStream str;
  360. str << NMonitoring::HTTPOKHTML;
  361. HTML(str) {
  362. HEAD() {
  363. str << "<link rel='stylesheet' href='../cms/ext/bootstrap.min.css'>" << Endl
  364. << "<script language='javascript' type='text/javascript' src='../cms/ext/jquery.min.js'></script>" << Endl
  365. << "<script language='javascript' type='text/javascript' src='../cms/ext/bootstrap.bundle.min.js'></script>" << Endl
  366. << "<script language='javascript' type='text/javascript'>" << Endl
  367. << "var nodeNames = [";
  368. for (auto &node: ev->Get()->Nodes) {
  369. str << "{'nodeName':'" << node.Host << "'}, ";
  370. }
  371. str << "];" << Endl
  372. << "</script>" << Endl
  373. << "<script src='../cms/ext/fuse.min.js'></script>" << Endl
  374. << "<script src='../cms/common.js'></script>" << Endl
  375. << "<script src='../cms/ext/fuzzycomplete.min.js'></script>" << Endl
  376. << "<link rel='stylesheet' href='../cms/ext/fuzzycomplete.min.css'>" << Endl
  377. << "<link rel='stylesheet' href='../cms/cms.css'>" << Endl
  378. << "<script data-main='../cms/configs_dispatcher_main' src='../cms/ext/require.min.js'></script>" << Endl;
  379. }
  380. NHttp::OutputStyles(str);
  381. DIV() {
  382. OL_CLASS("breadcrumb") {
  383. LI_CLASS("breadcrumb-item") {
  384. str << "<a href='..' id='host-ref'>YDB Developer UI</a>" << Endl;
  385. }
  386. LI_CLASS("breadcrumb-item") {
  387. str << "<a href='.'>Actors</a>" << Endl;
  388. }
  389. LI_CLASS("breadcrumb-item active") {
  390. str << "Configs Dispatcher" << Endl;
  391. }
  392. }
  393. }
  394. DIV_CLASS("container") {
  395. DIV_CLASS("navbar navbar-expand-lg navbar-light bg-light") {
  396. DIV_CLASS("navbar-collapse") {
  397. UL_CLASS("navbar-nav mr-auto") {
  398. LI_CLASS("nav-item") {
  399. str << "<a class='nav-link' href=\"../cms?#page=yaml-config\">Console</a>" << Endl;
  400. }
  401. }
  402. FORM_CLASS("form-inline my-2 my-lg-0") {
  403. str << "<input type='text' id='nodePicker' class='form-control mr-sm-2' name='nodes' placeholder='Nodes...'>" << Endl;
  404. str << "<a type='button' class='btn btn-primary my-2 my-sm-0' id='nodesGo'>Go</a>" << Endl;
  405. }
  406. }
  407. }
  408. DIV_CLASS("tab-left") {
  409. COLLAPSED_REF_CONTENT("node-labels", "Node labels") {
  410. PRE() {
  411. for (auto &[key, value] : Labels) {
  412. str << key << " = " << value << Endl;
  413. }
  414. }
  415. }
  416. str << "<br />" << Endl;
  417. COLLAPSED_REF_CONTENT("effective-config", "Effective config") {
  418. str << "<div class=\"alert alert-primary tab-left\" role=\"alert\">" << Endl;
  419. str << "It is assumed that all config items marked dynamic are consumed by corresponding components at runtime." << Endl;
  420. str << "<br />" << Endl;
  421. str << "If any component is unable to update some config at runtime check \"Effective startup config\" below to see actual config for this component." << Endl;
  422. str << "<br />" << Endl;
  423. str << "Coloring: \"<font color=\"red\">config not set</font>\","
  424. << " \"<font color=\"green\">config set in dynamic config</font>\", \"<font color=\"#007bff\">config set in static config</font>\"" << Endl;
  425. str << "</div>" << Endl;
  426. NHttp::OutputRichConfigHTML(str, BaseConfig, YamlProtoConfig, CurrentConfig, DYNAMIC_KINDS, NON_YAML_KINDS, YamlConfigEnabled);
  427. }
  428. str << "<br />" << Endl;
  429. COLLAPSED_REF_CONTENT("effective-startup-config", "Effective startup config") {
  430. str << "<div class=\"alert alert-primary tab-left\" role=\"alert\">" << Endl;
  431. str << "Some of these configs may be overwritten by dynamic ones." << Endl;
  432. str << "</div>" << Endl;
  433. NHttp::OutputConfigHTML(str, BaseConfig);
  434. }
  435. str << "<br />" << Endl;
  436. COLLAPSED_REF_CONTENT("effective-dynamic-config", "Effective dynamic config") {
  437. str << "<div class=\"alert alert-primary tab-left\" role=\"alert\">" << Endl;
  438. str << "Some subscribers may get static configs if they exist even if dynamic one of corresponding kind is not present." << Endl;
  439. str << "</div>" << Endl;
  440. NKikimrConfig::TAppConfig trunc;
  441. if (YamlConfigEnabled) {
  442. ReplaceConfigItems(YamlProtoConfig, trunc, FilterKinds(KindsToBitMap(DYNAMIC_KINDS)), BaseConfig);
  443. ReplaceConfigItems(CurrentConfig, trunc, FilterKinds(KindsToBitMap(NON_YAML_KINDS)), trunc, false);
  444. } else {
  445. ReplaceConfigItems(CurrentConfig, trunc, FilterKinds(KindsToBitMap(DYNAMIC_KINDS)), BaseConfig);
  446. }
  447. NHttp::OutputConfigHTML(str, trunc);
  448. }
  449. str << "<br />" << Endl;
  450. COLLAPSED_REF_CONTENT("debug-info", "Debug info") {
  451. DIV_CLASS("tab-left") {
  452. COLLAPSED_REF_CONTENT("effective-config-debug-info", "Effective config debug info") {
  453. NHttp::OutputConfigDebugInfoHTML(
  454. str,
  455. BaseConfig,
  456. YamlProtoConfig,
  457. CurrentConfig,
  458. {DebugInfo ? DebugInfo->InitInfo : THashMap<ui32, TConfigItemInfo>{}},
  459. DYNAMIC_KINDS,
  460. NON_YAML_KINDS,
  461. YamlConfigEnabled);
  462. }
  463. str << "<br />" << Endl;
  464. COLLAPSED_REF_CONTENT("state", "State") {
  465. PRE() {
  466. str << "SelfId: " << SelfId() << Endl;
  467. auto s = CurrentStateFunc();
  468. str << "State: " << ( s == &TThis::StateWork ? "StateWork"
  469. : s == &TThis::StateInit ? "StateInit"
  470. : "Unknown" ) << Endl;
  471. str << "YamlConfigEnabled: " << YamlConfigEnabled << Endl;
  472. str << "Subscriptions: " << Endl;
  473. for (auto &[kinds, subscription] : SubscriptionsByKinds) {
  474. str << "- Kinds: " << KindsToString(kinds) << Endl
  475. << " Subscription: " << Endl
  476. << " Yaml: " << subscription->Yaml << Endl
  477. << " Subscribers: " << Endl;
  478. for (auto &[id, updates] : subscription->Subscribers) {
  479. str << " - Actor: " << id << Endl;
  480. str << " UpdatesSent: " << updates << Endl;
  481. }
  482. if (subscription->YamlVersion) {
  483. str << " YamlVersion: " << subscription->YamlVersion->Version << ".[";
  484. bool first = true;
  485. for (auto &[id, hash] : subscription->YamlVersion->VolatileVersions) {
  486. str << (first ? "" : ",") << id << "." << hash;
  487. first = false;
  488. }
  489. str << "]" << Endl;
  490. } else {
  491. str << " CurrentConfigId: " << subscription->CurrentConfig.Version.ShortDebugString() << Endl;
  492. }
  493. str << " CurrentConfig: " << subscription->CurrentConfig.Config.ShortDebugString() << Endl;
  494. if (subscription->UpdateInProcess) {
  495. str << " UpdateInProcess: " << subscription->UpdateInProcess->Record.ShortDebugString() << Endl
  496. << " SubscribersToUpdate:";
  497. for (auto &id : subscription->SubscribersToUpdate) {
  498. str << " " << id;
  499. }
  500. str << Endl;
  501. str << " UpdateInProcessConfigVersion: " << subscription->UpdateInProcessConfigVersion.ShortDebugString() << Endl
  502. << " UpdateInProcessCookie: " << subscription->UpdateInProcessCookie << Endl;
  503. if (subscription->UpdateInProcessYamlVersion) {
  504. str << " UpdateInProcessYamlVersion: " << subscription->UpdateInProcessYamlVersion->Version << Endl;
  505. }
  506. }
  507. }
  508. str << "Subscribers:" << Endl;
  509. for (auto &[subscriber, _] : SubscriptionsBySubscriber) {
  510. str << "- " << subscriber << Endl;
  511. }
  512. }
  513. }
  514. str << "<br />" << Endl;
  515. COLLAPSED_REF_CONTENT("yaml-config", "YAML config") {
  516. DIV() {
  517. TAG(TH5) {
  518. str << "Persistent Config" << Endl;
  519. }
  520. TAG_CLASS_STYLE(TDiv, "configs-dispatcher", "padding: 0 12px;") {
  521. TAG_ATTRS(TDiv, {{"class", "yaml-sticky-btn-wrap fold-yaml-config yaml-btn-3"}, {"id", "fold-yaml-config"}, {"title", "fold"}}) {
  522. DIV_CLASS("yaml-sticky-btn") { }
  523. }
  524. TAG_ATTRS(TDiv, {{"class", "yaml-sticky-btn-wrap unfold-yaml-config yaml-btn-2"}, {"id", "unfold-yaml-config"}, {"title", "unfold"}}) {
  525. DIV_CLASS("yaml-sticky-btn") { }
  526. }
  527. TAG_ATTRS(TDiv, {{"class", "yaml-sticky-btn-wrap copy-yaml-config yaml-btn-1"}, {"id", "copy-yaml-config"}, {"title", "copy"}}) {
  528. DIV_CLASS("yaml-sticky-btn") { }
  529. }
  530. TAG_ATTRS(TDiv, {{"id", "yaml-config-item"}, {"name", "yaml-config-itemm"}}) {
  531. str << YamlConfig;
  532. }
  533. }
  534. str << "<hr/>" << Endl;
  535. for (auto &[id, config] : VolatileYamlConfigs) {
  536. DIV() {
  537. TAG(TH5) {
  538. str << "Volatile Config Id: " << id << Endl;
  539. }
  540. TAG_CLASS_STYLE(TDiv, "configs-dispatcher", "padding: 0 12px;") {
  541. TAG_ATTRS(TDiv, {{"class", "yaml-sticky-btn-wrap fold-yaml-config yaml-btn-3"}, {"title", "fold"}}) {
  542. DIV_CLASS("yaml-sticky-btn") { }
  543. }
  544. TAG_ATTRS(TDiv, {{"class", "yaml-sticky-btn-wrap unfold-yaml-config yaml-btn-2"}, {"title", "unfold"}}) {
  545. DIV_CLASS("yaml-sticky-btn") { }
  546. }
  547. TAG_ATTRS(TDiv, {{"class", "yaml-sticky-btn-wrap copy-yaml-config yaml-btn-1"}, {"title", "copy"}}) {
  548. DIV_CLASS("yaml-sticky-btn") { }
  549. }
  550. DIV_CLASS("yaml-config-item") {
  551. str << config;
  552. }
  553. }
  554. }
  555. }
  556. }
  557. }
  558. str << "<br />" << Endl;
  559. COLLAPSED_REF_CONTENT("resolved-yaml-config", "Resolved YAML config") {
  560. TAG_CLASS_STYLE(TDiv, "configs-dispatcher", "padding: 0 12px;") {
  561. TAG_ATTRS(TDiv, {{"class", "yaml-sticky-btn-wrap fold-yaml-config yaml-btn-3"}, {"id", "fold-resolved-yaml-config"}, {"title", "fold"}}) {
  562. DIV_CLASS("yaml-sticky-btn") { }
  563. }
  564. TAG_ATTRS(TDiv, {{"class", "yaml-sticky-btn-wrap unfold-yaml-config yaml-btn-2"}, {"id", "unfold-resolved-yaml-config"}, {"title", "unfold"}}) {
  565. DIV_CLASS("yaml-sticky-btn") { }
  566. }
  567. TAG_ATTRS(TDiv, {{"class", "yaml-sticky-btn-wrap copy-yaml-config yaml-btn-1"}, {"id", "copy-resolved-yaml-config"}, {"title", "copy"}}) {
  568. DIV_CLASS("yaml-sticky-btn") { }
  569. }
  570. TAG_ATTRS(TDiv, {{"id", "resolved-yaml-config-item"}, {"name", "resolved-yaml-config-itemm"}}) {
  571. str << ResolvedYamlConfig;
  572. }
  573. }
  574. }
  575. str << "<br />" << Endl;
  576. COLLAPSED_REF_CONTENT("resolved-json-config", "Resolved JSON config") {
  577. PRE() {
  578. str << ResolvedJsonConfig << Endl;
  579. }
  580. }
  581. str << "<br />" << Endl;
  582. COLLAPSED_REF_CONTENT("yaml-proto-config", "YAML proto config") {
  583. NHttp::OutputConfigHTML(str, YamlProtoConfig);
  584. }
  585. str << "<br />" << Endl;
  586. COLLAPSED_REF_CONTENT("current-config", "Current config") {
  587. NHttp::OutputConfigHTML(str, CurrentConfig);
  588. }
  589. str << "<br />" << Endl;
  590. COLLAPSED_REF_CONTENT("initial-config", "Initial config") {
  591. NHttp::OutputConfigHTML(str, BaseConfig);
  592. }
  593. if (DebugInfo) {
  594. str << "<br />" << Endl;
  595. COLLAPSED_REF_CONTENT("initial-cms-config", "Initial CMS config") {
  596. NHttp::OutputConfigHTML(str, DebugInfo->OldDynConfig);
  597. }
  598. str << "<br />" << Endl;
  599. COLLAPSED_REF_CONTENT("initial-cms-yaml-config", "Initial CMS YAML config") {
  600. NHttp::OutputConfigHTML(str, DebugInfo->NewDynConfig);
  601. }
  602. }
  603. }
  604. }
  605. }
  606. }
  607. }
  608. for (auto &actor : HttpRequests) {
  609. Send(actor, new NMon::TEvHttpInfoRes(str.Str(), 0, NMon::IEvHttpInfoRes::EContentType::Custom));
  610. }
  611. HttpRequests.clear();
  612. }
  613. void TConfigsDispatcher::Handle(TEvConsole::TEvConfigSubscriptionNotification::TPtr &ev)
  614. {
  615. auto &rec = ev->Get()->Record;
  616. CurrentConfig = rec.GetConfig();
  617. const auto& newYamlConfig = rec.GetYamlConfig();
  618. bool isYamlChanged = newYamlConfig != YamlConfig;
  619. if (rec.VolatileConfigsSize() != VolatileYamlConfigs.size()) {
  620. isYamlChanged = true;
  621. }
  622. for (auto &volatileConfig : rec.GetVolatileConfigs()) {
  623. if (auto it = VolatileYamlConfigHashes.find(volatileConfig.GetId());
  624. it == VolatileYamlConfigHashes.end() || it->second != THash<TString>()(volatileConfig.GetConfig())) {
  625. isYamlChanged = true;
  626. }
  627. }
  628. if (isYamlChanged) {
  629. YamlConfig = newYamlConfig;
  630. VolatileYamlConfigs.clear();
  631. VolatileYamlConfigHashes.clear();
  632. for (auto &volatileConfig : rec.GetVolatileConfigs()) {
  633. VolatileYamlConfigs[volatileConfig.GetId()] = volatileConfig.GetConfig();
  634. VolatileYamlConfigHashes[volatileConfig.GetId()] = THash<TString>()(volatileConfig.GetConfig());
  635. }
  636. }
  637. NKikimrConfig::TAppConfig newYamlProtoConfig = {};
  638. bool yamlConfigTurnedOff = false;
  639. if (!YamlConfig.empty() && isYamlChanged) {
  640. newYamlProtoConfig = ParseYamlProtoConfig();
  641. bool wasYamlConfigEnabled = YamlConfigEnabled;
  642. YamlConfigEnabled = newYamlProtoConfig.HasYamlConfigEnabled() && newYamlProtoConfig.GetYamlConfigEnabled();
  643. yamlConfigTurnedOff = wasYamlConfigEnabled && !YamlConfigEnabled;
  644. } else if (YamlConfig.empty()) {
  645. bool wasYamlConfigEnabled = YamlConfigEnabled;
  646. YamlConfigEnabled = false;
  647. yamlConfigTurnedOff = wasYamlConfigEnabled && !YamlConfigEnabled;
  648. } else {
  649. newYamlProtoConfig = YamlProtoConfig;
  650. }
  651. AppData()->YamlConfigEnabled = YamlConfigEnabled;
  652. std::swap(YamlProtoConfig, newYamlProtoConfig);
  653. THashSet<ui32> affectedKinds;
  654. for (const auto& kind : ev->Get()->Record.GetAffectedKinds()) {
  655. affectedKinds.insert(kind);
  656. }
  657. for (auto &[kinds, subscription] : SubscriptionsByKinds) {
  658. if (subscription->UpdateInProcess) {
  659. subscription->UpdateInProcess = nullptr;
  660. subscription->SubscribersToUpdate.clear();
  661. }
  662. NKikimrConfig::TAppConfig trunc;
  663. bool hasAffectedKinds = false;
  664. if (subscription->Yaml && YamlConfigEnabled) {
  665. ReplaceConfigItems(YamlProtoConfig, trunc, FilterKinds(subscription->Kinds), BaseConfig);
  666. } else {
  667. Y_FOR_EACH_BIT(kind, kinds) {
  668. if (affectedKinds.contains(kind)) {
  669. hasAffectedKinds = true;
  670. }
  671. }
  672. // we try resend all configs if yaml config was turned off
  673. if (!hasAffectedKinds && !yamlConfigTurnedOff && CurrentStateFunc() != &TThis::StateInit) {
  674. continue;
  675. }
  676. ReplaceConfigItems(ev->Get()->Record.GetConfig(), trunc, FilterKinds(kinds), BaseConfig);
  677. }
  678. if (hasAffectedKinds || !CompareConfigs(subscription->CurrentConfig.Config, trunc) || CurrentStateFunc() == &TThis::StateInit) {
  679. subscription->UpdateInProcess = MakeHolder<TEvConsole::TEvConfigNotificationRequest>();
  680. subscription->UpdateInProcess->Record.MutableConfig()->CopyFrom(trunc);
  681. subscription->UpdateInProcess->Record.SetLocal(true);
  682. Y_FOR_EACH_BIT(kind, kinds) {
  683. subscription->UpdateInProcess->Record.AddItemKinds(kind);
  684. }
  685. subscription->UpdateInProcessCookie = ++NextRequestCookie;
  686. subscription->UpdateInProcessConfigVersion = FilterVersion(ev->Get()->Record.GetConfig().GetVersion(), kinds);
  687. if (YamlConfigEnabled) {
  688. UpdateYamlVersion(subscription);
  689. }
  690. for (auto &[subscriber, updates] : subscription->Subscribers) {
  691. auto k = kinds;
  692. BLOG_TRACE("Sending for kinds: " << KindsToString(k));
  693. SendUpdateToSubscriber(subscription, subscriber);
  694. ++updates;
  695. }
  696. } else if (YamlConfigEnabled && subscription->Yaml) {
  697. UpdateYamlVersion(subscription);
  698. } else if (!YamlConfigEnabled) {
  699. subscription->YamlVersion = std::nullopt;
  700. }
  701. }
  702. if (CurrentStateFunc() == &TThis::StateInit) {
  703. Become(&TThis::StateWork);
  704. ProcessEnqueuedEvents();
  705. }
  706. }
  707. void TConfigsDispatcher::UpdateYamlVersion(const TSubscription::TPtr &subscription) const
  708. {
  709. TYamlVersion yamlVersion;
  710. yamlVersion.Version = NYamlConfig::GetVersion(YamlConfig);
  711. for (auto &[id, hash] : VolatileYamlConfigHashes) {
  712. yamlVersion.VolatileVersions[id] = hash;
  713. }
  714. subscription->UpdateInProcessYamlVersion = yamlVersion;
  715. }
  716. void TConfigsDispatcher::Handle(TEvConsole::TEvConfigSubscriptionError::TPtr &ev)
  717. {
  718. // The only reason we can get this response is ambiguous domain
  719. // So it is okay to fail here
  720. Y_ABORT("Can't start Configs Dispatcher: %s",
  721. ev->Get()->Record.GetReason().c_str());
  722. }
  723. void TConfigsDispatcher::Handle(TEvConfigsDispatcher::TEvGetConfigRequest::TPtr &ev)
  724. {
  725. auto resp = MakeHolder<TEvConfigsDispatcher::TEvGetConfigResponse>();
  726. auto [yamlKinds, _] = CheckKinds(
  727. ev->Get()->ConfigItemKinds,
  728. "TEvGetConfigRequest handler");
  729. auto trunc = std::make_shared<NKikimrConfig::TAppConfig>();
  730. auto kinds = KindsToBitMap(ev->Get()->ConfigItemKinds);
  731. if (YamlConfigEnabled && yamlKinds) {
  732. ReplaceConfigItems(YamlProtoConfig, *trunc, FilterKinds(kinds), BaseConfig);
  733. } else {
  734. ReplaceConfigItems(CurrentConfig, *trunc, FilterKinds(kinds), BaseConfig);
  735. }
  736. resp->Config = trunc;
  737. BLOG_TRACE("Send TEvConfigsDispatcher::TEvGetConfigResponse"
  738. " to " << ev->Sender << ": " << resp->Config->ShortDebugString());
  739. Send(ev->Sender, std::move(resp), 0, ev->Cookie);
  740. }
  741. TConfigsDispatcher::TCheckKindsResult TConfigsDispatcher::CheckKinds(const TVector<ui32>& kinds, const char* errorContext) const {
  742. bool yamlKinds = false;
  743. bool nonYamlKinds = false;
  744. for (auto kind : kinds) {
  745. if (!DYNAMIC_KINDS.contains(kind)) {
  746. TStringStream sstr;
  747. sstr << static_cast<NKikimrConsole::TConfigItem::EKind>(kind);
  748. Y_ABORT("unexpected kind in %s: %s", errorContext, sstr.Str().data());
  749. }
  750. if (NON_YAML_KINDS.contains(kind)) {
  751. nonYamlKinds = true;
  752. } else {
  753. yamlKinds = true;
  754. }
  755. }
  756. if (yamlKinds && nonYamlKinds) {
  757. Y_ABORT("both yaml and non yaml kinds in %s", errorContext);
  758. }
  759. return {yamlKinds, nonYamlKinds};
  760. }
  761. void TConfigsDispatcher::Handle(TEvConfigsDispatcher::TEvSetConfigSubscriptionRequest::TPtr &ev)
  762. {
  763. auto [yamlKinds, nonYamlKinds] = CheckKinds(
  764. ev->Get()->ConfigItemKinds,
  765. "SetConfigSubscriptionRequest handler");
  766. Y_UNUSED(nonYamlKinds);
  767. auto kinds = KindsToBitMap(ev->Get()->ConfigItemKinds);
  768. auto subscriberActor = ev->Get()->Subscriber ? ev->Get()->Subscriber : ev->Sender;
  769. auto subscription = FindSubscription(kinds);
  770. if (!subscription) {
  771. subscription = new TSubscription;
  772. subscription->Kinds = kinds;
  773. subscription->Yaml = yamlKinds;
  774. SubscriptionsByKinds.emplace(kinds, subscription);
  775. }
  776. auto [subscriberIt, _] = subscription->Subscribers.emplace(subscriberActor, 0);
  777. SubscriptionsBySubscriber.emplace(subscriberActor, subscription);
  778. auto subscriber = FindSubscriber(subscriberActor);
  779. if (!subscriber) {
  780. subscriber = new TSubscriber;
  781. subscriber->Subscriber = subscriberActor;
  782. Subscribers.emplace(subscriberActor, subscriber);
  783. }
  784. subscriber->Subscriptions.insert(subscription);
  785. // We don't care about versions and kinds here
  786. Send(ev->Sender, new TEvConfigsDispatcher::TEvSetConfigSubscriptionResponse);
  787. if (CurrentStateFunc() != &TThis::StateInit) {
  788. // first time we send even empty config
  789. if (!subscription->UpdateInProcess) {
  790. subscription->UpdateInProcess = MakeHolder<TEvConsole::TEvConfigNotificationRequest>();
  791. NKikimrConfig::TAppConfig trunc;
  792. if (YamlConfigEnabled) {
  793. ReplaceConfigItems(YamlProtoConfig, trunc, FilterKinds(kinds), BaseConfig);
  794. } else {
  795. ReplaceConfigItems(CurrentConfig, trunc, FilterKinds(kinds), BaseConfig);
  796. }
  797. subscription->UpdateInProcess->Record.MutableConfig()->CopyFrom(trunc);
  798. Y_FOR_EACH_BIT(kind, kinds) {
  799. subscription->UpdateInProcess->Record.AddItemKinds(kind);
  800. }
  801. subscription->UpdateInProcessCookie = ++NextRequestCookie;
  802. subscription->UpdateInProcessConfigVersion = FilterVersion(CurrentConfig.GetVersion(), kinds);
  803. }
  804. BLOG_TRACE("Sending for kinds: " << KindsToString(kinds));
  805. SendUpdateToSubscriber(subscription, subscriber->Subscriber);
  806. ++(subscriberIt->second);
  807. }
  808. }
  809. void TConfigsDispatcher::Handle(TEvConfigsDispatcher::TEvRemoveConfigSubscriptionRequest::TPtr &ev)
  810. {
  811. auto subscriberActor = ev->Get()->Subscriber ? ev->Get()->Subscriber : ev->Sender;
  812. auto subscriber = FindSubscriber(subscriberActor);
  813. if (!subscriber) {
  814. return;
  815. }
  816. for (auto &subscription : subscriber->Subscriptions) {
  817. subscription->SubscribersToUpdate.erase(subscriberActor);
  818. if (subscription->SubscribersToUpdate.empty()) {
  819. if (subscription->UpdateInProcess) {
  820. subscription->CurrentConfig.Version = subscription->UpdateInProcessConfigVersion;
  821. subscription->CurrentConfig.Config = subscription->UpdateInProcess->Record.GetConfig();
  822. }
  823. subscription->YamlVersion = subscription->UpdateInProcessYamlVersion;
  824. subscription->UpdateInProcessYamlVersion = std::nullopt;
  825. subscription->UpdateInProcess = nullptr;
  826. }
  827. subscription->Subscribers.erase(subscriberActor);
  828. if (subscription->Subscribers.empty()) {
  829. SubscriptionsByKinds.erase(subscription->Kinds);
  830. }
  831. }
  832. Subscribers.erase(subscriberActor);
  833. SubscriptionsBySubscriber.erase(subscriberActor);
  834. Send(ev->Sender, new TEvConfigsDispatcher::TEvRemoveConfigSubscriptionResponse);
  835. }
  836. void TConfigsDispatcher::Handle(TEvConsole::TEvConfigNotificationResponse::TPtr &ev)
  837. {
  838. auto rec = ev->Get()->Record;
  839. auto subscription = FindSubscription(ev->Sender);
  840. // Probably subscription was cleared up due to tenant's change.
  841. if (!subscription) {
  842. BLOG_ERROR("Got notification response for unknown subscription " << ev->Sender);
  843. return;
  844. }
  845. if (!subscription->UpdateInProcess) {
  846. BLOG_D("Notification was ignored for subscription " << ev->Sender);
  847. return;
  848. }
  849. if (ev->Cookie != subscription->UpdateInProcessCookie) {
  850. BLOG_ERROR("Notification cookie mismatch for subscription " << ev->Sender << " " << ev->Cookie << " != " << subscription->UpdateInProcessCookie);
  851. // TODO fix clients
  852. return;
  853. }
  854. if (!subscription->SubscribersToUpdate.contains(ev->Sender)) {
  855. BLOG_ERROR("Notification from unexpected subscriber for subscription " << ev->Sender);
  856. return;
  857. }
  858. Subscribers.at(ev->Sender)->CurrentConfigVersion = subscription->UpdateInProcessConfigVersion;
  859. // If all subscribers responded then send response to CMS.
  860. subscription->SubscribersToUpdate.erase(ev->Sender);
  861. if (subscription->SubscribersToUpdate.empty()) {
  862. subscription->CurrentConfig.Config = subscription->UpdateInProcess->Record.GetConfig();
  863. subscription->CurrentConfig.Version = subscription->UpdateInProcessConfigVersion;
  864. subscription->YamlVersion = subscription->UpdateInProcessYamlVersion;
  865. subscription->UpdateInProcessYamlVersion = std::nullopt;
  866. subscription->UpdateInProcess = nullptr;
  867. }
  868. }
  869. void TConfigsDispatcher::Handle(TEvConsole::TEvGetNodeLabelsRequest::TPtr &ev) {
  870. auto Response = MakeHolder<TEvConsole::TEvGetNodeLabelsResponse>();
  871. for (const auto& [label, value] : Labels) {
  872. auto *labelSer = Response->Record.MutableResponse()->add_labels();
  873. labelSer->set_label(label);
  874. labelSer->set_value(value);
  875. }
  876. Send(ev->Sender, Response.Release());
  877. }
  878. IActor *CreateConfigsDispatcher(const TConfigsDispatcherInitInfo& initInfo) {
  879. return new TConfigsDispatcher(initInfo);
  880. }
  881. } // namespace NKikimr::NConsole