# test_history_fast.py
  1. #!/usr/bin/env python3
  2. import ydb
  3. import configparser
  4. import os
  5. # Load configuration
  6. dir = os.path.dirname(__file__)
  7. config = configparser.ConfigParser()
  8. config_file_path = f"{dir}/../../config/ydb_qa_db.ini"
  9. config.read(config_file_path)
  10. DATABASE_ENDPOINT = config["QA_DB"]["DATABASE_ENDPOINT"]
  11. DATABASE_PATH = config["QA_DB"]["DATABASE_PATH"]
  12. def drop_table(session, table_path):
  13. print(f"> Dropping table if exists: '{table_path}'")
  14. session.execute_scheme(f"DROP TABLE IF EXISTS `{table_path}`;")
  15. def create_test_history_fast_table(session, table_path):
  16. print(f"> Creating table: '{table_path}'")
  17. session.execute_scheme(f"""
  18. CREATE TABLE `{table_path}` (
  19. `build_type` Utf8 NOT NULL,
  20. `job_name` Utf8 NOT NULL,
  21. `job_id` Uint64,
  22. `commit` Utf8,
  23. `branch` Utf8 NOT NULL,
  24. `pull` Utf8,
  25. `run_timestamp` Timestamp NOT NULL,
  26. `test_id` Utf8 NOT NULL,
  27. `suite_folder` Utf8,
  28. `test_name` Utf8,
  29. `full_name` Utf8 NOT NULL,
  30. `duration` Double,
  31. `status` Utf8,
  32. `status_description` Utf8,
  33. `owners` Utf8,
  34. PRIMARY KEY (`full_name`, `run_timestamp`, `job_name`, `branch`, `build_type`, test_id)
  35. )
  36. PARTITION BY HASH(run_timestamp)
  37. WITH (
  38. STORE = COLUMN,
  39. TTL = Interval("P7D") ON run_timestamp
  40. )
  41. """)
  42. def bulk_upsert(table_client, table_path, rows):
  43. print(f"> Bulk upsert into: {table_path}")
  44. column_types = (
  45. ydb.BulkUpsertColumns()
  46. .add_column("build_type", ydb.OptionalType(ydb.PrimitiveType.Utf8))
  47. .add_column("job_name", ydb.OptionalType(ydb.PrimitiveType.Utf8))
  48. .add_column("job_id", ydb.OptionalType(ydb.PrimitiveType.Uint64))
  49. .add_column("commit", ydb.OptionalType(ydb.PrimitiveType.Utf8))
  50. .add_column("branch", ydb.OptionalType(ydb.PrimitiveType.Utf8))
  51. .add_column("pull", ydb.OptionalType(ydb.PrimitiveType.Utf8))
  52. .add_column("run_timestamp", ydb.OptionalType(ydb.PrimitiveType.Timestamp))
  53. .add_column("test_id", ydb.OptionalType(ydb.PrimitiveType.Utf8))
  54. .add_column("suite_folder", ydb.OptionalType(ydb.PrimitiveType.Utf8))
  55. .add_column("test_name", ydb.OptionalType(ydb.PrimitiveType.Utf8))
  56. .add_column("full_name", ydb.OptionalType(ydb.PrimitiveType.Utf8))
  57. .add_column("duration", ydb.OptionalType(ydb.PrimitiveType.Double))
  58. .add_column("status", ydb.OptionalType(ydb.PrimitiveType.Utf8))
  59. .add_column("status_description", ydb.OptionalType(ydb.PrimitiveType.Utf8))
  60. .add_column("owners", ydb.OptionalType(ydb.PrimitiveType.Utf8))
  61. )
  62. table_client.bulk_upsert(table_path, rows, column_types)
  63. def get_missed_data_for_upload(driver):
  64. results = []
  65. query = f"""
  66. SELECT
  67. build_type,
  68. job_name,
  69. job_id,
  70. commit,
  71. branch,
  72. pull,
  73. all_data.run_timestamp as run_timestamp,
  74. test_id,
  75. suite_folder,
  76. test_name,
  77. cast(suite_folder || '/' || test_name as UTF8) as full_name,
  78. duration,
  79. status,
  80. status_description,
  81. owners
  82. FROM `test_results/test_runs_column` as all_data
  83. LEFT JOIN (
  84. select distinct run_timestamp from `test_results/analytics/test_history_fast`
  85. ) as fast_data_missed
  86. ON all_data.run_timestamp = fast_data_missed.run_timestamp
  87. WHERE
  88. all_data.run_timestamp >= CurrentUtcDate() - 6*Interval("P1D") AND
  89. fast_data_missed.run_timestamp is NULL
  90. """
  91. scan_query = ydb.ScanQuery(query, {})
  92. it = driver.table_client.scan_query(scan_query)
  93. print(f'missed data capturing')
  94. while True:
  95. try:
  96. result = next(it)
  97. results.extend(result.result_set.rows)
  98. except StopIteration:
  99. break
  100. return results
  101. def main():
  102. if "CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS" not in os.environ:
  103. print(
  104. "Error: Env variable CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS is missing, skipping"
  105. )
  106. return 1
  107. else:
  108. # Do not set up 'real' variable from gh workflows because it interfere with ydb tests
  109. # So, set up it locally
  110. os.environ["YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS"] = os.environ[
  111. "CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS"
  112. ]
  113. table_path = "test_results/analytics/test_history_fast"
  114. batch_size = 50000
  115. with ydb.Driver(
  116. endpoint=DATABASE_ENDPOINT,
  117. database=DATABASE_PATH,
  118. credentials=ydb.credentials_from_env_variables()
  119. ) as driver:
  120. driver.wait(timeout=10, fail_fast=True)
  121. with ydb.SessionPool(driver) as pool:
  122. prepared_for_upload_rows = get_missed_data_for_upload(driver)
  123. print(f'Preparing to upsert: {len(prepared_for_upload_rows)} rows')
  124. if prepared_for_upload_rows:
  125. for start in range(0, len(prepared_for_upload_rows), batch_size):
  126. batch_rows_for_upload = prepared_for_upload_rows[start:start + batch_size]
  127. print(f'upserting: {start}-{start + len(batch_rows_for_upload)}/{len(prepared_for_upload_rows)} rows')
  128. bulk_upsert(driver.table_client, f'{DATABASE_PATH}/{table_path}', batch_rows_for_upload)
  129. print('Tests uploaded')
  130. else:
  131. print('Nothing to upload')
  132. if __name__ == "__main__":
  133. main()