__init__.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307
  1. import bz2
  2. import getopt
  3. import glob
  4. import gzip
  5. import logging
  6. import os
  7. import re
  8. import requests
  9. import shutil
  10. import tempfile
  11. import subprocess as sp
  12. import tarfile
  13. import yatest.common as yat
  14. from library.python.testing.recipe import set_env
  15. from library.recipes.common import find_free_ports
  16. from library.recipes.common import start_daemon
# Presence of this environment variable selects the ZooKeeper-enabled
# default config (see _is_zk_enabled / _get_clickhouse_default_config).
ZK_RECIPE_ENVIRONMENT_VAR = 'RECIPE_ZOOKEEPER_HOST'

# Candidate locations of the ClickHouse binary: a tar archive looked up via
# yat.work_path, or a binary looked up via yat.binary_path.
CLICKHOUSE_RESOURCE_PATH = 'clickhouse/resource.tar.gz'
CLICKHOUSE_EXECUTABLE_PATH = 'contrib/clickhouse/programs/clickhouse'

# Default server configs, resolved through yat.source_path.
CLICKHOUSE_CONFIG_DEFAULT = 'library/recipes/clickhouse/recipe/config/config_default.xml'
CLICKHOUSE_ZK_CONFIG_DEFAULT = 'library/recipes/clickhouse/recipe/config/config_with_zookeeper.xml'
# NOTE(review): not referenced by any function visible in this file.
CLICKHOUSE_REGIONS_CONFIG_DEFAULT = 'library/recipes/clickhouse/recipe/config/config_with_regions.xml'

# Path templates: '{}' is filled with the recipe prefix via str.format()
# before the result is handed to yat.work_path / yat.output_path.
CLICKHOUSE_DIR = '{}clickhouse'
CLICKHOUSE_CONFIG_DIR = '{}clickhouse/config'
CLICKHOUSE_REGIONS_FILES_DIR = '{}clickhouse/config/regions'
CLICKHOUSE_REGIONS_FILE_HIERARCHY = '{}clickhouse/config/regions/regions_hierarchy.txt'
CLICKHOUSE_TMP_DIR = '{}clickhouse/tmp'
CLICKHOUSE_USER_FILES_DIR = '{}clickhouse/user_files'
CLICKHOUSE_FORMAT_SCHEMA_DIR = '{}clickhouse/format_schema'
CLICKHOUSE_SERVER_LOG = '{}clickhouse-server.log'
CLICKHOUSE_SERVER_PID_FILE = '{}recipe.clickhouse.pid'

# Leading "name" or "db.name" part of a CSV file name; used as the target
# table of the INSERT in _execute_insert_csv.
TABLE_NAME_RE = re.compile(r'^\w+(\.\w+)?')
# "${NAME}" placeholders replaced by _expand_vars (group 1 is NAME).
PLACEHOLDER_RE = re.compile(r'\${(.*?)}')
# Chunk size (bytes) used when piping a compressed input file to the client.
BLOCK_SIZE = 8 * 1024

logger = logging.getLogger('clickhouse.recipe')
  36. def start(argv):
  37. opts = _parse_argv(argv)
  38. prefix = opts["prefix"] if "prefix" in opts else ""
  39. clickhouse = _find_clickhouse_executable(prefix)
  40. http_port, native_port, interserver_port = find_free_ports(3)
  41. environment = _update_environment(http_port, native_port, interserver_port, prefix, clickhouse)
  42. config = _prepare_clickhouse_config(opts)
  43. logger.info('Start ClickHouse. Http port: %d, native port: %d, config: %s', http_port, native_port, config)
  44. start_daemon(
  45. command=[clickhouse, 'server', '--config', config],
  46. environment=environment,
  47. is_alive_check=lambda: _is_alive(http_port),
  48. pid_file_name=yat.work_path(CLICKHOUSE_SERVER_PID_FILE.format(prefix)),
  49. )
  50. logger.info('ClickHouse started.')
  51. if 'execute' in opts:
  52. _execute_queries(clickhouse, native_port, opts['execute'], environment, 'expand-vars' in opts)
  53. if 'execute-file' in opts:
  54. _execute_queries_file(clickhouse, native_port, opts['execute-file'])
  55. if 'insert-csv' in opts:
  56. _execute_insert_csv(clickhouse, native_port, opts['insert-csv'])
  57. def stop(argv):
  58. opts = _parse_argv(argv)
  59. prefix = opts["prefix"] if "prefix" in opts else ""
  60. _terminate_clickhouse(prefix)
  61. def _error(message, *args):
  62. logger.error(message, args)
  63. raise RuntimeError(message % args)
  64. def _find_clickhouse_executable(prefix):
  65. clickhouse_resource = yat.work_path(CLICKHOUSE_RESOURCE_PATH)
  66. if os.path.isfile(clickhouse_resource):
  67. clickhouse_dir = prefix + os.path.dirname(clickhouse_resource)
  68. with tarfile.open(clickhouse_resource, 'r') as resource:
  69. resource.extractall(clickhouse_dir)
  70. clickhouse = os.path.join(clickhouse_dir, 'clickhouse')
  71. else:
  72. clickhouse = yat.binary_path(CLICKHOUSE_EXECUTABLE_PATH)
  73. if os.path.isfile(clickhouse):
  74. logger.info('ClickHouse executable: %s', clickhouse)
  75. return clickhouse
  76. _error('Can not find ClickHouse executable!')
  77. def _parse_argv(argv):
  78. opts, argv = getopt.getopt(
  79. argv, '', ['config=', 'execute=', 'execute-file=', 'insert-csv=', 'expand-vars', 'prefix=']
  80. )
  81. opt_dict = {}
  82. for opt, arg in opts:
  83. if opt == '--config':
  84. opt_dict['config'] = arg
  85. elif opt == '--execute':
  86. opt_dict.setdefault('execute', []).extend(_collect_files(opt, arg))
  87. elif opt == '--execute-file':
  88. opt_dict.setdefault('execute-file', []).extend(_collect_files(opt, arg))
  89. elif opt == '--insert-csv':
  90. opt_dict.setdefault('insert-csv', []).extend(_collect_files(opt, arg))
  91. elif opt == '--expand-vars':
  92. opt_dict['expand-vars'] = True
  93. elif opt == '--prefix':
  94. opt_dict['prefix'] = arg
  95. return opt_dict
  96. def _collect_files(opt, glob_path):
  97. files = glob.glob(yat.work_path(glob_path))
  98. if len(files) > 0:
  99. return files
  100. files = glob.glob(yat.source_path(glob_path))
  101. if len(files) > 0:
  102. return files
  103. _error('File(s) from %s \'%s\' option not found!', opt, glob_path)
  104. def _update_environment(http_port, native_port, interserver_port, prefix, clickhouse_bin):
  105. variables = {
  106. 'RECIPE_CLICKHOUSE_HOST': '127.0.0.1',
  107. 'RECIPE_CLICKHOUSE_HTTP_PORT': str(http_port),
  108. 'RECIPE_CLICKHOUSE_NATIVE_PORT': str(native_port),
  109. 'RECIPE_CLICKHOUSE_INTERSERVER_PORT': str(interserver_port),
  110. 'RECIPE_CLICKHOUSE_USER': 'default',
  111. 'RECIPE_CLICKHOUSE_PASSWORD': '',
  112. 'RECIPE_CLICKHOUSE_LOG': yat.output_path(CLICKHOUSE_SERVER_LOG.format(prefix)),
  113. "RECIPE_CLICKHOUSE_DIR": yat.work_path(CLICKHOUSE_DIR.format(prefix)),
  114. "RECIPE_CLICKHOUSE_BIN": clickhouse_bin,
  115. "RECIPE_CLICKHOUSE_TMP_DIR": yat.work_path(CLICKHOUSE_TMP_DIR.format(prefix)),
  116. "RECIPE_CLICKHOUSE_REGIONS_FILE_HIERARCHY": yat.work_path(CLICKHOUSE_REGIONS_FILE_HIERARCHY.format(prefix)),
  117. "RECIPE_CLICKHOUSE_REGIONS_FILES_DIR": yat.work_path(CLICKHOUSE_REGIONS_FILES_DIR.format(prefix)),
  118. "RECIPE_CLICKHOUSE_USER_FILES_DIR": yat.work_path(CLICKHOUSE_USER_FILES_DIR.format(prefix)),
  119. "RECIPE_CLICKHOUSE_FORMAT_SCHEMA_DIR": yat.work_path(CLICKHOUSE_FORMAT_SCHEMA_DIR.format(prefix)),
  120. }
  121. for variable in variables.items():
  122. (k, v) = variable
  123. k = prefix + k
  124. set_env(k, v)
  125. environment = os.environ.copy()
  126. environment.update(variables)
  127. return environment
  128. def _prepare_clickhouse_config(opts):
  129. config = yat.source_path(opts.get('config', _get_clickhouse_default_config()))
  130. config_dir = os.path.dirname(config)
  131. prefix = opts["prefix"] if "prefix" in opts else ""
  132. work_config_dir = yat.work_path(CLICKHOUSE_CONFIG_DIR.format(prefix))
  133. logger.info('Copy ClickHouse config files from: \'%s\' to: \'%s\'', config_dir, work_config_dir)
  134. shutil.copytree(config_dir, work_config_dir)
  135. logger.info('Config path: ' + config)
  136. return os.path.join(work_config_dir, os.path.basename(config))
  137. def _get_clickhouse_default_config():
  138. if _is_zk_enabled():
  139. return CLICKHOUSE_ZK_CONFIG_DEFAULT
  140. else:
  141. return CLICKHOUSE_CONFIG_DEFAULT
  142. def _is_zk_enabled():
  143. return ZK_RECIPE_ENVIRONMENT_VAR in os.environ
  144. def _is_alive(port):
  145. try:
  146. response = requests.get('http://localhost:{}'.format(port), timeout=1)
  147. response.raise_for_status()
  148. except Exception as err:
  149. logger.debug('ClickHouse port check result: ' + str(err))
  150. return False
  151. else:
  152. logger.info('ClickHouse is up with http port {}'.format(port))
  153. return True
  154. def _execute_queries(clickhouse, port, query_paths, environment, expand_vars):
  155. logger.info('Executing queries...')
  156. args = ['--multiline', '--multiquery']
  157. for query_path in query_paths:
  158. logger.info('Executing queries from: ' + query_path)
  159. if expand_vars:
  160. tmp = tempfile.NamedTemporaryFile(delete=False)
  161. logger.info('Expanding variables to tmp file: %', tmp.name)
  162. try:
  163. tmp_path = _expand_vars(query_path, tmp, environment)
  164. _execute_clickhouse_client(clickhouse, port, args, tmp_path)
  165. finally:
  166. tmp.close()
  167. os.unlink(tmp.name)
  168. else:
  169. _execute_clickhouse_client(clickhouse, port, args, query_path)
  170. logger.info('All query files processed.')
  171. def _execute_queries_file(clickhouse, port, queries_files):
  172. logger.info('Executing queries...')
  173. for queries_file in queries_files:
  174. logger.info('Executing queries from file: ' + queries_file)
  175. args = ['--queries-file', queries_file]
  176. _execute_clickhouse_client(clickhouse, port, args)
  177. logger.info('All query files processed.')
  178. def _expand_vars(source, tmp, environment):
  179. with open(source) as original:
  180. for line in original:
  181. tmp.write(re.sub(PLACEHOLDER_RE, lambda match: environment[match.group(1)], line))
  182. tmp.flush()
  183. return tmp.name
  184. def _execute_insert_csv(clickhouse, port, csv_paths):
  185. logger.info('Inserting data from CSV files...')
  186. for csv_path in csv_paths:
  187. csv_file = os.path.basename(csv_path)
  188. match = TABLE_NAME_RE.match(csv_file)
  189. table_name = match.group(0) if match else csv_file
  190. args = ['--query=INSERT INTO {} FORMAT CSV'.format(table_name)]
  191. logger.info('Insert data into \'%s\' from CSV file: %s', table_name, csv_path)
  192. _execute_clickhouse_client(clickhouse, port, args, csv_path)
  193. logger.info('All data are inserted.')
  194. def _execute_clickhouse_client(clickhouse, port, args, input_file=None):
  195. command = [clickhouse, 'client', '--port', str(port)]
  196. command.extend(args)
  197. if input_file:
  198. if input_file.endswith('.gz'):
  199. with gzip.open(input_file, 'r') as reader:
  200. _execute(command, reader=reader)
  201. elif input_file.endswith('.bz2'):
  202. with bz2.BZ2File(input_file, 'r') as reader:
  203. _execute(command, reader=reader)
  204. else:
  205. with open(input_file, 'r') as fin:
  206. _execute(command, stdin=fin)
  207. else:
  208. _execute(command)
  209. def _execute(command, stdin=None, reader=None):
  210. if stdin:
  211. yat.execute(command, stdin=stdin)
  212. elif reader:
  213. executor = yat.execute(command, stdin=sp.PIPE, wait=False)
  214. process_input = executor.process.stdin
  215. chunk = reader.read(BLOCK_SIZE)
  216. while chunk:
  217. process_input.write(chunk)
  218. chunk = reader.read(BLOCK_SIZE)
  219. process_input.close()
  220. executor.wait()
  221. else:
  222. yat.execute(command)
  223. def _terminate_clickhouse(prefix):
  224. logger.info('Terminating server...')
  225. try:
  226. with open(yat.work_path(CLICKHOUSE_SERVER_PID_FILE.format(prefix))) as fin:
  227. pid = fin.read()
  228. except IOError:
  229. logger.warn('Can not find server PID.')
  230. else:
  231. logger.info('Terminate ClickHouse server PID: %s', pid)
  232. os.kill(int(pid), 9)
  233. logger.info('Server terminated.')