flaky_tests_history.py 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242
#!/usr/bin/env python3
import argparse
import configparser
import datetime
import os
import posixpath
import time
import traceback
from collections import Counter

import ydb
  11. dir = os.path.dirname(__file__)
  12. config = configparser.ConfigParser()
  13. config_file_path = f"{dir}/../../config/ydb_qa_db.ini"
  14. config.read(config_file_path)
  15. build_preset = os.environ.get("build_preset")
  16. branch = os.environ.get("branch_to_compare")
  17. DATABASE_ENDPOINT = config["QA_DB"]["DATABASE_ENDPOINT"]
  18. DATABASE_PATH = config["QA_DB"]["DATABASE_PATH"]
  19. def create_tables(pool, table_path):
  20. print(f"> create table: {table_path}")
  21. def callee(session):
  22. session.execute_scheme(f"""
  23. CREATE table `{table_path}` (
  24. `test_name` Utf8 NOT NULL,
  25. `suite_folder` Utf8 NOT NULL,
  26. `full_name` Utf8 NOT NULL,
  27. `date_window` Date NOT NULL,
  28. `days_ago_window` Uint64 NOT NULL,
  29. `history` String,
  30. `history_class` String,
  31. `pass_count` Uint64,
  32. `mute_count` Uint64,
  33. `fail_count` Uint64,
  34. `skip_count` Uint64,
  35. PRIMARY KEY (`test_name`, `suite_folder`, `full_name`,date_window)
  36. )
  37. PARTITION BY HASH(`full_name`)
  38. WITH (STORE = COLUMN)
  39. """)
  40. return pool.retry_operation_sync(callee)
  41. def bulk_upsert(table_client, table_path, rows):
  42. print(f"> bulk upsert: {table_path}")
  43. column_types = (
  44. ydb.BulkUpsertColumns()
  45. .add_column("test_name", ydb.OptionalType(ydb.PrimitiveType.Utf8))
  46. .add_column("suite_folder", ydb.OptionalType(ydb.PrimitiveType.Utf8))
  47. .add_column("full_name", ydb.OptionalType(ydb.PrimitiveType.Utf8))
  48. .add_column("date_window", ydb.OptionalType(ydb.PrimitiveType.Date))
  49. .add_column("days_ago_window", ydb.OptionalType(ydb.PrimitiveType.Uint64))
  50. .add_column("history", ydb.OptionalType(ydb.PrimitiveType.String))
  51. .add_column("history_class", ydb.OptionalType(ydb.PrimitiveType.String))
  52. .add_column("pass_count", ydb.OptionalType(ydb.PrimitiveType.Uint64))
  53. .add_column("mute_count", ydb.OptionalType(ydb.PrimitiveType.Uint64))
  54. .add_column("fail_count", ydb.OptionalType(ydb.PrimitiveType.Uint64))
  55. .add_column("skip_count", ydb.OptionalType(ydb.PrimitiveType.Uint64))
  56. )
  57. table_client.bulk_upsert(table_path, rows, column_types)
  58. def main():
  59. parser = argparse.ArgumentParser()
  60. parser.add_argument('--days-window', default=5, type=int, help='how many days back we collecting history')
  61. args, unknown = parser.parse_known_args()
  62. history_for_n_day = args.days_window
  63. print(f'Getting hostory in window {history_for_n_day} days')
  64. if "CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS" not in os.environ:
  65. print(
  66. "Error: Env variable CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS is missing, skipping"
  67. )
  68. return 1
  69. else:
  70. # Do not set up 'real' variable from gh workflows because it interfere with ydb tests
  71. # So, set up it locally
  72. os.environ["YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS"] = os.environ[
  73. "CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS"
  74. ]
  75. with ydb.Driver(
  76. endpoint=DATABASE_ENDPOINT,
  77. database=DATABASE_PATH,
  78. credentials=ydb.credentials_from_env_variables(),
  79. ) as driver:
  80. driver.wait(timeout=10, fail_fast=True)
  81. session = ydb.retry_operation_sync(
  82. lambda: driver.table_client.session().create()
  83. )
  84. # settings, paths, consts
  85. tc_settings = ydb.TableClientSettings().with_native_date_in_result_sets(enabled=True)
  86. table_client = ydb.TableClient(driver, tc_settings)
  87. table_path = f'test_results/analytics/flaky_tests_history_{history_for_n_day}_days'
  88. default_start_date = datetime.date(2024, 7, 1)
  89. with ydb.SessionPool(driver) as pool:
  90. create_tables(pool, table_path)
  91. # geting last date from history
  92. last_date_query = f"select max(date_window) as max_date_window from `{table_path}`"
  93. query = ydb.ScanQuery(last_date_query, {})
  94. it = table_client.scan_query(query)
  95. results = []
  96. while True:
  97. try:
  98. result = next(it)
  99. results = results + result.result_set.rows
  100. except StopIteration:
  101. break
  102. if results[0] and results[0].get( 'max_date_window', default_start_date) is not None:
  103. last_date = results[0].get(
  104. 'max_date_window', default_start_date).strftime('%Y-%m-%d')
  105. else:
  106. last_date = default_start_date.strftime('%Y-%m-%d')
  107. print(f'last hisotry date: {last_date}')
  108. # getting history for dates >= last_date
  109. query_get_history = f"""
  110. select
  111. full_name,
  112. date_base,
  113. history_list,
  114. dist_hist,
  115. suite_folder,
  116. test_name
  117. from (
  118. select
  119. full_name,
  120. date_base,
  121. AGG_LIST(status) as history_list ,
  122. String::JoinFromList( AGG_LIST_DISTINCT(status) ,',') as dist_hist,
  123. suite_folder,
  124. test_name
  125. from (
  126. select * from (
  127. select * from (
  128. select
  129. DISTINCT suite_folder || '/' || test_name as full_name,
  130. suite_folder,
  131. test_name
  132. from `test_results/test_runs_column`
  133. where
  134. status in ('failure','mute')
  135. and job_name in ('Nightly-run', 'Postcommit_relwithdebinfo')
  136. and build_type = 'relwithdebinfo' and
  137. run_timestamp >= Date('{last_date}') -{history_for_n_day}*Interval("P1D")
  138. ) as tests_with_fails
  139. cross join (
  140. select
  141. DISTINCT DateTime::MakeDate(run_timestamp) as date_base
  142. from `test_results/test_runs_column`
  143. where
  144. status in ('failure','mute')
  145. and job_name in ('Nightly-run', 'Postcommit_relwithdebinfo')
  146. and build_type = 'relwithdebinfo'
  147. and run_timestamp>= Date('{last_date}')
  148. ) as date_list
  149. ) as test_and_date
  150. left JOIN (
  151. select * from (
  152. select
  153. suite_folder || '/' || test_name as full_name,
  154. run_timestamp,
  155. status
  156. --ROW_NUMBER() OVER (PARTITION BY test_name ORDER BY run_timestamp DESC) AS rn
  157. from `test_results/test_runs_column`
  158. where
  159. run_timestamp >= Date('{last_date}') -{history_for_n_day}*Interval("P1D") and
  160. job_name in ('Nightly-run', 'Postcommit_relwithdebinfo')
  161. and build_type = 'relwithdebinfo'
  162. )
  163. ) as hist
  164. ON test_and_date.full_name=hist.full_name
  165. where
  166. hist.run_timestamp >= test_and_date.date_base -{history_for_n_day}*Interval("P1D") AND
  167. hist.run_timestamp <= test_and_date.date_base
  168. )
  169. GROUP BY full_name,suite_folder,test_name,date_base
  170. )
  171. """
  172. query = ydb.ScanQuery(query_get_history, {})
  173. # start transaction time
  174. start_time = time.time()
  175. it = driver.table_client.scan_query(query)
  176. # end transaction time
  177. results = []
  178. prepared_for_update_rows = []
  179. while True:
  180. try:
  181. result = next(it)
  182. results = results + result.result_set.rows
  183. except StopIteration:
  184. break
  185. end_time = time.time()
  186. print(f'transaction duration: {end_time - start_time}')
  187. print(f'history data captured, {len(results)} rows')
  188. for row in results:
  189. row['count'] = dict(zip(list(row['history_list']), [list(
  190. row['history_list']).count(i) for i in list(row['history_list'])]))
  191. prepared_for_update_rows.append({
  192. 'suite_folder': row['suite_folder'],
  193. 'test_name': row['test_name'],
  194. 'full_name': row['full_name'],
  195. 'date_window': row['date_base'],
  196. 'days_ago_window': history_for_n_day,
  197. 'history': ','.join(row['history_list']).encode('utf8'),
  198. 'history_class': row['dist_hist'],
  199. 'pass_count': row['count'].get('passed', 0),
  200. 'mute_count': row['count'].get('mute', 0),
  201. 'fail_count': row['count'].get('failure', 0),
  202. 'skip_count': row['count'].get('skipped', 0),
  203. })
  204. print('upserting history')
  205. with ydb.SessionPool(driver) as pool:
  206. create_tables(pool, table_path)
  207. full_path = posixpath.join(DATABASE_PATH, table_path)
  208. bulk_upsert(driver.table_client, full_path,
  209. prepared_for_update_rows)
  210. print('history updated')
  211. if __name__ == "__main__":
  212. main()