tools.py 1.4 KB

123456789101112131415161718192021222324252627282930313233
  1. from typing import Optional, Sequence, Dict, Any
  2. from clickhouse_connect.driver import Client
  3. from clickhouse_connect.driver.summary import QuerySummary
  4. from clickhouse_connect.driver.query import quote_identifier
  def insert_file(client: Client,
                  table: str,
                  file_path: str,
                  fmt: Optional[str] = None,
                  column_names: Optional[Sequence[str]] = None,
                  database: Optional[str] = None,
                  settings: Optional[Dict[str, Any]] = None,
                  compression: Optional[str] = None) -> QuerySummary:
      """
      Stream the contents of a local file to ``client.raw_insert``.

      The file is opened in binary mode and the open file object is handed to
      ``client.raw_insert`` as ``insert_block``; this function does not read or
      parse the data itself.

      :param client: Client whose ``raw_insert`` method performs the insert.
      :param table: Target table name. When ``database`` is not given and the
          name does not start with a backtick or single quote but contains a
          ``.`` after its first character, it is passed through unchanged
          (presumably an already qualified ``db.table`` name -- confirm with
          callers). Otherwise it is run through ``quote_identifier``.
      :param file_path: Path of the file to send.
      :param fmt: Input format name. When falsy, defaults to ``'CSV'`` if
          ``column_names`` is provided and to ``'CSVWithNames'`` otherwise.
      :param column_names: Optional column names forwarded to ``raw_insert``.
      :param database: Optional database name; when given, the target becomes
          ``quote_identifier(database).quote_identifier(table)``.
      :param settings: Optional settings dictionary forwarded to ``raw_insert``.
      :param compression: Compression name forwarded to ``raw_insert``. When
          ``None`` and ``file_path`` ends with ``.gz`` or ``.gzip`` (compared
          case-insensitively), ``'gzip'`` is used.
      :return: Whatever ``client.raw_insert`` returns.
      :raises ValueError: If ``table`` is empty.
      :raises OSError: If ``file_path`` cannot be opened (propagated from ``open``).
      """
      # Fail early with a clear message; indexing table[0] below would otherwise
      # surface as an opaque IndexError.
      if not table:
          raise ValueError('table name must not be empty')

      if database:
          full_table = f'{quote_identifier(database)}.{quote_identifier(table)}'
      elif table[0] not in ('`', "'") and table.find('.') > 0:
          # Unquoted name with an embedded dot: leave it exactly as supplied.
          full_table = table
      else:
          full_table = quote_identifier(table)

      if not fmt:
          # NOTE(review): presumably explicit column names mean the file has no
          # header row, hence plain CSV -- confirm against the server formats.
          fmt = 'CSV' if column_names else 'CSVWithNames'

      # Only guess the compression when the caller did not specify one. The
      # suffix test ignores case so that e.g. ``DATA.CSV.GZ`` is recognised too.
      if compression is None and file_path.lower().endswith(('.gzip', '.gz')):
          compression = 'gzip'

      with open(file_path, 'rb') as file:
          return client.raw_insert(full_table,
                                   column_names=column_names,
                                   insert_block=file,
                                   fmt=fmt,
                                   settings=settings,
                                   compression=compression)