flaky_tests_history.py 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. #!/usr/bin/env python3
  2. import argparse
  3. import configparser
  4. import datetime
  5. import os
  6. import posixpath
  7. import traceback
  8. import time
  9. import ydb
  10. from collections import Counter
  11. dir = os.path.dirname(__file__)
  12. config = configparser.ConfigParser()
  13. config_file_path = f"{dir}/../../config/ydb_qa_db.ini"
  14. config.read(config_file_path)
  15. DATABASE_ENDPOINT = config["QA_DB"]["DATABASE_ENDPOINT"]
  16. DATABASE_PATH = config["QA_DB"]["DATABASE_PATH"]
  17. def create_tables(pool, table_path):
  18. print(f"> create table if not exists:'{table_path}'")
  19. def callee(session):
  20. session.execute_scheme(f"""
  21. CREATE table IF NOT EXISTS `{table_path}` (
  22. `test_name` Utf8 NOT NULL,
  23. `suite_folder` Utf8 NOT NULL,
  24. `full_name` Utf8 NOT NULL,
  25. `date_window` Date NOT NULL,
  26. `days_ago_window` Uint64 NOT NULL,
  27. `history` String,
  28. `history_class` String,
  29. `pass_count` Uint64,
  30. `mute_count` Uint64,
  31. `fail_count` Uint64,
  32. `skip_count` Uint64,
  33. PRIMARY KEY (`test_name`, `suite_folder`, `full_name`,date_window)
  34. )
  35. PARTITION BY HASH(`full_name`)
  36. WITH (STORE = COLUMN)
  37. """)
  38. return pool.retry_operation_sync(callee)
  39. def bulk_upsert(table_client, table_path, rows):
  40. print(f"> bulk upsert: {table_path}")
  41. column_types = (
  42. ydb.BulkUpsertColumns()
  43. .add_column("test_name", ydb.OptionalType(ydb.PrimitiveType.Utf8))
  44. .add_column("suite_folder", ydb.OptionalType(ydb.PrimitiveType.Utf8))
  45. .add_column("full_name", ydb.OptionalType(ydb.PrimitiveType.Utf8))
  46. .add_column("date_window", ydb.OptionalType(ydb.PrimitiveType.Date))
  47. .add_column("days_ago_window", ydb.OptionalType(ydb.PrimitiveType.Uint64))
  48. .add_column("history", ydb.OptionalType(ydb.PrimitiveType.String))
  49. .add_column("history_class", ydb.OptionalType(ydb.PrimitiveType.String))
  50. .add_column("pass_count", ydb.OptionalType(ydb.PrimitiveType.Uint64))
  51. .add_column("mute_count", ydb.OptionalType(ydb.PrimitiveType.Uint64))
  52. .add_column("fail_count", ydb.OptionalType(ydb.PrimitiveType.Uint64))
  53. .add_column("skip_count", ydb.OptionalType(ydb.PrimitiveType.Uint64))
  54. )
  55. table_client.bulk_upsert(table_path, rows, column_types)
  56. def main():
  57. parser = argparse.ArgumentParser()
  58. parser.add_argument('--days-window', default=5, type=int, help='how many days back we collecting history')
  59. args, unknown = parser.parse_known_args()
  60. history_for_n_day = args.days_window
  61. print(f'Getting hostory in window {history_for_n_day} days')
  62. if "CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS" not in os.environ:
  63. print(
  64. "Error: Env variable CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS is missing, skipping"
  65. )
  66. return 1
  67. else:
  68. # Do not set up 'real' variable from gh workflows because it interfere with ydb tests
  69. # So, set up it locally
  70. os.environ["YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS"] = os.environ[
  71. "CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS"
  72. ]
  73. with ydb.Driver(
  74. endpoint=DATABASE_ENDPOINT,
  75. database=DATABASE_PATH,
  76. credentials=ydb.credentials_from_env_variables(),
  77. ) as driver:
  78. driver.wait(timeout=10, fail_fast=True)
  79. session = ydb.retry_operation_sync(
  80. lambda: driver.table_client.session().create()
  81. )
  82. # settings, paths, consts
  83. tc_settings = ydb.TableClientSettings().with_native_date_in_result_sets(enabled=True)
  84. table_client = ydb.TableClient(driver, tc_settings)
  85. table_path = f'test_results/analytics/flaky_tests_history_{history_for_n_day}_days'
  86. default_start_date = datetime.date(2024, 7, 1)
  87. with ydb.SessionPool(driver) as pool:
  88. create_tables(pool, table_path)
  89. # geting last date from history
  90. last_date_query = f"select max(date_window) as max_date_window from `{table_path}`"
  91. query = ydb.ScanQuery(last_date_query, {})
  92. it = table_client.scan_query(query)
  93. results = []
  94. while True:
  95. try:
  96. result = next(it)
  97. results = results + result.result_set.rows
  98. except StopIteration:
  99. break
  100. if results[0] and results[0].get( 'max_date_window', default_start_date) is not None:
  101. last_date = results[0].get(
  102. 'max_date_window', default_start_date).strftime('%Y-%m-%d')
  103. else:
  104. last_date = default_start_date.strftime('%Y-%m-%d')
  105. print(f'last hisotry date: {last_date}')
  106. # getting history for dates >= last_date
  107. query_get_history = f"""
  108. select
  109. full_name,
  110. date_base,
  111. history_list,
  112. dist_hist,
  113. suite_folder,
  114. test_name
  115. from (
  116. select
  117. full_name,
  118. date_base,
  119. AGG_LIST(status) as history_list ,
  120. String::JoinFromList( AGG_LIST_DISTINCT(status) ,',') as dist_hist,
  121. suite_folder,
  122. test_name
  123. from (
  124. select * from (
  125. select * from (
  126. select
  127. DISTINCT suite_folder || '/' || test_name as full_name,
  128. suite_folder,
  129. test_name
  130. from `test_results/test_runs_column`
  131. where
  132. status in ('failure','mute')
  133. and job_name in ('Nightly-run', 'Postcommit_relwithdebinfo','Postcommit_asan')
  134. and branch = 'main'
  135. and run_timestamp >= Date('{last_date}') -{history_for_n_day}*Interval("P1D")
  136. ) as tests_with_fails
  137. cross join (
  138. select
  139. DISTINCT DateTime::MakeDate(run_timestamp) as date_base
  140. from `test_results/test_runs_column`
  141. where
  142. status in ('failure','mute')
  143. and job_name in ('Nightly-run', 'Postcommit_relwithdebinfo','Postcommit_asan')
  144. and branch = 'main'
  145. and run_timestamp>= Date('{last_date}')
  146. ) as date_list
  147. ) as test_and_date
  148. left JOIN (
  149. select * from (
  150. select
  151. suite_folder || '/' || test_name as full_name,
  152. run_timestamp,
  153. status
  154. --ROW_NUMBER() OVER (PARTITION BY test_name ORDER BY run_timestamp DESC) AS rn
  155. from `test_results/test_runs_column`
  156. where
  157. run_timestamp >= Date('{last_date}') -{history_for_n_day}*Interval("P1D") and
  158. job_name in ('Nightly-run', 'Postcommit_relwithdebinfo')
  159. and build_type = 'relwithdebinfo'
  160. )
  161. ) as hist
  162. ON test_and_date.full_name=hist.full_name
  163. where
  164. hist.run_timestamp >= test_and_date.date_base -{history_for_n_day}*Interval("P1D") AND
  165. hist.run_timestamp <= test_and_date.date_base
  166. )
  167. GROUP BY full_name,suite_folder,test_name,date_base
  168. )
  169. """
  170. query = ydb.ScanQuery(query_get_history, {})
  171. # start transaction time
  172. start_time = time.time()
  173. it = driver.table_client.scan_query(query)
  174. # end transaction time
  175. results = []
  176. prepared_for_update_rows = []
  177. while True:
  178. try:
  179. result = next(it)
  180. results = results + result.result_set.rows
  181. except StopIteration:
  182. break
  183. end_time = time.time()
  184. print(f'transaction duration: {end_time - start_time}')
  185. print(f'history data captured, {len(results)} rows')
  186. for row in results:
  187. row['count'] = dict(zip(list(row['history_list']), [list(
  188. row['history_list']).count(i) for i in list(row['history_list'])]))
  189. prepared_for_update_rows.append({
  190. 'suite_folder': row['suite_folder'],
  191. 'test_name': row['test_name'],
  192. 'full_name': row['full_name'],
  193. 'date_window': row['date_base'],
  194. 'days_ago_window': history_for_n_day,
  195. 'history': ','.join(row['history_list']).encode('utf8'),
  196. 'history_class': row['dist_hist'],
  197. 'pass_count': row['count'].get('passed', 0),
  198. 'mute_count': row['count'].get('mute', 0),
  199. 'fail_count': row['count'].get('failure', 0),
  200. 'skip_count': row['count'].get('skipped', 0),
  201. })
  202. print('upserting history')
  203. with ydb.SessionPool(driver) as pool:
  204. create_tables(pool, table_path)
  205. full_path = posixpath.join(DATABASE_PATH, table_path)
  206. bulk_upsert(driver.table_client, full_path,
  207. prepared_for_update_rows)
  208. print('history updated')
if __name__ == "__main__":
    # main() returns 1 when the credentials env variable is missing, but the
    # return value is discarded here (it is not passed to sys.exit).
    main()