flaky_tests_history.py

#!/usr/bin/env python3
"""Collect per-test run history over a sliding N-day window and upsert
aggregated flaky-test statistics into a YDB analytics table."""
import argparse
import configparser
import datetime
import os
import posixpath
import traceback
import time
import ydb
from collections import Counter

dir = os.path.dirname(__file__)
config = configparser.ConfigParser()
config_file_path = f"{dir}/../../config/ydb_qa_db.ini"
config.read(config_file_path)

DATABASE_ENDPOINT = config["QA_DB"]["DATABASE_ENDPOINT"]
DATABASE_PATH = config["QA_DB"]["DATABASE_PATH"]
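
# The ini file is expected to provide a [QA_DB] section with the two keys read above.
# A sketch of its layout (placeholder values, not the real endpoint or database path):
#
#   [QA_DB]
#   DATABASE_ENDPOINT = grpcs://ydb.example.net:2135
#   DATABASE_PATH = /example/qa/database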


def create_tables(pool, table_path):
    print(f"> create table if not exists:'{table_path}'")

    def callee(session):
        session.execute_scheme(f"""
            CREATE table IF NOT EXISTS `{table_path}` (
                `test_name` Utf8 NOT NULL,
                `suite_folder` Utf8 NOT NULL,
                `full_name` Utf8 NOT NULL,
                `date_window` Date NOT NULL,
                `build_type` Utf8 NOT NULL,
                `branch` Utf8 NOT NULL,
                `first_run` Timestamp,
                `last_run` Timestamp,
                `owners` Utf8,
                `days_ago_window` Uint64 NOT NULL,
                `history` String,
                `history_class` String,
                `pass_count` Uint64,
                `mute_count` Uint64,
                `fail_count` Uint64,
                `skip_count` Uint64,
                PRIMARY KEY (`test_name`, `suite_folder`, `full_name`, date_window, build_type, branch)
            )
            PARTITION BY HASH(`full_name`, build_type, branch)
            WITH (STORE = COLUMN)
        """)

    return pool.retry_operation_sync(callee)


def bulk_upsert(table_client, table_path, rows):
    print(f"> bulk upsert: {table_path}")
    column_types = (
        ydb.BulkUpsertColumns()
        .add_column("test_name", ydb.OptionalType(ydb.PrimitiveType.Utf8))
        .add_column("suite_folder", ydb.OptionalType(ydb.PrimitiveType.Utf8))
        .add_column("build_type", ydb.OptionalType(ydb.PrimitiveType.Utf8))
        .add_column("branch", ydb.OptionalType(ydb.PrimitiveType.Utf8))
        .add_column("first_run", ydb.OptionalType(ydb.PrimitiveType.Timestamp))
        .add_column("last_run", ydb.OptionalType(ydb.PrimitiveType.Timestamp))
        .add_column("full_name", ydb.OptionalType(ydb.PrimitiveType.Utf8))
        .add_column("date_window", ydb.OptionalType(ydb.PrimitiveType.Date))
        .add_column("days_ago_window", ydb.OptionalType(ydb.PrimitiveType.Uint64))
        .add_column("history", ydb.OptionalType(ydb.PrimitiveType.String))
        .add_column("history_class", ydb.OptionalType(ydb.PrimitiveType.String))
        .add_column("pass_count", ydb.OptionalType(ydb.PrimitiveType.Uint64))
        .add_column("mute_count", ydb.OptionalType(ydb.PrimitiveType.Uint64))
        .add_column("fail_count", ydb.OptionalType(ydb.PrimitiveType.Uint64))
        .add_column("skip_count", ydb.OptionalType(ydb.PrimitiveType.Uint64))
    )
    table_client.bulk_upsert(table_path, rows, column_types)


def main():
    parser = argparse.ArgumentParser()

    parser.add_argument('--days-window', default=1, type=int, help='how many days back to collect history')
    parser.add_argument('--build_type', choices=['relwithdebinfo', 'release-asan'], default='relwithdebinfo', type=str, help='build type: relwithdebinfo or release-asan')
    parser.add_argument('--branch', default='main', choices=['main'], type=str, help='branch')

    args, unknown = parser.parse_known_args()
    history_for_n_day = args.days_window
    build_type = args.build_type
    branch = args.branch

    print(f'Getting history for a window of {history_for_n_day} days')

    if "CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS" not in os.environ:
        print(
            "Error: Env variable CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS is missing, skipping"
        )
        return 1
    else:
        # Do not set the 'real' variable from GitHub workflows because it interferes with ydb tests,
        # so set it up locally instead
        os.environ["YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS"] = os.environ[
            "CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS"
        ]
    with ydb.Driver(
        endpoint=DATABASE_ENDPOINT,
        database=DATABASE_PATH,
        credentials=ydb.credentials_from_env_variables(),
    ) as driver:
        driver.wait(timeout=10, fail_fast=True)
        session = ydb.retry_operation_sync(
            lambda: driver.table_client.session().create()
        )

        # settings, paths, consts
        tc_settings = ydb.TableClientSettings().with_native_date_in_result_sets(enabled=True)
        table_client = ydb.TableClient(driver, tc_settings)

        table_path = f'test_results/analytics/flaky_tests_window_{history_for_n_day}_days'
        default_start_date = datetime.date(2024, 9, 1)

        with ydb.SessionPool(driver) as pool:
            create_tables(pool, table_path)
        # getting the last date already present in history
        last_date_query = f"""select max(date_window) as max_date_window from `{table_path}`
        where build_type = '{build_type}' and branch = '{branch}'"""
        query = ydb.ScanQuery(last_date_query, {})
        it = table_client.scan_query(query)
        results = []
        while True:
            try:
                result = next(it)
                results = results + result.result_set.rows
            except StopIteration:
                break

        if results[0] and results[0].get('max_date_window', default_start_date) is not None and results[0].get('max_date_window', default_start_date) > default_start_date:
            last_datetime = results[0].get('max_date_window', default_start_date)
        else:
            last_datetime = default_start_date

        last_date = last_datetime.strftime('%Y-%m-%d')

        print(f'last history date: {last_date}')
        # getting history for dates >= last_date
        today = datetime.date.today()
        date_list = [today - datetime.timedelta(days=x) for x in range((today - last_datetime).days + 1)]
        for date in sorted(date_list):
            query_get_history = f"""
            select
                full_name,
                date_base,
                history_list,
                if(dist_hist = '', 'no_runs', dist_hist) as dist_hist,
                suite_folder,
                test_name,
                build_type,
                branch,
                owners,
                first_run,
                last_run
            from (
                select
                    full_name,
                    date_base,
                    AGG_LIST(status) as history_list,
                    String::JoinFromList(ListSort(AGG_LIST_DISTINCT(status)), ',') as dist_hist,
                    suite_folder,
                    test_name,
                    owners,
                    build_type,
                    branch,
                    min(run_timestamp) as first_run,
                    max(run_timestamp) as last_run
                from (
                    select * from (
                        select distinct
                            full_name,
                            suite_folder,
                            test_name,
                            owners,
                            Date('{date}') as date_base,
                            '{build_type}' as build_type,
                            '{branch}' as branch
                        from `test_results/analytics/testowners`
                    ) as test_and_date
                    left JOIN (
                        select
                            suite_folder || '/' || test_name as full_name,
                            run_timestamp,
                            status
                        from `test_results/test_runs_column`
                        where
                            run_timestamp <= Date('{date}') + Interval("P1D")
                            and run_timestamp >= Date('{date}') - {history_for_n_day} * Interval("P1D")
                            and (job_name = 'Nightly-run' or job_name = 'Postcommit_relwithdebinfo' or job_name = 'Postcommit_asan')
                            and build_type = '{build_type}'
                            and branch = '{branch}'
                        order by full_name, run_timestamp desc
                    ) as hist
                    ON test_and_date.full_name = hist.full_name
                )
                GROUP BY full_name, suite_folder, test_name, date_base, build_type, branch, owners
            )
            """
            query = ydb.ScanQuery(query_get_history, {})
            # start transaction time
            start_time = time.time()
            it = driver.table_client.scan_query(query)
            # end transaction time

            results = []
            prepared_for_update_rows = []
            while True:
                try:
                    result = next(it)
                    results = results + result.result_set.rows
                except StopIteration:
                    break
            end_time = time.time()
            print(f'transaction duration: {end_time - start_time}')

            print(f'history data captured, {len(results)} rows')
            for row in results:
                row['count'] = dict(zip(list(row['history_list']), [list(
                    row['history_list']).count(i) for i in list(row['history_list'])]))
                prepared_for_update_rows.append({
                    'suite_folder': row['suite_folder'],
                    'test_name': row['test_name'],
                    'full_name': row['full_name'],
                    'date_window': row['date_base'],
                    'days_ago_window': history_for_n_day,
                    'build_type': row['build_type'],
                    'branch': row['branch'],
                    'first_run': row['first_run'],
                    'last_run': row['last_run'],
                    'history': ','.join(row['history_list']).encode('utf8'),
                    'history_class': row['dist_hist'],
                    'pass_count': row['count'].get('passed', 0),
                    'mute_count': row['count'].get('mute', 0),
                    'fail_count': row['count'].get('failure', 0),
                    'skip_count': row['count'].get('skipped', 0),
                })
            print(f'upserting history for date {date}')
            with ydb.SessionPool(driver) as pool:
                create_tables(pool, table_path)
                full_path = posixpath.join(DATABASE_PATH, table_path)
                bulk_upsert(driver.table_client, full_path,
                            prepared_for_update_rows)

            print('history updated')


if __name__ == "__main__":
    main()
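
# Example invocation (a sketch: the endpoint and database come from ydb_qa_db.ini, and the
# key file path below is a placeholder, not a real credential):
#
#   export CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS=/path/to/sa_key.json
#   python3 flaky_tests_history.py --days-window 5 --build_type relwithdebinfo --branch main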