testing.py 2.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152
  1. from typing import Sequence, Optional, Union, Dict, Any
  2. from clickhouse_connect.driver import Client
  3. from clickhouse_connect.driver.query import format_query_value, quote_identifier
  4. class TableContext:
  5. def __init__(self, client: Client,
  6. table: str,
  7. columns: Union[str, Sequence[str]],
  8. column_types: Optional[Sequence[str]] = None,
  9. engine: str = 'MergeTree',
  10. order_by: str = None,
  11. settings: Optional[Dict[str, Any]] = None):
  12. self.client = client
  13. self.table = table
  14. self.settings = settings
  15. if isinstance(columns, str):
  16. columns = columns.split(',')
  17. if column_types is None:
  18. self.column_names = []
  19. self.column_types = []
  20. for col in columns:
  21. col = col.strip()
  22. ix = col.find(' ')
  23. self.column_types.append(col[ix + 1:].strip())
  24. self.column_names.append(col[:ix].strip())
  25. else:
  26. self.column_names = columns
  27. self.column_types = column_types
  28. self.engine = engine
  29. self.order_by = quote_identifier(self.column_names[0]) if order_by is None else order_by
  30. def __enter__(self):
  31. if self.client.min_version('19'):
  32. self.client.command(f'DROP TABLE IF EXISTS {self.table}')
  33. else:
  34. self.client.command(f'DROP TABLE IF EXISTS {self.table} SYNC')
  35. col_defs = ','.join(f'{quote_identifier(name)} {col_type}' for name, col_type in zip(self.column_names, self.column_types))
  36. create_cmd = f'CREATE TABLE {self.table} ({col_defs}) ENGINE {self.engine} ORDER BY {self.order_by}'
  37. if self.settings:
  38. create_cmd += ' SETTINGS '
  39. for key, value in self.settings.items():
  40. create_cmd += f'{key} = {format_query_value(value)}, '
  41. if create_cmd.endswith(', '):
  42. create_cmd = create_cmd[:-2]
  43. self.client.command(create_cmd)
  44. return self
  45. def __exit__(self, exc_type, exc_val, exc_tb):
  46. self.client.command(f'DROP TABLE IF EXISTS {self.table}')