tests_monitor.py

#!/usr/bin/env python3
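"""Collect per-test monitoring data for YDB CI.

For every day since the last collected date, this script reads aggregated
test history from the flaky_tests_window analytics table, joins owner and
mute information, derives per-test states and state-change history, and
bulk-upserts the result into the tests_monitor analytics table.
"""
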
import argparse
import configparser
import datetime
import os
import posixpath
import time
from multiprocessing import Pool, cpu_count

import pandas as pd
import ydb

dir = os.path.dirname(__file__)
config = configparser.ConfigParser()
config_file_path = f"{dir}/../../config/ydb_qa_db.ini"
config.read(config_file_path)

DATABASE_ENDPOINT = config["QA_DB"]["DATABASE_ENDPOINT"]
DATABASE_PATH = config["QA_DB"]["DATABASE_PATH"]


def create_tables(pool, table_path):
    """Create the column-store monitor table if it does not exist yet."""
    print(f"> create table if not exists:'{table_path}'")

    def callee(session):
        session.execute_scheme(
            f"""
            CREATE TABLE IF NOT EXISTS `{table_path}` (
                `test_name` Utf8 NOT NULL,
                `suite_folder` Utf8 NOT NULL,
                `full_name` Utf8 NOT NULL,
                `date_window` Date NOT NULL,
                `build_type` Utf8 NOT NULL,
                `branch` Utf8 NOT NULL,
                `days_ago_window` Uint64 NOT NULL,
                `history` Utf8,
                `history_class` Utf8,
                `pass_count` Uint64,
                `mute_count` Uint64,
                `fail_count` Uint64,
                `skip_count` Uint64,
                `success_rate` Uint64,
                `summary` Utf8,
                `owner` Utf8,
                `is_muted` Uint32,
                `is_test_chunk` Uint32,
                `state` Utf8,
                `previous_state` Utf8,
                `state_change_date` Date,
                `days_in_state` Uint64,
                `previous_state_filtered` Utf8,
                `state_change_date_filtered` Date,
                `days_in_state_filtered` Uint64,
                `state_filtered` Utf8,
                PRIMARY KEY (`test_name`, `suite_folder`, `full_name`, `date_window`, `build_type`, `branch`)
            )
            PARTITION BY HASH(`build_type`, `branch`)
            WITH (STORE = COLUMN)
            """
        )

    return pool.retry_operation_sync(callee)


def bulk_upsert(table_client, table_path, rows):
    """Bulk-upsert prepared monitor rows with explicitly typed columns."""
    print(f"> bulk upsert: {table_path}")
    column_types = (
        ydb.BulkUpsertColumns()
        .add_column("test_name", ydb.OptionalType(ydb.PrimitiveType.Utf8))
        .add_column("suite_folder", ydb.OptionalType(ydb.PrimitiveType.Utf8))
        .add_column("build_type", ydb.OptionalType(ydb.PrimitiveType.Utf8))
        .add_column("branch", ydb.OptionalType(ydb.PrimitiveType.Utf8))
        .add_column("full_name", ydb.OptionalType(ydb.PrimitiveType.Utf8))
        .add_column("date_window", ydb.OptionalType(ydb.PrimitiveType.Date))
        .add_column("days_ago_window", ydb.OptionalType(ydb.PrimitiveType.Uint64))
        .add_column("history", ydb.OptionalType(ydb.PrimitiveType.Utf8))
        .add_column("history_class", ydb.OptionalType(ydb.PrimitiveType.Utf8))
        .add_column("pass_count", ydb.OptionalType(ydb.PrimitiveType.Uint64))
        .add_column("mute_count", ydb.OptionalType(ydb.PrimitiveType.Uint64))
        .add_column("fail_count", ydb.OptionalType(ydb.PrimitiveType.Uint64))
        .add_column("skip_count", ydb.OptionalType(ydb.PrimitiveType.Uint64))
        .add_column("success_rate", ydb.OptionalType(ydb.PrimitiveType.Uint64))
        .add_column("summary", ydb.OptionalType(ydb.PrimitiveType.Utf8))
        .add_column("owner", ydb.OptionalType(ydb.PrimitiveType.Utf8))
        .add_column("is_muted", ydb.OptionalType(ydb.PrimitiveType.Uint32))
        .add_column("is_test_chunk", ydb.OptionalType(ydb.PrimitiveType.Uint32))
        .add_column("state", ydb.OptionalType(ydb.PrimitiveType.Utf8))
        .add_column("previous_state", ydb.OptionalType(ydb.PrimitiveType.Utf8))
        .add_column("state_change_date", ydb.OptionalType(ydb.PrimitiveType.Date))
        .add_column("days_in_state", ydb.OptionalType(ydb.PrimitiveType.Uint64))
        .add_column("previous_state_filtered", ydb.OptionalType(ydb.PrimitiveType.Utf8))
        .add_column("state_change_date_filtered", ydb.OptionalType(ydb.PrimitiveType.Date))
        .add_column("days_in_state_filtered", ydb.OptionalType(ydb.PrimitiveType.Uint64))
        .add_column("state_filtered", ydb.OptionalType(ydb.PrimitiveType.Utf8))
    )
    table_client.bulk_upsert(table_path, rows, column_types)
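
# A minimal usage sketch for the two helpers above (illustrative only: `driver`
# and `rows` are assumed to exist, and the table path here is hypothetical):
#
#     with ydb.SessionPool(driver) as pool:
#         create_tables(pool, 'test_results/analytics/example_table')
#     bulk_upsert(driver.table_client,
#                 posixpath.join(DATABASE_PATH, 'test_results/analytics/example_table'),
#                 rows)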


def process_test_group(name, group, last_day_data, default_start_date):
    """Compute state-history columns for a single test group (grouped by full_name)."""
    state_list_for_filter = ['Muted', 'Muted Flaky', 'Muted Stable', 'Flaky', 'Passed']
    previous_state_list = []
    state_change_date_list = []
    days_in_state_list = []
    previous_state_filtered_list = []
    state_change_date_filtered_list = []
    days_in_state_filtered_list = []
    state_filtered_list = []

    # Seed the counters with 'days_in_state' from the last existing day for this test
    if last_day_data is not None and last_day_data[last_day_data['full_name'] == name].shape[0] > 0:
        last_row = last_day_data[last_day_data['full_name'] == name]
        prev_state = last_row['state'].iloc[0]
        prev_date = last_row['state_change_date'].iloc[0]
        current_days_in_state = last_row['days_in_state'].iloc[0]
        prev_state_filtered = last_row['state_filtered'].iloc[0]
        prev_date_filtered = last_row['state_change_date_filtered'].iloc[0]
        current_days_in_state_filtered = last_row['days_in_state_filtered'].iloc[0]
        saved_prev_state = last_row['previous_state'].iloc[0]
        saved_prev_state_filtered = last_row['previous_state_filtered'].iloc[0]
    else:
        prev_state = 'no_runs'
        prev_date = datetime.datetime(default_start_date.year, default_start_date.month, default_start_date.day)
        current_days_in_state = 0
        prev_state_filtered = 'no_runs'
        prev_date_filtered = datetime.datetime(
            default_start_date.year, default_start_date.month, default_start_date.day
        )
        current_days_in_state_filtered = 0
        saved_prev_state = prev_state
        saved_prev_state_filtered = prev_state_filtered

    for index, row in group.iterrows():
        current_days_in_state += 1
        if row['state'] != prev_state:
            saved_prev_state = prev_state
            prev_state = row['state']
            prev_date = row['date_window']
            current_days_in_state = 1
        previous_state_list.append(saved_prev_state)
        state_change_date_list.append(prev_date)
        days_in_state_list.append(current_days_in_state)

        # Process filtered states: transient states inherit the last stable one
        if row['state'] not in state_list_for_filter:
            state_filtered = prev_state_filtered
        else:
            state_filtered = row['state']

        current_days_in_state_filtered += 1
        if state_filtered != prev_state_filtered:
            saved_prev_state_filtered = prev_state_filtered
            prev_state_filtered = state_filtered
            prev_date_filtered = row['date_window']
            current_days_in_state_filtered = 1
        state_filtered_list.append(state_filtered)
        previous_state_filtered_list.append(saved_prev_state_filtered)
        state_change_date_filtered_list.append(prev_date_filtered)
        days_in_state_filtered_list.append(current_days_in_state_filtered)

    return {
        'previous_state': previous_state_list,
        'state_change_date': state_change_date_list,
        'days_in_state': days_in_state_list,
        'previous_state_filtered': previous_state_filtered_list,
        'state_change_date_filtered': state_change_date_filtered_list,
        'days_in_state_filtered': days_in_state_filtered_list,
        'state_filtered': state_filtered_list,
    }
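
# Illustrative walk-through (hypothetical data): for a test whose daily states
# are ['Flaky', 'Flaky', 'Passed'] and no previous monitor row, the loop yields
#   days_in_state:     [1, 2, 1]        (the counter resets when the state changes)
#   previous_state:    ['no_runs', 'no_runs', 'Flaky']
#   state_change_date: [day1, day1, day3]
# The *_filtered columns behave the same way, except that states outside
# state_list_for_filter (e.g. 'Skipped') inherit the last stable state.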


def determine_state(row):
    """Map history_class plus the mute flag to a monitor state."""
    history_class = row['history_class'] or ''  # guard against None
    is_muted = row['is_muted']
    if is_muted == 1:
        if 'mute' in history_class:
            return 'Muted Flaky'
        elif 'pass' in history_class:
            return 'Muted Stable'
        elif 'skipped' in history_class or not history_class:
            return 'Skipped'
        else:
            return history_class
    else:
        if 'failure' in history_class and 'mute' not in history_class:
            return 'Flaky'
        elif 'mute' in history_class:
            return 'Muted'
        elif 'skipped' in history_class or not history_class:
            return 'Skipped'
        elif 'pass' in history_class:
            return 'Passed'
        else:
            return history_class
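
# Mapping summary (derived from the branches above):
#   is_muted == 1: 'mute' -> Muted Flaky, 'pass' -> Muted Stable,
#                  'skipped'/empty -> Skipped, otherwise the raw class.
#   is_muted != 1: 'failure' without 'mute' -> Flaky, 'mute' -> Muted,
#                  'skipped'/empty -> Skipped, 'pass' -> Passed.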


def calculate_success_rate(row):
    """Return the pass percentage over pass+mute+fail runs (skips excluded)."""
    total_count = row['pass_count'] + row['mute_count'] + row['fail_count']
    if total_count == 0:
        return 0.0
    else:
        return (row['pass_count'] / total_count) * 100
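
# Example (hypothetical counts): pass_count=8, mute_count=0, fail_count=2
# gives (8 / 10) * 100 = 80.0; the caller later truncates this to int (80).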


def calculate_summary(row):
    """Build a human-readable pass/fail/mute/skip summary string."""
    return (
        f"Pass:{row['pass_count']} Fail:{row['fail_count']} "
        f"Mute:{row['mute_count']} Skip:{row['skip_count']}"
    )


def compute_owner(owner):
    """Normalize the owners string: prefer the TEAM entry when present."""
    if not owner:
        return 'Unknown'
    elif ';;' in owner:
        parts = owner.split(';;', 1)
        if 'TEAM' in parts[0]:
            return parts[0]
        else:
            return parts[1]
    else:
        return owner
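
# Examples (the owner strings below are assumptions about the ';;'-separated format):
#   compute_owner('')                     -> 'Unknown'
#   compute_owner('TEAM:@ydb/qa;;user1')  -> 'TEAM:@ydb/qa'
#   compute_owner('user1;;user2')         -> 'user2'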


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--days-window', default=1, type=int, help='how many days back to collect history')
    parser.add_argument(
        '--build_type',
        choices=['relwithdebinfo', 'release-asan'],
        default='relwithdebinfo',
        type=str,
        help='build type: relwithdebinfo or release-asan',
    )
    parser.add_argument('--branch', default='main', choices=['main'], type=str, help='branch')
    parser.add_argument(
        '--concurrent',
        dest='concurrent_mode',
        action='store_true',
        default=True,
        help='Set concurrent mode to true (default).',
    )
    parser.add_argument(
        '--no-concurrent', dest='concurrent_mode', action='store_false', help='Set concurrent mode to false.'
    )
    args, unknown = parser.parse_known_args()

    history_for_n_day = args.days_window
    build_type = args.build_type
    branch = args.branch
    concurrent_mode = args.concurrent_mode

    if "CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS" not in os.environ:
        print("Error: Env variable CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS is missing, skipping")
        return 1
    else:
        os.environ["YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS"] = os.environ[
            "CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS"
        ]

    with ydb.Driver(
        endpoint=DATABASE_ENDPOINT,
        database=DATABASE_PATH,
        credentials=ydb.credentials_from_env_variables(),
    ) as driver:
        driver.wait(timeout=10, fail_fast=True)
        # Return dates as integer days since epoch instead of native date objects
        tc_settings = ydb.TableClientSettings().with_native_date_in_result_sets(enabled=False)
        table_client = ydb.TableClient(driver, tc_settings)

        base_date = datetime.datetime(1970, 1, 1)
        default_start_date = datetime.date(2024, 8, 1)
        today = datetime.date.today()
        table_path = 'test_results/analytics/tests_monitor_test_with_filtered_states'

        # Get the last day for which monitor data was already collected
        print("Getting date of last collected monitor data")
        query_last_exist_day = f"""
            SELECT MAX(date_window) AS last_exist_day
            FROM `{table_path}`
            WHERE build_type = '{build_type}'
              AND branch = '{branch}'
        """
        query = ydb.ScanQuery(query_last_exist_day, {})
        it = table_client.scan_query(query)
        last_exist_day = None
        while True:
            try:
                result = next(it)
                # The last day itself is excluded below: we want to recalculate it
                last_exist_day = result.result_set.rows[0]['last_exist_day']
                break
            except StopIteration:
                break
            except Exception as e:
                print(f"Error during fetching last existing day: {e}")
                break

        last_exist_df = None
        last_day_data = None
        # If no data exists yet, start collecting from the default start date
        if last_exist_day is None:
            last_exist_day = default_start_date
            last_exist_day_str = last_exist_day.strftime('%Y-%m-%d')
            date_list = [today - datetime.timedelta(days=x) for x in range((today - last_exist_day).days + 1)]
            print(f"Monitor data does not exist - initializing collection from default date {last_exist_day_str}")
        else:
            # Get data from tests_monitor for the last existing day
            last_exist_day = (base_date + datetime.timedelta(days=last_exist_day)).date()
            if last_exist_day == today:  # recalculate today's data
                last_exist_day = last_exist_day - datetime.timedelta(days=1)
            last_exist_day_str = last_exist_day.strftime('%Y-%m-%d')
            print(f"Monitor data exists - getting data for date {last_exist_day_str}")
            date_list = [today - datetime.timedelta(days=x) for x in range((today - last_exist_day).days)]
            query_last_exist_data = f"""
                SELECT *
                FROM `{table_path}`
                WHERE build_type = '{build_type}'
                  AND branch = '{branch}'
                  AND date_window = Date('{last_exist_day_str}')
            """
            query = ydb.ScanQuery(query_last_exist_data, {})
            it = table_client.scan_query(query)
            last_exist_data = []
            while True:
                try:
                    result = next(it)
                    # Convert each row to a dictionary with consistent keys
                    for row in result.result_set.rows:
                        row_dict = {
                            'test_name': row['test_name'],
                            'suite_folder': row['suite_folder'],
                            'full_name': row['full_name'],
                            'date_window': base_date + datetime.timedelta(days=row['date_window']),
                            'build_type': row['build_type'],
                            'branch': row['branch'],
                            'days_ago_window': row['days_ago_window'],
                            'history': row['history'],
                            'history_class': row['history_class'],
                            'pass_count': row['pass_count'],
                            'mute_count': row['mute_count'],
                            'fail_count': row['fail_count'],
                            'skip_count': row['skip_count'],
                            'success_rate': row['success_rate'],
                            'summary': row['summary'],
                            'owners': row['owner'],
                            'is_muted': row['is_muted'],
                            'is_test_chunk': row['is_test_chunk'],
                            'state': row['state'],
                            'previous_state': row['previous_state'],
                            'state_change_date': base_date + datetime.timedelta(days=row['state_change_date']),
                            'days_in_state': row['days_in_state'],
                            'previous_state_filtered': row['previous_state_filtered'],
                            'state_change_date_filtered': base_date
                            + datetime.timedelta(days=row['state_change_date_filtered']),
                            'days_in_state_filtered': row['days_in_state_filtered'],
                            'state_filtered': row['state_filtered'],
                        }
                        last_exist_data.append(row_dict)
                except StopIteration:
                    break
            last_exist_df = pd.DataFrame(last_exist_data)

        # Get data from the flaky_tests_window table for dates after the last existing day
        data = {
            'test_name': [],
            'suite_folder': [],
            'full_name': [],
            'date_window': [],
            'build_type': [],
            'branch': [],
            'owners': [],
            'days_ago_window': [],
            'history': [],
            'history_class': [],
            'pass_count': [],
            'mute_count': [],
            'fail_count': [],
            'skip_count': [],
            'is_muted': [],
        }
        print(f'Getting aggregated history in window {history_for_n_day} days')
        for date in sorted(date_list):
            # Query one day of flaky_tests_window data, attaching owner and mute
            # info for that date, with the static testowners table as a fallback
            query_get_history = f"""
                SELECT
                    hist.branch AS branch,
                    hist.build_type AS build_type,
                    hist.date_window AS date_window,
                    hist.days_ago_window AS days_ago_window,
                    hist.fail_count AS fail_count,
                    hist.full_name AS full_name,
                    hist.history AS history,
                    hist.history_class AS history_class,
                    hist.mute_count AS mute_count,
                    COALESCE(owners_t.owners, fallback_t.owners) AS owners,
                    hist.pass_count AS pass_count,
                    owners_t.run_timestamp_last AS run_timestamp_last,
                    owners_t.is_muted AS is_muted,
                    hist.skip_count AS skip_count,
                    hist.suite_folder AS suite_folder,
                    hist.test_name AS test_name
                FROM (
                    SELECT *
                    FROM `test_results/analytics/flaky_tests_window_{history_for_n_day}_days`
                    WHERE date_window = Date('{date}')
                      AND build_type = '{build_type}'
                      AND branch = '{branch}'
                ) AS hist
                LEFT JOIN (
                    SELECT
                        test_name,
                        suite_folder,
                        owners,
                        run_timestamp_last,
                        is_muted,
                        date
                    FROM `test_results/all_tests_with_owner_and_mute`
                    WHERE branch = '{branch}'
                      AND date = Date('{date}')
                ) AS owners_t
                ON hist.test_name = owners_t.test_name
                   AND hist.suite_folder = owners_t.suite_folder
                   AND hist.date_window = owners_t.date
                LEFT JOIN (
                    SELECT
                        test_name,
                        suite_folder,
                        owners
                    FROM `test_results/analytics/testowners`
                ) AS fallback_t
                ON hist.test_name = fallback_t.test_name
                   AND hist.suite_folder = fallback_t.suite_folder
                WHERE owners_t.test_name IS NOT NULL OR fallback_t.test_name IS NOT NULL;
            """
            query = ydb.ScanQuery(query_get_history, {})
            start_time = time.time()
            it = table_client.scan_query(query)
            results = []
            while True:
                try:
                    result = next(it)
                    results.extend(result.result_set.rows)
                except StopIteration:
                    break
            end_time = time.time()
            print(f'Captured raw data for {date} duration: {end_time - start_time}')
            start_time = time.time()
            # Append the new rows, decoding bytes to str where needed
            if results:
                for row in results:
                    data['test_name'].append(row['test_name'])
                    data['suite_folder'].append(row['suite_folder'])
                    data['full_name'].append(row['full_name'])
                    data['date_window'].append(base_date + datetime.timedelta(days=row['date_window']))
                    data['build_type'].append(row['build_type'])
                    data['branch'].append(row['branch'])
                    data['owners'].append(row['owners'])
                    data['days_ago_window'].append(row['days_ago_window'])
                    data['history'].append(
                        row['history'].decode('utf-8') if isinstance(row['history'], bytes) else row['history']
                    )
                    data['history_class'].append(
                        row['history_class'].decode('utf-8')
                        if isinstance(row['history_class'], bytes)
                        else row['history_class']
                    )
                    data['pass_count'].append(row['pass_count'])
                    data['mute_count'].append(row['mute_count'])
                    data['fail_count'].append(row['fail_count'])
                    data['skip_count'].append(row['skip_count'])
                    data['is_muted'].append(row['is_muted'])
            else:
                print(
                    f"Warning: No data found in flaky_tests_window for date {date}, build_type='{build_type}', branch='{branch}'"
                )

        start_time = time.time()
        df = pd.DataFrame(data)
        # Keep only the state columns of the last existing day to seed per-test processing
        if last_exist_df is not None and last_exist_df.shape[0] > 0:
            last_day_data = last_exist_df[
                [
                    'full_name',
                    'days_in_state',
                    'state',
                    'previous_state',
                    'state_change_date',
                    'days_in_state_filtered',
                    'state_change_date_filtered',
                    'previous_state_filtered',
                    'state_filtered',
                ]
            ]
        end_time = time.time()
        print(f'Dataframe initialized: {end_time - start_time}')

        start_time = time.time()
        df = df.sort_values(by=['full_name', 'date_window'])
        end_time = time.time()
        print(f'Dataframe sorted: {end_time - start_time}')

        start_time = time.time()
        df['success_rate'] = df.apply(calculate_success_rate, axis=1).astype(int)
        df['summary'] = df.apply(calculate_summary, axis=1)
        df['owner'] = df['owners'].apply(compute_owner)
        df['is_test_chunk'] = df['full_name'].str.contains(r'chunk chunk|chunk\+chunk', regex=True).astype(int)
        df['is_muted'] = df['is_muted'].fillna(0).astype(int)
        df['state'] = df.apply(determine_state, axis=1)
        end_time = time.time()
        print(f'Computed base params: {end_time - start_time}')

        start_time = time.time()
        if concurrent_mode:
            # Process each test group in parallel; groupby order is stable,
            # so results[i] corresponds to the i-th group below
            with Pool(processes=cpu_count()) as pool:
                results = pool.starmap(
                    process_test_group,
                    [(name, group, last_day_data, default_start_date) for name, group in df.groupby('full_name')],
                )
            end_time = time.time()
            print(
                f'Computed days_in_state, state_change_date, previous_state and other params: {end_time - start_time}'
            )
            start_time = time.time()
            # Apply results to the DataFrame
            for i, (name, group) in enumerate(df.groupby('full_name')):
                df.loc[group.index, 'previous_state'] = results[i]['previous_state']
                df.loc[group.index, 'state_change_date'] = results[i]['state_change_date']
                df.loc[group.index, 'days_in_state'] = results[i]['days_in_state']
                df.loc[group.index, 'previous_state_filtered'] = results[i]['previous_state_filtered']
                df.loc[group.index, 'state_change_date_filtered'] = results[i]['state_change_date_filtered']
                df.loc[group.index, 'days_in_state_filtered'] = results[i]['days_in_state_filtered']
                df.loc[group.index, 'state_filtered'] = results[i]['state_filtered']
        else:
            # Sequential fallback: accumulate per-group lists, then assign whole columns
            previous_state_list = []
            state_change_date_list = []
            days_in_state_list = []
            previous_state_filtered_list = []
            state_change_date_filtered_list = []
            days_in_state_filtered_list = []
            state_filtered_list = []
            for name, group in df.groupby('full_name'):
                result = process_test_group(name, group, last_day_data, default_start_date)
                previous_state_list += result['previous_state']
                state_change_date_list += result['state_change_date']
                days_in_state_list += result['days_in_state']
                previous_state_filtered_list += result['previous_state_filtered']
                state_change_date_filtered_list += result['state_change_date_filtered']
                days_in_state_filtered_list += result['days_in_state_filtered']
                state_filtered_list += result['state_filtered']
            end_time = time.time()
            print(
                f'Computed days_in_state, state_change_date, previous_state and other params: {end_time - start_time}'
            )
            start_time = time.time()
            # Apply results to the DataFrame (df is already sorted by full_name,
            # matching the groupby concatenation order)
            df['previous_state'] = previous_state_list
            df['state_change_date'] = state_change_date_list
            df['days_in_state'] = days_in_state_list
            df['previous_state_filtered'] = previous_state_filtered_list
            df['state_change_date_filtered'] = state_change_date_filtered_list
            df['days_in_state_filtered'] = days_in_state_filtered_list
            df['state_filtered'] = state_filtered_list
        end_time = time.time()
        print(f'Saving computed result in dataframe: {end_time - start_time}')

        start_time = time.time()
        df['state_change_date'] = df['state_change_date'].dt.date
        df['date_window'] = df['date_window'].dt.date
        df['days_in_state'] = df['days_in_state'].astype(int)
        df['state_change_date_filtered'] = df['state_change_date_filtered'].dt.date
        df['days_in_state_filtered'] = df['days_in_state_filtered'].astype(int)
        end_time = time.time()
        print(f'Converting types of columns: {end_time - start_time}')

        start_time = time.time()
        result = df[
            [
                'full_name',
                'date_window',
                'suite_folder',
                'test_name',
                'days_ago_window',
                'build_type',
                'branch',
                'history',
                'history_class',
                'pass_count',
                'mute_count',
                'fail_count',
                'skip_count',
                'summary',
                'owner',
                'is_test_chunk',
                'is_muted',
                'state',
                'previous_state',
                'state_change_date',
                'days_in_state',
                'previous_state_filtered',
                'state_change_date_filtered',
                'days_in_state_filtered',
                'state_filtered',
                'success_rate',
            ]
        ]
        end_time = time.time()
        print(f'Dataframe prepared: {end_time - start_time}')
        print(f'Data collected, {len(result)} rows')

        start_time = time.time()
        prepared_for_update_rows = result.to_dict('records')
        end_time = time.time()
        print(f'Data converted to dict for upsert: {end_time - start_time}')

        start_upsert_time = time.time()
        with ydb.SessionPool(driver) as pool:
            create_tables(pool, table_path)
            full_path = posixpath.join(DATABASE_PATH, table_path)

            def chunk_data(data, chunk_size):
                # Yield consecutive slices of at most chunk_size rows
                for i in range(0, len(data), chunk_size):
                    yield data[i : i + chunk_size]

            chunk_size = 40000
            total_chunks = len(prepared_for_update_rows) // chunk_size + (
                1 if len(prepared_for_update_rows) % chunk_size != 0 else 0
            )
            for i, chunk in enumerate(chunk_data(prepared_for_update_rows, chunk_size), start=1):
                start_time = time.time()
                print(f"Uploading chunk {i}/{total_chunks}")
                bulk_upsert(driver.table_client, full_path, chunk)
                end_time = time.time()
                print(f'Upsert duration: {end_time - start_time}')
        end_time = time.time()
        print(f'Monitor data upserted: {end_time - start_upsert_time}')


if __name__ == "__main__":
    main()
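
# Example invocation (illustrative; assumes the service-account key file and the
# ../../config/ydb_qa_db.ini config are in place):
#   CI_YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS=/path/to/key.json \
#       ./tests_monitor.py --days-window 5 --build_type relwithdebinfo --branch main --no-concurrent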