inspector.py 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657
  1. import sqlalchemy.schema as sa_schema
  2. from sqlalchemy.engine.reflection import Inspector
  3. from sqlalchemy.orm.exc import NoResultFound
  4. from clickhouse_connect.cc_sqlalchemy.datatypes.base import sqla_type_from_name
  5. from clickhouse_connect.cc_sqlalchemy.ddl.tableengine import build_engine
  6. from clickhouse_connect.cc_sqlalchemy.sql import full_table
  7. from clickhouse_connect.cc_sqlalchemy import dialect_name as dn
  8. ch_col_args = ('default_type', 'codec_expression', 'ttl_expression')
  9. def get_engine(connection, table_name, schema=None):
  10. result_set = connection.execute(
  11. f"SELECT engine_full FROM system.tables WHERE database = '{schema}' and name = '{table_name}'")
  12. row = next(result_set, None)
  13. if not row:
  14. raise NoResultFound(f'Table {schema}.{table_name} does not exist')
  15. return build_engine(row.engine_full)
  16. class ChInspector(Inspector):
  17. def reflect_table(self, table, include_columns, exclude_columns, *_args, **_kwargs):
  18. schema = table.schema
  19. for col in self.get_columns(table.name, schema):
  20. name = col.pop('name')
  21. if (include_columns and name not in include_columns) or (exclude_columns and name in exclude_columns):
  22. continue
  23. col_type = col.pop('type')
  24. col_args = {f'{dn}_{key}' if key in ch_col_args else key: value for key, value in col.items() if value}
  25. table.append_column(sa_schema.Column(name, col_type, **col_args))
  26. table.engine = get_engine(self.bind, table.name, schema)
  27. def get_columns(self, table_name, schema=None, **_kwargs):
  28. table_id = full_table(table_name, schema)
  29. result_set = self.bind.execute(f'DESCRIBE TABLE {table_id}')
  30. if not result_set:
  31. raise NoResultFound(f'Table {full_table} does not exist')
  32. columns = []
  33. for row in result_set:
  34. sqla_type = sqla_type_from_name(row.type.replace('\n', ''))
  35. col = {'name': row.name,
  36. 'type': sqla_type,
  37. 'nullable': sqla_type.nullable,
  38. 'autoincrement': False,
  39. 'default': row.default_expression,
  40. 'default_type': row.default_type,
  41. 'comment': row.comment,
  42. 'codec_expression': row.codec_expression,
  43. 'ttl_expression': row.ttl_expression}
  44. columns.append(col)
  45. return columns
  46. ChInspector.reflecttable = ChInspector.reflect_table # Hack to provide backward compatibility for SQLAlchemy 1.3