tools.py 1.2 KB

12345678910111213141516171819202122232425262728
  1. from typing import Optional, Sequence, Dict, Any
  2. from clickhouse_connect.driver import Client
  3. from clickhouse_connect.driver.summary import QuerySummary
  4. from clickhouse_connect.driver.query import quote_identifier
  5. def insert_file(client: Client,
  6. table: str,
  7. file_path: str,
  8. fmt: Optional[str] = None,
  9. column_names: Optional[Sequence[str]] = None,
  10. database: Optional[str] = None,
  11. settings: Optional[Dict[str, Any]] = None,
  12. compression: Optional[str] = None) -> QuerySummary:
  13. full_table = f'{quote_identifier(database)}.{quote_identifier(table)}' if database else quote_identifier(table)
  14. if not fmt:
  15. fmt = 'CSV' if column_names else 'CSVWithNames'
  16. if compression is None:
  17. if file_path.endswith('.gzip') or file_path.endswith('.gz'):
  18. compression = 'gzip'
  19. with open(file_path, 'rb') as file:
  20. return client.raw_insert(full_table,
  21. column_names=column_names,
  22. insert_block=file,
  23. fmt=fmt,
  24. settings=settings,
  25. compression=compression)