#!/usr/bin/env python3
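"""Collect per-test history from the YDB QA analytics tables, derive state
transitions for each test (raw state, mute state, and a filtered state), and
bulk-upsert the result into `test_results/analytics/tests_monitor`.

Requires the CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS environment variable.
"""
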
import argparse
import configparser
import datetime
import os
import posixpath
import time

import ydb
import pandas as pd
from multiprocessing import Pool, cpu_count

dir = os.path.dirname(__file__)
config = configparser.ConfigParser()
config_file_path = f"{dir}/../../config/ydb_qa_db.ini"
config.read(config_file_path)

DATABASE_ENDPOINT = config["QA_DB"]["DATABASE_ENDPOINT"]
DATABASE_PATH = config["QA_DB"]["DATABASE_PATH"]


def create_tables(pool, table_path):
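    """Create the tests_monitor column table if it does not exist yet."""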
    print(f"> create table if not exists:'{table_path}'")

    def callee(session):
        session.execute_scheme(
            f"""
            CREATE TABLE IF NOT EXISTS `{table_path}` (
                `test_name` Utf8 NOT NULL,
                `suite_folder` Utf8 NOT NULL,
                `full_name` Utf8 NOT NULL,
                `date_window` Date NOT NULL,
                `build_type` Utf8 NOT NULL,
                `branch` Utf8 NOT NULL,
                `days_ago_window` Uint64 NOT NULL,
                `history` Utf8,
                `history_class` Utf8,
                `pass_count` Uint64,
                `mute_count` Uint64,
                `fail_count` Uint64,
                `skip_count` Uint64,
                `success_rate` Uint64,
                `summary` Utf8,
                `owner` Utf8,
                `is_muted` Uint32,
                `is_test_chunk` Uint32,
                `state` Utf8,
                `previous_state` Utf8,
                `state_change_date` Date,
                `days_in_state` Uint64,
                `previous_mute_state` Uint32,
                `mute_state_change_date` Date,
                `days_in_mute_state` Uint64,
                `previous_state_filtered` Utf8,
                `state_change_date_filtered` Date,
                `days_in_state_filtered` Uint64,
                `state_filtered` Utf8,
                PRIMARY KEY (`test_name`, `suite_folder`, `full_name`, `date_window`, `build_type`, `branch`)
            )
            PARTITION BY HASH(`build_type`, `branch`)
            WITH (STORE = COLUMN)
            """
        )

    return pool.retry_operation_sync(callee)


def bulk_upsert(table_client, table_path, rows):
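    """Bulk-upsert prepared rows into the tests_monitor table with explicit column types."""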
    print(f"> bulk upsert: {table_path}")
    column_types = (
        ydb.BulkUpsertColumns()
        .add_column("test_name", ydb.OptionalType(ydb.PrimitiveType.Utf8))
        .add_column("suite_folder", ydb.OptionalType(ydb.PrimitiveType.Utf8))
        .add_column("build_type", ydb.OptionalType(ydb.PrimitiveType.Utf8))
        .add_column("branch", ydb.OptionalType(ydb.PrimitiveType.Utf8))
        .add_column("full_name", ydb.OptionalType(ydb.PrimitiveType.Utf8))
        .add_column("date_window", ydb.OptionalType(ydb.PrimitiveType.Date))
        .add_column("days_ago_window", ydb.OptionalType(ydb.PrimitiveType.Uint64))
        .add_column("history", ydb.OptionalType(ydb.PrimitiveType.Utf8))
        .add_column("history_class", ydb.OptionalType(ydb.PrimitiveType.Utf8))
        .add_column("pass_count", ydb.OptionalType(ydb.PrimitiveType.Uint64))
        .add_column("mute_count", ydb.OptionalType(ydb.PrimitiveType.Uint64))
        .add_column("fail_count", ydb.OptionalType(ydb.PrimitiveType.Uint64))
        .add_column("skip_count", ydb.OptionalType(ydb.PrimitiveType.Uint64))
        .add_column("success_rate", ydb.OptionalType(ydb.PrimitiveType.Uint64))
        .add_column("summary", ydb.OptionalType(ydb.PrimitiveType.Utf8))
        .add_column("owner", ydb.OptionalType(ydb.PrimitiveType.Utf8))
        .add_column("is_muted", ydb.OptionalType(ydb.PrimitiveType.Uint32))
        .add_column("is_test_chunk", ydb.OptionalType(ydb.PrimitiveType.Uint32))
        .add_column("state", ydb.OptionalType(ydb.PrimitiveType.Utf8))
        .add_column("previous_state", ydb.OptionalType(ydb.PrimitiveType.Utf8))
        .add_column("state_change_date", ydb.OptionalType(ydb.PrimitiveType.Date))
        .add_column("days_in_state", ydb.OptionalType(ydb.PrimitiveType.Uint64))
        .add_column("previous_mute_state", ydb.OptionalType(ydb.PrimitiveType.Uint32))
        .add_column("days_in_mute_state", ydb.OptionalType(ydb.PrimitiveType.Uint64))
        .add_column("mute_state_change_date", ydb.OptionalType(ydb.PrimitiveType.Date))
        .add_column("previous_state_filtered", ydb.OptionalType(ydb.PrimitiveType.Utf8))
        .add_column("state_change_date_filtered", ydb.OptionalType(ydb.PrimitiveType.Date))
        .add_column("days_in_state_filtered", ydb.OptionalType(ydb.PrimitiveType.Uint64))
        .add_column("state_filtered", ydb.OptionalType(ydb.PrimitiveType.Utf8))
    )
    table_client.bulk_upsert(table_path, rows, column_types)


def process_test_group(name, group, last_day_data, default_start_date):
    """Process the day-by-day rows for a single test (grouped by full_name)."""
    state_list_for_filter = ['Muted', 'Muted Flaky', 'Muted Stable', 'Flaky', 'Passed']
    previous_state_list = []
    state_change_date_list = []
    days_in_state_list = []
    previous_mute_state_list = []
    mute_state_change_date_list = []
    days_in_mute_state_list = []
    previous_state_filtered_list = []
    state_change_date_filtered_list = []
    days_in_state_filtered_list = []
    state_filtered_list = []
    # Restore the stored state for this test as of the last collected day
    test_data = last_day_data[last_day_data['full_name'] == name] if last_day_data is not None else None
    if test_data is not None and test_data.shape[0] > 0:
        prev_state = test_data['state'].iloc[0]
        prev_date = test_data['state_change_date'].iloc[0]
        current_days_in_state = test_data['days_in_state'].iloc[0]
        prev_mute_state = test_data['is_muted'].iloc[0]
        prev_mute_date = test_data['mute_state_change_date'].iloc[0]
        current_days_in_mute_state = test_data['days_in_mute_state'].iloc[0]
        prev_state_filtered = test_data['state_filtered'].iloc[0]
        prev_date_filtered = test_data['state_change_date_filtered'].iloc[0]
        current_days_in_state_filtered = test_data['days_in_state_filtered'].iloc[0]
        saved_prev_state = test_data['previous_state'].iloc[0]
        saved_prev_mute_state = test_data['previous_mute_state'].iloc[0]
        saved_prev_state_filtered = test_data['previous_state_filtered'].iloc[0]
    else:
        prev_state = 'no_runs'
        prev_date = datetime.datetime(default_start_date.year, default_start_date.month, default_start_date.day)
        current_days_in_state = 0
        prev_mute_state = 0
        prev_mute_date = datetime.datetime(default_start_date.year, default_start_date.month, default_start_date.day)
        current_days_in_mute_state = 0
        prev_state_filtered = 'no_runs'
        prev_date_filtered = datetime.datetime(default_start_date.year, default_start_date.month, default_start_date.day)
        current_days_in_state_filtered = 0
        saved_prev_state = prev_state
        saved_prev_mute_state = prev_mute_state
        saved_prev_state_filtered = prev_state_filtered
    for index, row in group.iterrows():
        # Track the raw state
        current_days_in_state += 1
        if row['state'] != prev_state:
            saved_prev_state = prev_state
            prev_state = row['state']
            prev_date = row['date_window']
            current_days_in_state = 1
        previous_state_list.append(saved_prev_state)
        state_change_date_list.append(prev_date)
        days_in_state_list.append(current_days_in_state)

        # Track the mute state
        current_days_in_mute_state += 1
        if row['is_muted'] != prev_mute_state:
            saved_prev_mute_state = prev_mute_state
            prev_mute_state = row['is_muted']
            prev_mute_date = row['date_window']
            current_days_in_mute_state = 1
        previous_mute_state_list.append(saved_prev_mute_state)
        mute_state_change_date_list.append(prev_mute_date)
        days_in_mute_state_list.append(current_days_in_mute_state)

        # Track the filtered state
        if row['state'] not in state_list_for_filter:
            state_filtered = prev_state_filtered
        else:
            state_filtered = row['state']

        current_days_in_state_filtered += 1
        if state_filtered != prev_state_filtered:
            saved_prev_state_filtered = prev_state_filtered
            prev_state_filtered = state_filtered
            prev_date_filtered = row['date_window']
            current_days_in_state_filtered = 1
        state_filtered_list.append(state_filtered)
        previous_state_filtered_list.append(saved_prev_state_filtered)
        state_change_date_filtered_list.append(prev_date_filtered)
        days_in_state_filtered_list.append(current_days_in_state_filtered)
    return {
        'previous_state': previous_state_list,
        'state_change_date': state_change_date_list,
        'days_in_state': days_in_state_list,
        'previous_mute_state': previous_mute_state_list,
        'mute_state_change_date': mute_state_change_date_list,
        'days_in_mute_state': days_in_mute_state_list,
        'previous_state_filtered': previous_state_filtered_list,
        'state_change_date_filtered': state_change_date_filtered_list,
        'days_in_state_filtered': days_in_state_filtered_list,
        'state_filtered': state_filtered_list,
    }


def determine_state(row):
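    """Map a row's history_class and mute flag to a monitor state label."""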
    history_class = row['history_class'] or ''  # guard against NULL values from the DB
    is_muted = row['is_muted']
    if is_muted == 1:
        if 'mute' in history_class or 'failure' in history_class:
            return 'Muted Flaky'
        elif 'pass' in history_class and 'failure' not in history_class and 'mute' not in history_class:
            return 'Muted Stable'
        elif 'skipped' in history_class or not history_class:
            return 'Skipped'
        else:
            return history_class
    else:
        if 'failure' in history_class and 'mute' not in history_class:
            return 'Flaky'
        elif 'mute' in history_class:
            return 'Muted'
        elif 'skipped' in history_class or not history_class:
            return 'Skipped'
        elif 'pass' in history_class:
            return 'Passed'
        else:
            return history_class


def calculate_success_rate(row):
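    """Return the pass percentage among pass/mute/fail runs (skips are excluded)."""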
    total_count = row['pass_count'] + row['mute_count'] + row['fail_count']
    if total_count == 0:
        return 0.0
    else:
        return (row['pass_count'] / total_count) * 100


def calculate_summary(row):
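    """Build a human-readable 'Pass:N Fail:N Mute:N Skip:N' summary string."""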
    return (
        f"Pass:{row['pass_count']}"
        f" Fail:{row['fail_count']}"
        f" Mute:{row['mute_count']}"
        f" Skip:{row['skip_count']}"
    )


def compute_owner(owner):
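    """Normalize the raw owner field: for 'left;;right' values keep the left part
    when it mentions TEAM, otherwise the right part; empty owners become 'Unknown'."""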
    if not owner:
        return 'Unknown'
    elif ';;' in owner:
        parts = owner.split(';;', 1)
        if 'TEAM' in parts[0]:
            return parts[0]
        else:
            return parts[1]
    else:
        return owner


def main():
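    """Entry point: parse arguments, load history from YDB, compute per-test states, and upsert the monitor table."""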
    parser = argparse.ArgumentParser()
    parser.add_argument('--days-window', default=1, type=int, help='how many days back to collect history')
    parser.add_argument(
        '--build_type',
        choices=['relwithdebinfo', 'release-asan'],
        default='relwithdebinfo',
        type=str,
        help='build type: relwithdebinfo or release-asan',
    )
    parser.add_argument('--branch', default='main', choices=['main'], type=str, help='branch')
    parser.add_argument(
        '--concurrent',
        dest='concurrent_mode',
        action='store_true',
        default=True,
        help='Set concurrent mode to true (default).',
    )
    parser.add_argument(
        '--no-concurrent', dest='concurrent_mode', action='store_false', help='Set concurrent mode to false.'
    )
    args, unknown = parser.parse_known_args()

    history_for_n_day = args.days_window
    build_type = args.build_type
    branch = args.branch
    concurrent_mode = args.concurrent_mode

    if "CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS" not in os.environ:
        print("Error: Env variable CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS is missing, skipping")
        return 1
    else:
        os.environ["YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS"] = os.environ[
            "CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS"
        ]

    with ydb.Driver(
        endpoint=DATABASE_ENDPOINT,
        database=DATABASE_PATH,
        credentials=ydb.credentials_from_env_variables(),
    ) as driver:
        driver.wait(timeout=10, fail_fast=True)
        tc_settings = ydb.TableClientSettings().with_native_date_in_result_sets(enabled=False)
        table_client = ydb.TableClient(driver, tc_settings)

        base_date = datetime.datetime(1970, 1, 1)
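        # Native date conversion is disabled above, so scan queries return Date
        # columns as integer days since the Unix epoch; base_date converts them
        # back to datetime values.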
        default_start_date = datetime.date(2024, 11, 1)
        today = datetime.date.today()
        table_path = 'test_results/analytics/tests_monitor'

        # Get the last day for which monitor data was collected
        print("Getting date of last collected monitor data")
        query_last_exist_day = f"""
            SELECT MAX(date_window) AS last_exist_day
            FROM `{table_path}`
            WHERE build_type = '{build_type}'
              AND branch = '{branch}'
        """
        query = ydb.ScanQuery(query_last_exist_day, {})
        it = driver.table_client.scan_query(query)
        last_exist_day = None
        while True:
            try:
                result = next(it)
                # The last day is recalculated below, so it is excluded from the new date range
                last_exist_day = result.result_set.rows[0]['last_exist_day']
                break
            except StopIteration:
                break
            except Exception as e:
                print(f"Error during fetching last existing day: {e}")

        last_exist_df = None
        last_day_data = None

        # If no data exists, start collecting from the default start date
        if last_exist_day is None:
            last_exist_day = default_start_date
            last_exist_day_str = last_exist_day.strftime('%Y-%m-%d')
            date_list = [today - datetime.timedelta(days=x) for x in range((today - last_exist_day).days + 1)]
            print(f"Monitor data does not exist - starting new monitor collection from default date {last_exist_day_str}")
        else:
            # Get data from tests_monitor for the last existing day
            last_exist_day = (base_date + datetime.timedelta(days=last_exist_day)).date()
            if last_exist_day == today:  # recalculate data for today
                last_exist_day = last_exist_day - datetime.timedelta(days=1)
            last_exist_day_str = last_exist_day.strftime('%Y-%m-%d')
            print(f"Monitor data exists - getting data for date {last_exist_day_str}")
            date_list = [today - datetime.timedelta(days=x) for x in range((today - last_exist_day).days)]
            query_last_exist_data = f"""
                SELECT *
                FROM `{table_path}`
                WHERE build_type = '{build_type}'
                  AND branch = '{branch}'
                  AND date_window = Date('{last_exist_day_str}')
            """
            query = ydb.ScanQuery(query_last_exist_data, {})
            it = driver.table_client.scan_query(query)
            last_exist_data = []
            while True:
                try:
                    result = next(it)
                    # Convert each row to a dictionary with consistent keys
                    for row in result.result_set.rows:
                        row_dict = {
                            'test_name': row['test_name'],
                            'suite_folder': row['suite_folder'],
                            'full_name': row['full_name'],
                            'date_window': base_date + datetime.timedelta(days=row['date_window']),
                            'build_type': row['build_type'],
                            'branch': row['branch'],
                            'days_ago_window': row['days_ago_window'],
                            'history': row['history'],
                            'history_class': row['history_class'],
                            'pass_count': row['pass_count'],
                            'mute_count': row['mute_count'],
                            'fail_count': row['fail_count'],
                            'skip_count': row['skip_count'],
                            'success_rate': row['success_rate'],
                            'summary': row['summary'],
                            'owners': row['owner'],
                            'is_muted': row['is_muted'],
                            'is_test_chunk': row['is_test_chunk'],
                            'state': row['state'],
                            'previous_state': row['previous_state'],
                            'state_change_date': base_date + datetime.timedelta(days=row['state_change_date']),
                            'days_in_state': row['days_in_state'],
                            'previous_mute_state': row['previous_mute_state'],
                            'mute_state_change_date': base_date + datetime.timedelta(days=row['mute_state_change_date']),
                            'days_in_mute_state': row['days_in_mute_state'],
                            'previous_state_filtered': row['previous_state_filtered'],
                            'state_change_date_filtered': base_date + datetime.timedelta(days=row['state_change_date_filtered']),
                            'days_in_state_filtered': row['days_in_state_filtered'],
                            'state_filtered': row['state_filtered'],
                        }
                        last_exist_data.append(row_dict)
                except StopIteration:
                    break
            last_exist_df = pd.DataFrame(last_exist_data)

        # Accumulate data from the flaky_tests_window table for dates after the last existing day
        data = {
            'test_name': [],
            'suite_folder': [],
            'full_name': [],
            'date_window': [],
            'build_type': [],
            'branch': [],
            'owners': [],
            'days_ago_window': [],
            'history': [],
            'history_class': [],
            'pass_count': [],
            'mute_count': [],
            'fail_count': [],
            'skip_count': [],
            'is_muted': [],
        }
        print(f'Getting aggregated history in window {history_for_n_day} days')
        for date in sorted(date_list):
            # Query flaky_tests_window rows for this date, joined with owner and mute info
            query_get_history = f"""
                SELECT
                    hist.branch AS branch,
                    hist.build_type AS build_type,
                    hist.date_window AS date_window,
                    hist.days_ago_window AS days_ago_window,
                    hist.fail_count AS fail_count,
                    hist.full_name AS full_name,
                    hist.history AS history,
                    hist.history_class AS history_class,
                    hist.mute_count AS mute_count,
                    COALESCE(owners_t.owners, fallback_t.owners) AS owners,
                    hist.pass_count AS pass_count,
                    COALESCE(owners_t.run_timestamp_last, NULL) AS run_timestamp_last,
                    COALESCE(owners_t.is_muted, NULL) AS is_muted,
                    hist.skip_count AS skip_count,
                    hist.suite_folder AS suite_folder,
                    hist.test_name AS test_name
                FROM (
                    SELECT * FROM
                    `test_results/analytics/flaky_tests_window_{history_for_n_day}_days`
                    WHERE
                        date_window = Date('{date}')
                        AND build_type = '{build_type}'
                        AND branch = '{branch}'
                ) AS hist
                LEFT JOIN (
                    SELECT
                        test_name,
                        suite_folder,
                        owners,
                        run_timestamp_last,
                        is_muted,
                        date
                    FROM
                        `test_results/all_tests_with_owner_and_mute`
                    WHERE
                        branch = '{branch}'
                        AND date = Date('{date}')
                ) AS owners_t
                ON
                    hist.test_name = owners_t.test_name
                    AND hist.suite_folder = owners_t.suite_folder
                    AND hist.date_window = owners_t.date
                LEFT JOIN (
                    SELECT
                        test_name,
                        suite_folder,
                        owners
                    FROM
                        `test_results/analytics/testowners`
                ) AS fallback_t
                ON
                    hist.test_name = fallback_t.test_name
                    AND hist.suite_folder = fallback_t.suite_folder
                WHERE
                    owners_t.test_name IS NOT NULL OR fallback_t.test_name IS NOT NULL;
            """
            query = ydb.ScanQuery(query_get_history, {})
            # Time the scan query
            start_time = time.time()
            it = driver.table_client.scan_query(query)
            results = []
            while True:
                try:
                    result = next(it)
                    results = results + result.result_set.rows
                except StopIteration:
                    break
            end_time = time.time()
            print(f'Captured raw data for {date} duration: {end_time - start_time}')
            start_time = time.time()
  469. start_time = time.time()
  470. # Check if new data was found
  471. if results:
  472. for row in results:
  473. data['test_name'].append(row['test_name'])
  474. data['suite_folder'].append(row['suite_folder'])
  475. data['full_name'].append(row['full_name'])
  476. data['date_window'].append(base_date + datetime.timedelta(days=row['date_window']))
  477. data['build_type'].append(row['build_type'])
  478. data['branch'].append(row['branch'])
  479. data['owners'].append(row['owners'])
  480. data['days_ago_window'].append(row['days_ago_window'])
  481. data['history'].append(
  482. row['history'].decode('utf-8') if isinstance(row['history'], bytes) else row['history']
  483. )
  484. data['history_class'].append(
  485. row['history_class'].decode('utf-8')
  486. if isinstance(row['history_class'], bytes)
  487. else row['history_class']
  488. )
  489. data['pass_count'].append(row['pass_count'])
  490. data['mute_count'].append(row['mute_count'])
  491. data['fail_count'].append(row['fail_count'])
  492. data['skip_count'].append(row['skip_count'])
  493. data['is_muted'].append(row['is_muted'])
  494. else:
  495. print(
  496. f"Warning: No data found in flaky_tests_window for date {date} build_type='{build_type}', branch='{branch}'"
  497. )

        start_time = time.time()
        df = pd.DataFrame(data)

        # Keep only the per-test state columns from the last collected day
        if last_exist_df is not None and last_exist_df.shape[0] > 0:
            last_day_data = last_exist_df[
                [
                    'full_name',
                    'days_in_state',
                    'state',
                    'previous_state',
                    'state_change_date',
                    'is_muted',
                    'days_in_mute_state',
                    'previous_mute_state',
                    'mute_state_change_date',
                    'days_in_state_filtered',
                    'state_change_date_filtered',
                    'previous_state_filtered',
                    'state_filtered',
                ]
            ]
        end_time = time.time()
        print(f'Dataframe initialized: {end_time - start_time}')

        start_time = time.time()
        df = df.sort_values(by=['full_name', 'date_window'])
        end_time = time.time()
        print(f'Dataframe sorted: {end_time - start_time}')

        start_time = time.time()
        df['success_rate'] = df.apply(calculate_success_rate, axis=1).astype(int)
        df['summary'] = df.apply(calculate_summary, axis=1)
        df['owner'] = df['owners'].apply(compute_owner)
        df['is_test_chunk'] = df['full_name'].str.contains(r'chunk chunk|chunk\+chunk', regex=True).astype(int)
        df['is_muted'] = df['is_muted'].fillna(0).astype(int)
        df['state'] = df.apply(determine_state, axis=1)
        end_time = time.time()
        print(f'Computed base params: {end_time - start_time}')

        start_time = time.time()
        if concurrent_mode:
            with Pool(processes=cpu_count()) as pool:
                results = pool.starmap(
                    process_test_group,
                    [(name, group, last_day_data, default_start_date) for name, group in df.groupby('full_name')],
                )
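            # df.groupby('full_name') yields groups in sorted key order, so the
            # i-th starmap result matches the i-th group in the loop below.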
            end_time = time.time()
            print(
                f'Computed days_in_state, state_change_date, previous_state and other params: {end_time - start_time}'
            )
            start_time = time.time()
            # Apply results to the DataFrame
            for i, (name, group) in enumerate(df.groupby('full_name')):
                df.loc[group.index, 'previous_state'] = results[i]['previous_state']
                df.loc[group.index, 'state_change_date'] = results[i]['state_change_date']
                df.loc[group.index, 'days_in_state'] = results[i]['days_in_state']
                df.loc[group.index, 'previous_mute_state'] = results[i]['previous_mute_state']
                df.loc[group.index, 'mute_state_change_date'] = results[i]['mute_state_change_date']
                df.loc[group.index, 'days_in_mute_state'] = results[i]['days_in_mute_state']
                df.loc[group.index, 'previous_state_filtered'] = results[i]['previous_state_filtered']
                df.loc[group.index, 'state_change_date_filtered'] = results[i]['state_change_date_filtered']
                df.loc[group.index, 'days_in_state_filtered'] = results[i]['days_in_state_filtered']
                df.loc[group.index, 'state_filtered'] = results[i]['state_filtered']
        else:
            previous_state_list = []
            state_change_date_list = []
            days_in_state_list = []
            previous_mute_state_list = []
            mute_state_change_date_list = []
            days_in_mute_state_list = []
            previous_state_filtered_list = []
            state_change_date_filtered_list = []
            days_in_state_filtered_list = []
            state_filtered_list = []
            for name, group in df.groupby('full_name'):
                result = process_test_group(name, group, last_day_data, default_start_date)
                previous_state_list = previous_state_list + result['previous_state']
                state_change_date_list = state_change_date_list + result['state_change_date']
                days_in_state_list = days_in_state_list + result['days_in_state']
                previous_mute_state_list = previous_mute_state_list + result['previous_mute_state']
                mute_state_change_date_list = mute_state_change_date_list + result['mute_state_change_date']
                days_in_mute_state_list = days_in_mute_state_list + result['days_in_mute_state']
                previous_state_filtered_list = previous_state_filtered_list + result['previous_state_filtered']
                state_change_date_filtered_list = state_change_date_filtered_list + result['state_change_date_filtered']
                days_in_state_filtered_list = days_in_state_filtered_list + result['days_in_state_filtered']
                state_filtered_list = state_filtered_list + result['state_filtered']
            end_time = time.time()
            print(
                f'Computed days_in_state, state_change_date, previous_state and other params: {end_time - start_time}'
            )
            start_time = time.time()
            # Apply results to the DataFrame
            df['previous_state'] = previous_state_list
            df['state_change_date'] = state_change_date_list
            df['days_in_state'] = days_in_state_list
            df['previous_mute_state'] = previous_mute_state_list
            df['mute_state_change_date'] = mute_state_change_date_list
            df['days_in_mute_state'] = days_in_mute_state_list
            df['previous_state_filtered'] = previous_state_filtered_list
            df['state_change_date_filtered'] = state_change_date_filtered_list
            df['days_in_state_filtered'] = days_in_state_filtered_list
            df['state_filtered'] = state_filtered_list
        end_time = time.time()
        print(f'Saving computed result in dataframe: {end_time - start_time}')

        start_time = time.time()
        df['date_window'] = df['date_window'].dt.date
        df['state_change_date'] = df['state_change_date'].dt.date
        df['days_in_state'] = df['days_in_state'].astype(int)
        df['previous_mute_state'] = df['previous_mute_state'].astype(int)
        df['mute_state_change_date'] = df['mute_state_change_date'].dt.date
        df['days_in_mute_state'] = df['days_in_mute_state'].astype(int)
        df['state_change_date_filtered'] = df['state_change_date_filtered'].dt.date
        df['days_in_state_filtered'] = df['days_in_state_filtered'].astype(int)
        end_time = time.time()
        print(f'Converting types of columns: {end_time - start_time}')

        start_time = time.time()
        result = df[
            [
                'full_name',
                'date_window',
                'suite_folder',
                'test_name',
                'days_ago_window',
                'build_type',
                'branch',
                'history',
                'history_class',
                'pass_count',
                'mute_count',
                'fail_count',
                'skip_count',
                'summary',
                'owner',
                'is_test_chunk',
                'is_muted',
                'state',
                'previous_state',
                'state_change_date',
                'days_in_state',
                'previous_mute_state',
                'mute_state_change_date',
                'days_in_mute_state',
                'previous_state_filtered',
                'state_change_date_filtered',
                'days_in_state_filtered',
                'state_filtered',
                'success_rate',
            ]
        ]
        end_time = time.time()
        print(f'Dataframe prepared: {end_time - start_time}')
        print(f'Data collected, {len(result)} rows')

        start_time = time.time()
        prepared_for_update_rows = result.to_dict('records')
        end_time = time.time()
        print(f'Data converted to dict for upsert: {end_time - start_time}')

        start_upsert_time = time.time()
        with ydb.SessionPool(driver) as pool:
            create_tables(pool, table_path)
            full_path = posixpath.join(DATABASE_PATH, table_path)
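
            # Upload in batches so each bulk upsert request stays reasonably sized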
            def chunk_data(data, chunk_size):
                for i in range(0, len(data), chunk_size):
                    yield data[i : i + chunk_size]

            chunk_size = 40000
            total_chunks = len(prepared_for_update_rows) // chunk_size + (
                1 if len(prepared_for_update_rows) % chunk_size != 0 else 0
            )
            for i, chunk in enumerate(chunk_data(prepared_for_update_rows, chunk_size), start=1):
                start_time = time.time()
                print(f"Uploading chunk {i}/{total_chunks}")
                bulk_upsert(driver.table_client, full_path, chunk)
                end_time = time.time()
                print(f'Chunk upserted in: {end_time - start_time}')
        end_time = time.time()
        print(f'Monitor data upserted: {end_time - start_upsert_time}')


if __name__ == "__main__":
    main()
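
# Example invocation (values are illustrative; the key file path is hypothetical):
#   CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS=/path/to/sa_key.json \
#       ./tests_monitor.py --days-window 1 --build_type relwithdebinfo --branch main --no-concurrent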