upload_testowners.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. #!/usr/bin/env python3
  2. import argparse
  3. import configparser
  4. import datetime
  5. import os
  6. import posixpath
  7. import traceback
  8. import time
  9. import ydb
  10. from collections import Counter
  11. dir = os.path.dirname(__file__)
  12. config = configparser.ConfigParser()
  13. config_file_path = f"{dir}/../../config/ydb_qa_db.ini"
  14. config.read(config_file_path)
  15. DATABASE_ENDPOINT = config["QA_DB"]["DATABASE_ENDPOINT"]
  16. DATABASE_PATH = config["QA_DB"]["DATABASE_PATH"]
  17. def create_tables(pool, table_path):
  18. print(f"> create table if not exists:'{table_path}'")
  19. def callee(session):
  20. session.execute_scheme(f"""
  21. CREATE table IF NOT EXISTS `{table_path}` (
  22. `test_name` Utf8 NOT NULL,
  23. `suite_folder` Utf8 NOT NULL,
  24. `full_name` Utf8 NOT NULL,
  25. `run_timestamp_last` Timestamp NOT NULL,
  26. `owners` Utf8 ,
  27. PRIMARY KEY (`test_name`, `suite_folder`, `full_name`)
  28. )
  29. PARTITION BY HASH(suite_folder,`full_name`)
  30. WITH (STORE = COLUMN)
  31. """)
  32. return pool.retry_operation_sync(callee)
  33. def bulk_upsert(table_client, table_path, rows):
  34. print(f"> bulk upsert: {table_path}")
  35. column_types = (
  36. ydb.BulkUpsertColumns()
  37. .add_column("test_name", ydb.OptionalType(ydb.PrimitiveType.Utf8))
  38. .add_column("suite_folder", ydb.OptionalType(ydb.PrimitiveType.Utf8))
  39. .add_column("full_name", ydb.OptionalType(ydb.PrimitiveType.Utf8))
  40. .add_column("run_timestamp_last", ydb.OptionalType(ydb.PrimitiveType.Timestamp))
  41. .add_column("owners", ydb.OptionalType(ydb.PrimitiveType.Utf8))
  42. )
  43. table_client.bulk_upsert(table_path, rows, column_types)
  44. def main():
  45. if "CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS" not in os.environ:
  46. print(
  47. "Error: Env variable CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS is missing, skipping"
  48. )
  49. return 1
  50. else:
  51. # Do not set up 'real' variable from gh workflows because it interfere with ydb tests
  52. # So, set up it locally
  53. os.environ["YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS"] = os.environ[
  54. "CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS"
  55. ]
  56. with ydb.Driver(
  57. endpoint=DATABASE_ENDPOINT,
  58. database=DATABASE_PATH,
  59. credentials=ydb.credentials_from_env_variables(),
  60. ) as driver:
  61. driver.wait(timeout=10, fail_fast=True)
  62. session = ydb.retry_operation_sync(
  63. lambda: driver.table_client.session().create()
  64. )
  65. # settings, paths, consts
  66. tc_settings = ydb.TableClientSettings().with_native_date_in_result_sets(enabled=True)
  67. table_client = ydb.TableClient(driver, tc_settings)
  68. table_path = f'test_results/analytics/testowners'
  69. query_get_owners = f"""
  70. select
  71. DISTINCT test_name,
  72. suite_folder,
  73. suite_folder || '/' || test_name as full_name,
  74. FIRST_VALUE(owners) OVER w AS owners,
  75. FIRST_VALUE (run_timestamp) OVER w AS run_timestamp_last
  76. FROM
  77. `test_results/test_runs_column`
  78. WHERE
  79. run_timestamp >= CurrentUtcDate()- Interval("P10D")
  80. AND branch = 'main'
  81. and job_name in (
  82. 'Nightly-run', 'Postcommit_relwithdebinfo',
  83. 'Postcommit_asan'
  84. )
  85. WINDOW w AS (
  86. PARTITION BY test_name,
  87. suite_folder
  88. ORDER BY
  89. run_timestamp DESC
  90. )
  91. order by
  92. run_timestamp_last desc
  93. """
  94. query = ydb.ScanQuery(query_get_owners, {})
  95. # start transaction time
  96. start_time = time.time()
  97. it = driver.table_client.scan_query(query)
  98. # end transaction time
  99. results = []
  100. test_list = []
  101. while True:
  102. try:
  103. result = next(it)
  104. results = results + result.result_set.rows
  105. except StopIteration:
  106. break
  107. end_time = time.time()
  108. print(f'transaction duration: {end_time - start_time}')
  109. print(f'testowners data captured, {len(results)} rows')
  110. for row in results:
  111. test_list.append({
  112. 'suite_folder': row['suite_folder'],
  113. 'test_name': row['test_name'],
  114. 'full_name': row['full_name'],
  115. 'owners': row['owners'],
  116. 'run_timestamp_last': row['run_timestamp_last'],
  117. })
  118. print('upserting testowners')
  119. with ydb.SessionPool(driver) as pool:
  120. create_tables(pool, table_path)
  121. full_path = posixpath.join(DATABASE_PATH, table_path)
  122. bulk_upsert(driver.table_client, full_path,
  123. test_list)
  124. print('testowners updated')
  125. if __name__ == "__main__":
  126. main()