  1. #!/usr/bin/env python3
  2. #--query_path .github/scripts/analytics/data_mart_queries/perfomance_olap_mart.sql --table_path perfomance/olap/fast_results --store_type column --partition_keys Run_start_timestamp --primary_keys Db Suite Test Branch Run_start_timestamp --ttl_min 43200 --ttl_key Run_start_timestamp
  3. import argparse
  4. import ydb
  5. import configparser
  6. import os
  7. import time
  8. # Load configuration
  9. dir = os.path.dirname(__file__)
  10. config = configparser.ConfigParser()
  11. config_file_path = f"{dir}/../../config/ydb_qa_db.ini"
  12. config.read(config_file_path)
  13. repo_path = os.path.abspath(f"{dir}/../../../")
  14. DATABASE_ENDPOINT = config["QA_DB"]["DATABASE_ENDPOINT"]
  15. DATABASE_PATH = config["QA_DB"]["DATABASE_PATH"]
  16. def get_data_from_query_with_metadata(driver, query):
  17. results = []
  18. scan_query = ydb.ScanQuery(query, {})
  19. it = driver.table_client.scan_query(scan_query)
  20. print(f"Executing query")
  21. start_time = time.time()
  22. column_types = None
  23. while True:
  24. try:
  25. result = next(it)
  26. if column_types is None:
  27. column_types = [(col.name, col.type) for col in result.result_set.columns]
  28. results.extend(result.result_set.rows)
  29. except StopIteration:
  30. break
  31. end_time = time.time()
  32. print(f'Captured {len(results)} rows, duration: {end_time - start_time}s')
  33. return results, column_types
  34. def ydb_type_to_str(ydb_type, store_type = 'ROW'):
  35. # Converts YDB type to string representation for table creation
  36. is_optional = False
  37. if ydb_type.HasField('optional_type'):
  38. is_optional = True
  39. base_type = ydb_type.optional_type.item
  40. else:
  41. base_type = ydb_type
  42. for type in ydb.PrimitiveType:
  43. if type.proto.type_id == base_type.type_id:
  44. break
  45. if is_optional:
  46. result_type = ydb.OptionalType(type)
  47. name = result_type._repr
  48. else:
  49. result_type = type
  50. name = result_type.name
  51. if name.upper() == 'BOOL' and store_type.upper() == 'COLUMN':
  52. if is_optional:
  53. result_type = ydb.OptionalType(ydb.PrimitiveType.Uint8)
  54. else:
  55. result_type = ydb.PrimitiveType.Uint8
  56. name = 'Uint8'
  57. return result_type, name
  58. def create_table(session, table_path, column_types, store_type, partition_keys, primary_keys, ttl_min, ttl_key):
  59. """Create table based on the structure of the provided column types."""
  60. if not column_types:
  61. raise ValueError("No column types to create table from.")
  62. columns_sql = []
  63. for column_name, column_ydb_type in column_types:
  64. column_type_obj, column_type_str = ydb_type_to_str(column_ydb_type, store_type.upper())
  65. if column_name in primary_keys:
  66. columns_sql.append(f"`{column_name}` {column_type_str.replace('?','')} NOT NULL")
  67. else:
  68. columns_sql.append(f"`{column_name}` {column_type_str.replace('?','')}")
  69. partition_keys_sql = ", ".join([f"`{key}`" for key in partition_keys])
  70. primary_keys_sql = ", ".join([f"`{key}`" for key in primary_keys])
  71. # Добавляем TTL только если оба аргумента заданы
  72. ttl_clause = ""
  73. if ttl_min and ttl_key:
  74. ttl_clause = f' TTL = Interval("PT{ttl_min}M") ON {ttl_key}'
  75. create_table_sql = f"""
  76. CREATE TABLE IF NOT EXISTS `{table_path}` (
  77. {', '.join(columns_sql)},
  78. PRIMARY KEY ({primary_keys_sql})
  79. )
  80. PARTITION BY HASH({partition_keys_sql})
  81. WITH (
  82. {"STORE = COLUMN" if store_type.upper() == 'COLUMN' else ''}
  83. {',' if store_type and ttl_clause else ''}
  84. {ttl_clause}
  85. )
  86. """
  87. print(f"Creating table with query: {create_table_sql}")
  88. session.execute_scheme(create_table_sql)
  89. def create_table_if_not_exists(session, table_path, column_types, store_type, partition_keys, primary_keys, ttl_min, ttl_key):
  90. """Create table if it does not already exist, based on column types."""
  91. try:
  92. session.describe_table(table_path)
  93. print(f"Table '{table_path}' already exists.")
  94. except ydb.Error:
  95. print(f"Table '{table_path}' does not exist. Creating table...")
  96. create_table(session, table_path, column_types, store_type, partition_keys, primary_keys, ttl_min, ttl_key)
  97. def bulk_upsert(table_client, table_path, rows, column_types,store_type='ROW'):
  98. print(f"> Bulk upsert into: {table_path}")
  99. column_types_map = ydb.BulkUpsertColumns()
  100. for column_name, column_ydb_type in column_types:
  101. column_type_obj, column_type_str = ydb_type_to_str(column_ydb_type, store_type.upper())
  102. column_types_map.add_column(column_name, column_type_obj)
  103. table_client.bulk_upsert(table_path, rows, column_types_map)
  104. def parse_args():
  105. parser = argparse.ArgumentParser(description="YDB Table Manager")
  106. parser.add_argument("--table_path", required=True, help="Table path and name")
  107. parser.add_argument("--query_path", required=True, help="Path to the SQL query file")
  108. parser.add_argument("--store_type", choices=["column", "row"], required=True, help="Table store type (column or row)")
  109. parser.add_argument("--partition_keys", nargs="+", required=True, help="List of partition keys")
  110. parser.add_argument("--primary_keys", nargs="+", required=True, help="List of primary keys")
  111. parser.add_argument("--ttl_min", type=int, help="TTL in minutes")
  112. parser.add_argument("--ttl_key", help="TTL key column name")
  113. return parser.parse_args()
  114. def main():
  115. args = parse_args()
  116. if "CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS" not in os.environ:
  117. print("Error: Env variable CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS is missing, skipping")
  118. return 1
  119. else:
  120. os.environ["YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS"] = os.environ[
  121. "CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS"
  122. ]
  123. table_path = args.table_path
  124. batch_size = 50000
  125. # Read SQL query from file
  126. sql_query_path = os.path.join(repo_path, args.query_path)
  127. print(f'Query found: {sql_query_path}')
  128. with open(sql_query_path, 'r') as file:
  129. sql_query = file.read()
  130. with ydb.Driver(
  131. endpoint=DATABASE_ENDPOINT,
  132. database=DATABASE_PATH,
  133. credentials=ydb.credentials_from_env_variables()
  134. ) as driver:
  135. driver.wait(timeout=10, fail_fast=True)
  136. with ydb.SessionPool(driver) as pool:
  137. # Run query to get sample data and column types
  138. results, column_types = get_data_from_query_with_metadata(driver, sql_query)
  139. if not results:
  140. print("No data to create table from.")
  141. return
  142. # Create table if not exists based on sample column types
  143. pool.retry_operation_sync(
  144. lambda session: create_table_if_not_exists(
  145. session, f'{DATABASE_PATH}/{table_path}', column_types, args.store_type,
  146. args.partition_keys, args.primary_keys, args.ttl_min, args.ttl_key
  147. )
  148. )
  149. print(f'Preparing to upsert: {len(results)} rows')
  150. for start in range(0, len(results), batch_size):
  151. batch_rows = results[start:start + batch_size]
  152. print(f'Upserting: {start}-{start + len(batch_rows)}/{len(results)} rows')
  153. bulk_upsert(driver.table_client, f'{DATABASE_PATH}/{table_path}', batch_rows, column_types, args.store_type)
  154. print('Data uploaded')
  155. if __name__ == "__main__":
  156. main()