flaky_tests_history_n_runs.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299
  1. #!/usr/bin/env python3
  2. import argparse
  3. import configparser
  4. import datetime
  5. import os
  6. import posixpath
  7. import traceback
  8. import time
  9. import ydb
  10. dir = os.path.dirname(__file__)
  11. config = configparser.ConfigParser()
  12. config_file_path = f"{dir}/../../config/ydb_qa_db.ini"
  13. config.read(config_file_path)
  14. build_preset = os.environ.get("build_preset")
  15. branch = os.environ.get("branch_to_compare")
  16. DATABASE_ENDPOINT = config["QA_DB"]["DATABASE_ENDPOINT"]
  17. DATABASE_PATH = config["QA_DB"]["DATABASE_PATH"]
  18. def create_tables(pool, table_path):
  19. print(f"> create table: {table_path}")
  20. def callee(session):
  21. session.execute_scheme(f"""
  22. CREATE table `{table_path}` (
  23. `test_name` Utf8 NOT NULL,
  24. `suite_folder` Utf8 NOT NULL,
  25. `full_name` Utf8 NOT NULL,
  26. `date_window` Date NOT NULL,
  27. `build_type` Utf8 NOT NULL,
  28. `branch` Utf8 NOT NULL,
  29. `runs_window` Uint64 NOT NULL,
  30. `first_run` Timestamp,
  31. `last_run` Timestamp ,
  32. `owners` Utf8 NOT NULL,
  33. `history` String,
  34. `history_class` String,
  35. `pass_count` Uint64,
  36. `mute_count` Uint64,
  37. `fail_count` Uint64,
  38. `skip_count` Uint64,
  39. PRIMARY KEY (`test_name`, `suite_folder`, `full_name`,date_window,runs_window,build_type,branch)
  40. )
  41. PARTITION BY HASH(`full_name`,build_type,branch)
  42. WITH (STORE = COLUMN)
  43. """)
  44. return pool.retry_operation_sync(callee)
  45. def bulk_upsert(table_client, table_path, rows):
  46. print(f"> bulk upsert: {table_path}")
  47. column_types = (
  48. ydb.BulkUpsertColumns()
  49. .add_column("test_name", ydb.OptionalType(ydb.PrimitiveType.Utf8))
  50. .add_column("suite_folder", ydb.OptionalType(ydb.PrimitiveType.Utf8))
  51. .add_column("build_type", ydb.OptionalType(ydb.PrimitiveType.Utf8))
  52. .add_column("branch", ydb.OptionalType(ydb.PrimitiveType.Utf8))
  53. .add_column("full_name", ydb.OptionalType(ydb.PrimitiveType.Utf8))
  54. .add_column("date_window", ydb.OptionalType(ydb.PrimitiveType.Date))
  55. .add_column("first_run", ydb.OptionalType(ydb.PrimitiveType.Timestamp))
  56. .add_column("last_run", ydb.OptionalType(ydb.PrimitiveType.Timestamp))
  57. .add_column("owners", ydb.OptionalType(ydb.PrimitiveType.Utf8))
  58. .add_column("runs_window", ydb.OptionalType(ydb.PrimitiveType.Uint64))
  59. .add_column("history", ydb.OptionalType(ydb.PrimitiveType.String))
  60. .add_column("history_class", ydb.OptionalType(ydb.PrimitiveType.String))
  61. .add_column("pass_count", ydb.OptionalType(ydb.PrimitiveType.Uint64))
  62. .add_column("mute_count", ydb.OptionalType(ydb.PrimitiveType.Uint64))
  63. .add_column("fail_count", ydb.OptionalType(ydb.PrimitiveType.Uint64))
  64. .add_column("skip_count", ydb.OptionalType(ydb.PrimitiveType.Uint64))
  65. )
  66. table_client.bulk_upsert(table_path, rows, column_types)
  67. def main():
  68. parser = argparse.ArgumentParser()
  69. parser.add_argument('--runs', default=10,choices=[10, 25, 50], type=int, help='how many runs back we collecting history')
  70. parser.add_argument('--build_type',choices=['relwithdebinfo', 'release-asan'], default='relwithdebinfo', type=str, help='build : relwithdebinfo or release-asan')
  71. parser.add_argument('--branch', default='main',choices=['main'], type=str, help='branch')
  72. args, unknown = parser.parse_known_args()
  73. history_for_n_runs = args.runs
  74. build_type = args.build_type
  75. branch = args.branch
  76. print(f'Getting hostory in window {history_for_n_runs} runs')
  77. if "CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS" not in os.environ:
  78. print(
  79. "Error: Env variable CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS is missing, skipping"
  80. )
  81. return 1
  82. else:
  83. # Do not set up 'real' variable from gh workflows because it interfere with ydb tests
  84. # So, set up it locally
  85. os.environ["YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS"] = os.environ[
  86. "CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS"
  87. ]
  88. with ydb.Driver(
  89. endpoint=DATABASE_ENDPOINT,
  90. database=DATABASE_PATH,
  91. credentials=ydb.credentials_from_env_variables(),
  92. ) as driver:
  93. driver.wait(timeout=10, fail_fast=True)
  94. session = ydb.retry_operation_sync(
  95. lambda: driver.table_client.session().create()
  96. )
  97. # settings, paths, consts
  98. tc_settings = ydb.TableClientSettings().with_native_date_in_result_sets(enabled=True)
  99. table_client = ydb.TableClient(driver, tc_settings)
  100. table_path = f'test_results/analytics/flaky_tests_history_{history_for_n_runs}_runs'
  101. default_start_date = datetime.date(2024, 7, 19)
  102. with ydb.SessionPool(driver) as pool:
  103. create_tables(pool, table_path)
  104. # geting last date from history
  105. last_date_query = f"""select max(date_window) as max_date_window from `{table_path}`
  106. where build_type = '{build_type}' and branch = '{branch}'"""
  107. query = ydb.ScanQuery(last_date_query, {})
  108. it = table_client.scan_query(query)
  109. results = []
  110. while True:
  111. try:
  112. result = next(it)
  113. results = results + result.result_set.rows
  114. except StopIteration:
  115. break
  116. if results[0] and results[0].get( 'max_date_window', default_start_date) is not None and results[0].get( 'max_date_window', default_start_date) > default_start_date:
  117. last_datetime = results[0].get(
  118. 'max_date_window', default_start_date)
  119. else:
  120. last_datetime = default_start_date
  121. last_date = last_datetime.strftime('%Y-%m-%d')
  122. print(f'last hisotry date: {last_date}')
  123. today = datetime.date.today()
  124. date_list = [today - datetime.timedelta(days=x) for x in range((today - last_datetime).days+1)]
  125. for date in sorted(date_list):
  126. query_get_history = f"""
  127. select
  128. full_name,
  129. date_base,
  130. build_type,
  131. branch,
  132. history_list,
  133. if(dist_hist = '','no_runs',dist_hist) as dist_hist,
  134. suite_folder,
  135. test_name,
  136. owners,
  137. first_run,
  138. last_run
  139. from (
  140. select
  141. full_name,
  142. date_base,
  143. build_type,
  144. branch,
  145. AGG_LIST(status) as history_list ,
  146. String::JoinFromList( ListSort(AGG_LIST_DISTINCT(status)) ,',') as dist_hist,
  147. suite_folder,
  148. test_name,
  149. owners,
  150. min(run_timestamp) as first_run,
  151. max(run_timestamp) as last_run
  152. from (
  153. select * from (
  154. select distinct
  155. t1.suite_folder,
  156. t1.test_name,
  157. t1.full_name,
  158. t1.owners,
  159. Date('{date}') as date_base,
  160. '{build_type}' as build_type,
  161. '{branch}' as branch
  162. from `test_results/analytics/testowners` as t1
  163. where run_timestamp_last >= Date('{date}') - 3*Interval("P1D")
  164. and run_timestamp_last <= Date('{date}') + Interval("P1D")
  165. ) as test_and_date
  166. left JOIN (
  167. select * from (
  168. select * from (
  169. select * from (
  170. select
  171. suite_folder || '/' || test_name as full_name,
  172. run_timestamp,
  173. status ,
  174. ROW_NUMBER() OVER (PARTITION BY suite_folder,test_name ORDER BY run_timestamp DESC) AS run_number
  175. from `test_results/test_runs_column`
  176. where
  177. run_timestamp <= Date('{date}') + Interval("P1D")
  178. and run_timestamp >= Date('{date}') -13*Interval("P1D")
  179. and job_name in (
  180. 'Nightly-run',
  181. 'Regression-run',
  182. 'Regression-whitelist-run',
  183. 'Postcommit_relwithdebinfo',
  184. 'Postcommit_asan'
  185. )
  186. and build_type = '{build_type}'
  187. and status != 'skipped'
  188. and branch = '{branch}'
  189. )
  190. where run_number <= {history_for_n_runs}
  191. )
  192. Union all
  193. select * from (
  194. select
  195. suite_folder || '/' || test_name as full_name,
  196. run_timestamp,
  197. status ,
  198. ROW_NUMBER() OVER (PARTITION BY suite_folder,test_name ORDER BY run_timestamp DESC) AS run_number
  199. from `test_results/test_runs_column`
  200. where
  201. run_timestamp <= Date('{date}') + Interval("P1D")
  202. and run_timestamp >= Date('{date}') -13*Interval("P1D")
  203. and job_name in (
  204. 'Nightly-run',
  205. 'Regression-run',
  206. 'Regression-whitelist-run',
  207. 'Postcommit_relwithdebinfo',
  208. 'Postcommit_asan'
  209. )
  210. and build_type = '{build_type}'
  211. and status = 'skipped'
  212. and branch = '{branch}'
  213. )
  214. where run_number <= {history_for_n_runs}
  215. )
  216. order by full_name,run_timestamp
  217. ) as hist
  218. ON test_and_date.full_name=hist.full_name
  219. )
  220. GROUP BY full_name,suite_folder,test_name,date_base,build_type,branch,owners
  221. )
  222. """
  223. query = ydb.ScanQuery(query_get_history, {})
  224. # start transaction time
  225. start_time = time.time()
  226. it = driver.table_client.scan_query(query)
  227. # end transaction time
  228. results = []
  229. prepared_for_update_rows = []
  230. while True:
  231. try:
  232. result = next(it)
  233. results = results + result.result_set.rows
  234. except StopIteration:
  235. break
  236. end_time = time.time()
  237. print(f'transaction duration: {end_time - start_time}')
  238. print(f'history data captured, {len(results)} rows')
  239. for row in results:
  240. row['count'] = dict(zip(list(row['history_list']), [list(
  241. row['history_list']).count(i) for i in list(row['history_list'])]))
  242. prepared_for_update_rows.append({
  243. 'suite_folder': row['suite_folder'],
  244. 'test_name': row['test_name'],
  245. 'first_run': row['first_run'],
  246. 'last_run': row['last_run'],
  247. 'owners': row['owners'],
  248. 'full_name': row['full_name'],
  249. 'date_window': row['date_base'],
  250. 'build_type': row['build_type'],
  251. 'branch': row['branch'],
  252. 'runs_window': history_for_n_runs,
  253. 'history': ','.join(row['history_list']).encode('utf8'),
  254. 'history_class': row['dist_hist'],
  255. 'pass_count': row['count'].get('passed', 0),
  256. 'mute_count': row['count'].get('mute', 0),
  257. 'fail_count': row['count'].get('failure', 0),
  258. 'skip_count': row['count'].get('skipped', 0),
  259. })
  260. print(f'upserting history for date {date}')
  261. with ydb.SessionPool(driver) as pool:
  262. full_path = posixpath.join(DATABASE_PATH, table_path)
  263. bulk_upsert(driver.table_client, full_path,
  264. prepared_for_update_rows)
  265. print('flaky history updated')
  266. print('finished')
  267. if __name__ == "__main__":
  268. main()