kikimr_runner.py 22 KB


  1. # -*- coding: utf-8 -*-
  2. import logging
  3. import os
  4. import shutil
  5. import tempfile
  6. import time
  7. import itertools
  8. from google.protobuf import text_format
  9. import ydb.tests.library.common.yatest_common as yatest_common
  10. from ydb.tests.library.common.wait_for import wait_for
  11. from . import daemon
  12. from . import param_constants
  13. from . import kikimr_config
  14. from . import kikimr_node_interface
  15. from . import kikimr_cluster_interface
  16. import ydb.core.protos.blobstorage_config_pb2 as bs
  17. from ydb.tests.library.predicates.blobstorage import blobstorage_controller_has_started_on_some_node
  18. from library.python import resource
  19. logger = logging.getLogger(__name__)
  20. def get_unique_path_for_current_test(output_path, sub_folder):
  21. test_name = yatest_common.context.test_name or ""
  22. test_name = test_name.replace(':', '_')
  23. return os.path.join(output_path, test_name, sub_folder)
  24. def ensure_path_exists(path):
  25. if not os.path.isdir(path):
  26. os.makedirs(path)
  27. return path
  28. def join(a, b):
  29. if a is None:
  30. a = ''
  31. if b is None:
  32. b = ''
  33. return os.path.join(a, b)
  34. class KiKiMRNode(daemon.Daemon, kikimr_node_interface.NodeInterface):
  35. def __init__(self, node_idx, config_path, port_allocator, cluster_name, configurator,
  36. udfs_dir=None, role='node', node_broker_port=None, tenant_affiliation=None, encryption_key=None):
  37. super(kikimr_node_interface.NodeInterface, self).__init__()
  38. self.node_id = node_idx
  39. self.__cwd = None
  40. self.__config_path = config_path
  41. self.__cluster_name = cluster_name
  42. self.__configurator = configurator
  43. self.__common_udfs_dir = udfs_dir
  44. self.__encryption_key = encryption_key
  45. self._tenant_affiliation = tenant_affiliation if tenant_affiliation is not None else 'dynamic'
  46. self.grpc_port = port_allocator.grpc_port
  47. self.mon_port = port_allocator.mon_port
  48. self.ic_port = port_allocator.ic_port
  49. self.grpc_ssl_port = port_allocator.grpc_ssl_port
  50. self.sqs_port = None
  51. if configurator.sqs_service_enabled:
  52. self.sqs_port = port_allocator.sqs_port
  53. self.__role = role
  54. self.__node_broker_port = node_broker_port
  55. self.__log_file = tempfile.NamedTemporaryFile(dir=self.cwd, prefix="logfile_", suffix=".log", delete=False)
  56. self.__cms_config_cache_file = tempfile.NamedTemporaryFile(
  57. dir=self.cwd,
  58. prefix="cms_config_cache_",
  59. delete=False
  60. )
  61. self.__cms_config_cache_file_name = self.__cms_config_cache_file.name
  62. daemon.Daemon.__init__(self, self.command, cwd=self.cwd, timeout=180, stderr_on_error_lines=240)
  63. @property
  64. def cwd(self):
  65. if self.__cwd is None:
  66. self.__cwd = ensure_path_exists(
  67. get_unique_path_for_current_test(
  68. self.__configurator.output_path,
  69. join(
  70. self.__cluster_name, "{}_{}".format(
  71. self.__role,
  72. self.node_id
  73. )
  74. )
  75. )
  76. )
  77. return self.__cwd
  78. @property
  79. def cms_config_cache_file_name(self):
  80. return self.__cms_config_cache_file_name
  81. @property
  82. def command(self):
  83. return self.__make_run_command()
  84. def format_pdisk(self, pdisk_path, disk_size, **kwargs):
  85. logger.debug("Formatting pdisk %s on node %s, disk_size %s" % (pdisk_path, self, disk_size))
  86. if pdisk_path.startswith('SectorMap'):
  87. return
  88. with open(pdisk_path, "wb") as out:
  89. out.seek(disk_size - 1)
  90. out.write(b'\0')
  91. def __make_run_command(self):
  92. command = [self.__configurator.binary_path, "server"]
  93. if self.__common_udfs_dir is not None:
  94. command.append("--udfs-dir={}".format(self.__common_udfs_dir))
  95. if self.__configurator.suppress_version_check:
  96. command.append("--suppress-version-check")
  97. if self.__node_broker_port is not None:
  98. command.append("--node-broker=%s%s:%d" % (
  99. "grpcs://" if self.__configurator.grpc_ssl_enable else "",
  100. self.host,
  101. self.__node_broker_port))
  102. else:
  103. command.append("--node=%d" % self.node_id)
  104. if self.__configurator.grpc_ssl_enable:
  105. command.append(
  106. "--ca=%s" % self.__configurator.grpc_tls_ca_path
  107. )
  108. if self.__role == 'slot':
  109. command.append(
  110. "--tenant=%s" % self._tenant_affiliation
  111. )
  112. if self.__configurator.grpc_ssl_enable:
  113. command.append(
  114. "--grpcs-port={}".format(
  115. self.grpc_ssl_port
  116. )
  117. )
  118. command.extend(
  119. [
  120. "--yaml-config=%s" % join(self.__config_path, "config.yaml"),
  121. "--log-file-name=%s" % self.__log_file.name,
  122. "--grpc-port=%s" % self.grpc_port,
  123. "--mon-port=%d" % self.mon_port,
  124. "--ic-port=%d" % self.ic_port,
  125. "--cms-config-cache-file=%s" % self.cms_config_cache_file_name,
  126. ]
  127. )
  128. if self.__encryption_key is not None:
  129. command.extend(["--key-file", self.__encryption_key])
  130. if self.sqs_port is not None:
  131. command.extend(["--sqs-port=%d" % self.sqs_port])
  132. logger.info('CFG_DIR_PATH="%s"', self.__config_path)
  133. logger.info("Final command: %s", ' '.join(command).replace(self.__config_path, '$CFG_DIR_PATH'))
  134. return command
  135. def stop(self):
  136. try:
  137. super(KiKiMRNode, self).stop()
  138. finally:
  139. logger.info("Stopped node %s", self)
  140. def kill(self):
  141. try:
  142. super(KiKiMRNode, self).kill()
  143. self.start()
  144. finally:
  145. logger.info("Killed node %s", self)
  146. def send_signal(self, signal):
  147. self.daemon.process.send_signal(signal)
  148. @property
  149. def host(self):
  150. return 'localhost'
  151. @property
  152. def hostname(self):
  153. return kikimr_config.get_fqdn()
  154. @property
  155. def port(self):
  156. return self.grpc_port
  157. @property
  158. def pid(self):
  159. return self.daemon.process.pid
  160. def start(self):
  161. try:
  162. super(KiKiMRNode, self).start()
  163. finally:
  164. logger.info("Started node %s", self)
  165. class KiKiMR(kikimr_cluster_interface.KiKiMRClusterInterface):
  166. def __init__(self, configurator=None, cluster_name=''):
  167. super(KiKiMR, self).__init__()
  168. self.__tmpdir = tempfile.mkdtemp(prefix="kikimr_" + cluster_name + "_")
  169. self.__common_udfs_dir = None
  170. self.__cluster_name = cluster_name
  171. self.__configurator = kikimr_config.KikimrConfigGenerator() if configurator is None else configurator
  172. self.__port_allocator = self.__configurator.port_allocator
  173. self._nodes = {}
  174. self._slots = {}
  175. self.__server = 'localhost'
  176. self.__client = None
  177. self.__storage_pool_id_allocator = itertools.count(1)
  178. self.__config_path = None
  179. self._slot_index_allocator = itertools.count(1)
  180. self._node_index_allocator = itertools.count(1)
  181. self.default_channel_bindings = None
  182. @property
  183. def config(self):
  184. return self.__configurator
  185. @property
  186. def nodes(self):
  187. return self._nodes
  188. @property
  189. def slots(self):
  190. return self._slots
  191. @property
  192. def domain_name(self):
  193. return self.__configurator.domain_name
  194. @property
  195. def server(self):
  196. return self.__server
  197. def __call_kikimr_new_cli(self, cmd, connect_to_server=True):
  198. server = 'grpc://{server}:{port}'.format(server=self.server, port=self.nodes[1].port)
  199. full_command = [self.__configurator.binary_path]
  200. if connect_to_server:
  201. full_command += ["--server={server}".format(server=server)]
  202. full_command += cmd
  203. logger.debug("Executing command = {}".format(full_command))
  204. try:
  205. return yatest_common.execute(full_command)
  206. except yatest_common.ExecutionError as e:
  207. logger.exception("KiKiMR command '{cmd}' failed with error: {e}\n\tstdout: {out}\n\tstderr: {err}".format(
  208. cmd=" ".join(str(x) for x in full_command),
  209. e=str(e),
  210. out=e.execution_result.std_out,
  211. err=e.execution_result.std_err
  212. ))
  213. raise
  214. def start(self):
  215. """
  216. Safely starts kikimr instance.
  217. Do not override this method.
  218. """
  219. try:
  220. logger.debug("Working directory: " + self.__tmpdir)
  221. self.__run()
  222. return self
  223. except Exception:
  224. logger.exception("KiKiMR start failed")
  225. self.stop()
  226. raise
  227. def __run(self):
  228. self.__client = None
  229. self.__instantiate_udfs_dir()
  230. self.__write_configs()
  231. for _ in self.__configurator.all_node_ids():
  232. self.__register_node()
  233. for node_id in self.__configurator.all_node_ids():
  234. self.__run_node(node_id)
  235. self.__wait_for_bs_controller_to_start()
  236. self.__add_bs_box()
  237. pools = {}
  238. for p in self.__configurator.dynamic_storage_pools:
  239. self.add_storage_pool(
  240. name=p['name'],
  241. kind=p['kind'],
  242. pdisk_user_kind=p['pdisk_user_kind'],
  243. )
  244. pools[p['name']] = p['kind']
  245. self.client.bind_storage_pools(self.domain_name, pools)
  246. default_pool_name = list(pools.keys())[0]
  247. self.default_channel_bindings = {idx: default_pool_name for idx in range(3)}
  248. logger.info("Cluster started and initialized")
  249. self.client.add_config_item(
  250. resource.find(
  251. "harness/resources/default_profile.txt"
  252. )
  253. )
  254. def __run_node(self, node_id):
  255. """
  256. :returns started KiKiMRNode instance
  257. Can be overriden.
  258. """
  259. self.__format_disks(node_id)
  260. self._nodes[node_id].start()
  261. return self._nodes[node_id]
  262. def __register_node(self):
  263. node_index = next(self._node_index_allocator)
  264. self._nodes[node_index] = KiKiMRNode(
  265. node_index,
  266. self.config_path,
  267. port_allocator=self.__port_allocator.get_node_port_allocator(node_index),
  268. cluster_name=self.__cluster_name,
  269. configurator=self.__configurator,
  270. udfs_dir=self.__common_udfs_dir,
  271. )
  272. return self._nodes[node_index]
  273. def register_slots(self, database, count=1, encryption_key=None):
  274. return [self.register_slot(database, encryption_key) for _ in range(count)]
  275. def register_and_start_slots(self, database, count=1, encryption_key=None):
  276. slots = self.register_slots(database, count, encryption_key)
  277. for slot in slots:
  278. slot.start()
  279. return slots
  280. def register_slot(self, tenant_affiliation=None, encryption_key=None):
  281. return self._register_slot(tenant_affiliation, encryption_key)
  282. def _register_slot(self, tenant_affiliation=None, encryption_key=None):
  283. slot_index = next(self._slot_index_allocator)
  284. node_broker_port = (
  285. self.nodes[1].grpc_ssl_port if self.__configurator.grpc_ssl_enable
  286. else self.nodes[1].grpc_port
  287. )
  288. self._slots[slot_index] = KiKiMRNode(
  289. slot_index,
  290. self.config_path,
  291. port_allocator=self.__port_allocator.get_slot_port_allocator(slot_index),
  292. cluster_name=self.__cluster_name,
  293. configurator=self.__configurator,
  294. udfs_dir=self.__common_udfs_dir,
  295. role='slot',
  296. node_broker_port=node_broker_port,
  297. tenant_affiliation=tenant_affiliation,
  298. encryption_key=encryption_key,
  299. )
  300. return self._slots[slot_index]
  301. def __stop_node(self, node):
  302. ret = None
  303. try:
  304. node.stop()
  305. except daemon.DaemonError as exceptions:
  306. ret = exceptions
  307. else:
  308. if self.__tmpdir is not None:
  309. shutil.rmtree(self.__tmpdir, ignore_errors=True)
  310. if self.__common_udfs_dir is not None:
  311. shutil.rmtree(self.__common_udfs_dir, ignore_errors=True)
  312. return ret
  313. def stop(self):
  314. saved_exceptions = []
  315. for slot in self.slots.values():
  316. exception = self.__stop_node(slot)
  317. if exception is not None:
  318. saved_exceptions.append(exception)
  319. for node in self.nodes.values():
  320. exception = self.__stop_node(node)
  321. if exception is not None:
  322. saved_exceptions.append(exception)
  323. self.__port_allocator.release_ports()
  324. if saved_exceptions:
  325. raise daemon.SeveralDaemonErrors(saved_exceptions)
  326. @property
  327. def config_path(self):
  328. if self.__config_path is None:
  329. self.__config_path = ensure_path_exists(
  330. get_unique_path_for_current_test(
  331. self.__configurator.output_path,
  332. join(
  333. self.__cluster_name, "kikimr_configs"
  334. )
  335. )
  336. )
  337. return self.__config_path
  338. def __write_configs(self):
  339. self.__configurator.write_proto_configs(self.config_path)
  340. def __instantiate_udfs_dir(self):
  341. to_load = self.__configurator.get_yql_udfs_to_load()
  342. if len(to_load) == 0:
  343. return
  344. self.__common_udfs_dir = tempfile.mkdtemp(prefix="common_udfs")
  345. for udf_path in to_load:
  346. link_name = os.path.join(self.__common_udfs_dir, os.path.basename(udf_path))
  347. os.symlink(udf_path, link_name)
  348. return self.__common_udfs_dir
  349. def __format_disks(self, node_id):
  350. for pdisk in self.__configurator.pdisks_info:
  351. if pdisk['node_id'] != node_id:
  352. continue
  353. self.nodes[node_id].format_pdisk(**pdisk)
  354. def __add_bs_box(self):
  355. request = bs.TConfigRequest()
  356. for node_id in self.__configurator.all_node_ids():
  357. cmd = request.Command.add()
  358. cmd.DefineHostConfig.HostConfigId = node_id
  359. for drive in self.__configurator.pdisks_info:
  360. if drive['node_id'] != node_id:
  361. continue
  362. drive_proto = cmd.DefineHostConfig.Drive.add()
  363. drive_proto.Path = drive['pdisk_path']
  364. drive_proto.Kind = drive['pdisk_user_kind']
  365. drive_proto.Type = drive.get('pdisk_type', 0)
  366. cmd = request.Command.add()
  367. cmd.DefineBox.BoxId = 1
  368. for node_id, node in self.nodes.items():
  369. host = cmd.DefineBox.Host.add()
  370. host.Key.Fqdn = node.host
  371. host.Key.IcPort = node.ic_port
  372. host.HostConfigId = node_id
  373. self._bs_config_invoke(request)
  374. def _bs_config_invoke(self, request):
  375. timeout = yatest_common.plain_or_under_sanitizer(120, 240)
  376. sleep = 5
  377. retries, success = timeout / sleep, False
  378. while retries > 0 and not success:
  379. try:
  380. self.__call_kikimr_new_cli(
  381. [
  382. "admin",
  383. "blobstorage",
  384. "config",
  385. "invoke",
  386. "--proto=%s" % text_format.MessageToString(request)
  387. ]
  388. )
  389. success = True
  390. except Exception as e:
  391. logger.error("Failed to execute, %s", str(e))
  392. retries -= 1
  393. time.sleep(sleep)
  394. if retries == 0:
  395. raise
  396. def add_storage_pool(self, name=None, kind="rot", pdisk_user_kind=0, erasure=None):
  397. if erasure is None:
  398. erasure = self.__configurator.static_erasure
  399. request = bs.TConfigRequest()
  400. cmd = request.Command.add()
  401. cmd.DefineStoragePool.BoxId = 1
  402. pool_id = cmd.DefineStoragePool.StoragePoolId = next(self.__storage_pool_id_allocator)
  403. if name is None:
  404. name = "dynamic_storage_pool:%s" % pool_id
  405. cmd.DefineStoragePool.StoragePoolId = pool_id
  406. cmd.DefineStoragePool.Name = name
  407. cmd.DefineStoragePool.Kind = kind
  408. cmd.DefineStoragePool.ErasureSpecies = str(erasure)
  409. cmd.DefineStoragePool.VDiskKind = "Default"
  410. cmd.DefineStoragePool.NumGroups = 2
  411. pdisk_filter = cmd.DefineStoragePool.PDiskFilter.add()
  412. pdisk_filter.Property.add().Type = 0
  413. pdisk_filter.Property.add().Kind = pdisk_user_kind
  414. self._bs_config_invoke(request)
  415. return name
  416. def __wait_for_bs_controller_to_start(self):
  417. monitors = [node.monitor for node in self.nodes.values()]
  418. def predicate():
  419. return blobstorage_controller_has_started_on_some_node(monitors)
  420. timeout_seconds = yatest_common.plain_or_under_sanitizer(120, 240)
  421. bs_controller_started = wait_for(
  422. predicate=predicate, timeout_seconds=timeout_seconds, step_seconds=1.0, multiply=1.3
  423. )
  424. assert bs_controller_started
  425. class KikimrExternalNode(daemon.ExternalNodeDaemon, kikimr_node_interface.NodeInterface):
  426. def __init__(
  427. self, node_id, host, port, mon_port, ic_port, mbus_port, configurator=None, slot_id=None):
  428. super(KikimrExternalNode, self).__init__(host)
  429. self.__node_id = node_id
  430. self.__host = host
  431. self.__port = port
  432. self.__grpc_port = port
  433. self.__mon_port = mon_port
  434. self.__ic_port = ic_port
  435. self.__configurator = configurator
  436. self.__mbus_port = mbus_port
  437. self.logger = logger.getChild(self.__class__.__name__)
  438. if slot_id is not None:
  439. self.__slot_id = "%s" % str(self.__ic_port)
  440. else:
  441. self.__slot_id = None
  442. self._can_update = None
  443. self.current_version_idx = 0
  444. self.versions = [
  445. param_constants.kikimr_last_version_deploy_path,
  446. param_constants.kikimr_next_version_deploy_path,
  447. ]
  448. @property
  449. def can_update(self):
  450. if self._can_update is None:
  451. choices = self.ssh_command('ls %s*' % param_constants.kikimr_binary_deploy_path, raise_on_error=True)
  452. choices = choices.split()
  453. self.logger.error("Current available choices are: %s" % choices)
  454. self._can_update = True
  455. for version in self.versions:
  456. if version not in choices:
  457. self._can_update &= False
  458. return self._can_update
  459. def start(self):
  460. if self.__slot_id is None:
  461. return self.ssh_command("sudo start kikimr")
  462. return self.ssh_command(
  463. [
  464. "sudo", "start",
  465. "kikimr-multi",
  466. "slot={}".format(self.__slot_id),
  467. "tenant=dynamic",
  468. "mbus={}".format(self.__mbus_port),
  469. "grpc={}".format(self.__grpc_port),
  470. "mon={}".format(self.__mon_port),
  471. "ic={}".format(self.__ic_port),
  472. ]
  473. )
  474. def stop(self):
  475. if self.__slot_id is None:
  476. return self.ssh_command("sudo stop kikimr")
  477. return self.ssh_command(
  478. [
  479. "sudo", "stop",
  480. "kikimr-multi",
  481. "slot={}".format(self.__slot_id),
  482. "tenant=dynamic",
  483. "mbus={}".format(self.__mbus_port),
  484. "grpc={}".format(self.__grpc_port),
  485. "mon={}".format(self.__mon_port),
  486. "ic={}".format(self.__ic_port),
  487. ]
  488. )
  489. @property
  490. def cwd(self):
  491. assert False, "not supported"
  492. @property
  493. def mon_port(self):
  494. return self.__mon_port
  495. @property
  496. def pid(self):
  497. return None
  498. def is_alive(self):
  499. # TODO implement check
  500. return True
  501. @property
  502. def host(self):
  503. return self.__host
  504. @property
  505. def port(self):
  506. return self.__port
  507. @property
  508. def grpc_ssl_port(self):
  509. # TODO(gvit): support in clusters
  510. return None
  511. @property
  512. def grpc_port(self):
  513. return self.__port
  514. @property
  515. def mbus_port(self):
  516. return self.__mbus_port
  517. @property
  518. def ic_port(self):
  519. return self.__ic_port
  520. @property
  521. def node_id(self):
  522. return self.__node_id
  523. @property
  524. def logs_directory(self):
  525. folder = 'kikimr_%s' % self.__slot_id if self.__slot_id else 'kikimr'
  526. return "/Berkanavt/{}/logs".format(folder)
  527. def update_binary_links(self):
  528. self.ssh_command("sudo rm -rf %s" % param_constants.kikimr_binary_deploy_path)
  529. self.ssh_command(
  530. "sudo cp -l %s %s" % (
  531. self.versions[self.current_version_idx],
  532. param_constants.kikimr_binary_deploy_path,
  533. )
  534. )
  535. def switch_version(self):
  536. if not self.can_update:
  537. self.logger.info("Next version is not available. Cannot change versions.")
  538. return None
  539. self.current_version_idx ^= 1
  540. self.update_binary_links()
  541. def prepare_artifacts(self, cluster_yml):
  542. self.copy_file_or_dir(
  543. param_constants.kikimr_configure_binary_path(), param_constants.kikimr_configure_binary_deploy_path)
  544. local_drivers_path = (param_constants.kikimr_driver_path(), param_constants.next_version_kikimr_driver_path())
  545. for version, local_driver in zip(self.versions, local_drivers_path):
  546. self.ssh_command("sudo rm -rf %s" % version)
  547. if local_driver is not None:
  548. self.copy_file_or_dir(
  549. local_driver, version)
  550. self.ssh_command("sudo /sbin/setcap 'CAP_SYS_RAWIO,CAP_SYS_NICE=ep' %s" % version)
  551. self.update_binary_links()
  552. self.ssh_command("sudo mkdir -p %s" % param_constants.kikimr_configuration_deploy_path)
  553. self.copy_file_or_dir(cluster_yml, param_constants.kikimr_cluster_yaml_deploy_path)
  554. self.ssh_command(param_constants.generate_configs_cmd())
  555. self.ssh_command(
  556. param_constants.generate_configs_cmd(
  557. "--dynamic"
  558. )
  559. )
  560. def format_pdisk(self, pdisk_id):
  561. pass
  562. def cleanup_disk(self, path):
  563. self.ssh_command(
  564. 'sudo dd if=/dev/zero of={} bs=1M count=1 status=none;'.format(path),
  565. raise_on_error=True)
  566. def cleanup_disks(self):
  567. self.ssh_command(
  568. "for X in /dev/disk/by-partlabel/kikimr_*; "
  569. "do sudo dd if=/dev/zero of=$X bs=1M count=1 status=none; done",
  570. raise_on_error=True)