123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657 |
- import sqlalchemy.schema as sa_schema
- from sqlalchemy.engine.reflection import Inspector
- from sqlalchemy.orm.exc import NoResultFound
- from clickhouse_connect.cc_sqlalchemy.datatypes.base import sqla_type_from_name
- from clickhouse_connect.cc_sqlalchemy.ddl.tableengine import build_engine
- from clickhouse_connect.cc_sqlalchemy.sql import full_table
- from clickhouse_connect.cc_sqlalchemy import dialect_name as dn
- ch_col_args = ('default_type', 'codec_expression', 'ttl_expression')
- def get_engine(connection, table_name, schema=None):
- result_set = connection.execute(
- f"SELECT engine_full FROM system.tables WHERE database = '{schema}' and name = '{table_name}'")
- row = next(result_set, None)
- if not row:
- raise NoResultFound(f'Table {schema}.{table_name} does not exist')
- return build_engine(row.engine_full)
- class ChInspector(Inspector):
- def reflect_table(self, table, include_columns, exclude_columns, *_args, **_kwargs):
- schema = table.schema
- for col in self.get_columns(table.name, schema):
- name = col.pop('name')
- if (include_columns and name not in include_columns) or (exclude_columns and name in exclude_columns):
- continue
- col_type = col.pop('type')
- col_args = {f'{dn}_{key}' if key in ch_col_args else key: value for key, value in col.items() if value}
- table.append_column(sa_schema.Column(name, col_type, **col_args))
- table.engine = get_engine(self.bind, table.name, schema)
- def get_columns(self, table_name, schema=None, **_kwargs):
- table_id = full_table(table_name, schema)
- result_set = self.bind.execute(f'DESCRIBE TABLE {table_id}')
- if not result_set:
- raise NoResultFound(f'Table {full_table} does not exist')
- columns = []
- for row in result_set:
- sqla_type = sqla_type_from_name(row.type.replace('\n', ''))
- col = {'name': row.name,
- 'type': sqla_type,
- 'nullable': sqla_type.nullable,
- 'autoincrement': False,
- 'default': row.default_expression,
- 'default_type': row.default_type,
- 'comment': row.comment,
- 'codec_expression': row.codec_expression,
- 'ttl_expression': row.ttl_expression}
- columns.append(col)
- return columns
- ChInspector.reflecttable = ChInspector.reflect_table # Hack to provide backward compatibility for SQLAlchemy 1.3
|