tableengine.py 7.8 KB


  1. import logging
  2. from typing import Type, Sequence, Optional, Dict
  3. from sqlalchemy.exc import ArgumentError, SQLAlchemyError
  4. from sqlalchemy.sql.base import SchemaEventTarget
  5. from sqlalchemy.sql.visitors import Visitable
# Module-level logger named after this module
logger = logging.getLogger(__name__)
# Registry of TableEngine subclasses keyed by class name.  It is filled in by
# TableEngine.__init_subclass__ and consulted by build_engine
engine_map: Dict[str, Type['TableEngine']] = {}
  8. def tuple_expr(expr_name, value):
  9. """
  10. Create a table parameter with a tuple or list correctly formatted
  11. :param expr_name: parameter
  12. :param value: string or tuple of strings to format
  13. :return: formatted parameter string
  14. """
  15. if value is None:
  16. return ''
  17. v = f'{expr_name.strip()}'
  18. if isinstance(value, (tuple, list)):
  19. return f" {v} ({','.join(value)})"
  20. return f'{v} {value}'
  21. class TableEngine(SchemaEventTarget, Visitable):
  22. """
  23. SqlAlchemy Schema element to support ClickHouse table engines. At the moment provides no real
  24. functionality other than the CREATE TABLE argument string
  25. """
  26. arg_names = ()
  27. quoted_args = set()
  28. optional_args = set()
  29. eng_params = ()
  30. def __init_subclass__(cls, **kwargs):
  31. engine_map[cls.__name__] = cls
  32. def __init__(self, kwargs):
  33. # pylint: disable=no-value-for-parameter
  34. Visitable.__init__(self)
  35. self.name = self.__class__.__name__
  36. te_name = f'{self.name} Table Engine'
  37. engine_args = []
  38. for arg_name in self.arg_names:
  39. v = kwargs.pop(arg_name, None)
  40. if v is None:
  41. if arg_name in self.optional_args:
  42. continue
  43. raise ValueError(f'Required engine parameter {arg_name} not provided for {te_name}')
  44. if arg_name in self.quoted_args:
  45. engine_args.append(f"'{v}'")
  46. else:
  47. engine_args.append(v)
  48. if engine_args:
  49. self.arg_str = f'({", ".join(engine_args)})'
  50. params = []
  51. for param_name in self.eng_params:
  52. v = kwargs.pop(param_name, None)
  53. if v is not None:
  54. params.append(tuple_expr(param_name.upper().replace('_', ' '), v))
  55. self.full_engine = 'Engine ' + self.name
  56. if engine_args:
  57. self.full_engine += f'({", ".join(engine_args)})'
  58. if params:
  59. self.full_engine += ' ' + ' '.join(params)
  60. def compile(self):
  61. return self.full_engine
  62. def check_primary_keys(self, primary_keys: Sequence):
  63. raise SQLAlchemyError(f'Table Engine {self.name} does not support primary keys')
  64. def _set_parent(self, parent, **_kwargs):
  65. parent.engine = self
  66. class Memory(TableEngine):
  67. pass
  68. class Log(TableEngine):
  69. pass
  70. class StripeLog(TableEngine):
  71. pass
  72. class TinyLog(TableEngine):
  73. pass
  74. class Null(TableEngine):
  75. pass
  76. class Set(TableEngine):
  77. pass
  78. class Dictionary(TableEngine):
  79. arg_names = ['dictionary']
  80. # pylint: disable=unused-argument
  81. def __init__(self, dictionary: str = None):
  82. super().__init__(locals())
  83. class Merge(TableEngine):
  84. arg_names = ['db_name, tables_regexp']
  85. # pylint: disable=unused-argument
  86. def __init__(self, db_name: str = None, tables_regexp: str = None):
  87. super().__init__(locals())
  88. class File(TableEngine):
  89. arg_names = ['fmt']
  90. # pylint: disable=unused-argument
  91. def __init__(self, fmt: str = None):
  92. super().__init__(locals())
  93. class Distributed(TableEngine):
  94. arg_names = ['cluster', 'database', 'table', 'sharding_key', 'policy_name']
  95. optional_args = {'sharding_key', 'policy_name'}
  96. # pylint: disable=unused-argument
  97. def __init__(self, cluster: str = None, database: str = None, table=None,
  98. sharding_key: str = None, policy_name: str = None):
  99. super().__init__(locals())
  100. class MergeTree(TableEngine):
  101. eng_params = ['order_by', 'partition_key', 'primary_key', 'sample_by']
  102. # pylint: disable=unused-argument
  103. def __init__(self, order_by: str = None, primary_key: str = None,
  104. partition_by: str = None, sample_by: str = None):
  105. if not order_by and not primary_key:
  106. raise ArgumentError(None, 'Either PRIMARY KEY or ORDER BY must be specified')
  107. super().__init__(locals())
  108. class SharedMergeTree(MergeTree):
  109. pass
  110. class SummingMergeTree(MergeTree):
  111. pass
  112. class AggregatingMergeTree(MergeTree):
  113. pass
  114. class ReplacingMergeTree(TableEngine):
  115. arg_names = ['ver']
  116. optional_args = set(arg_names)
  117. eng_params = MergeTree.eng_params
  118. # pylint: disable=unused-argument
  119. def __init__(self, ver: str = None, order_by: str = None, primary_key: str = None,
  120. partition_by: str = None, sample_by: str = None):
  121. if not order_by and not primary_key:
  122. raise ArgumentError(None, 'Either PRIMARY KEY or ORDER BY must be specified')
  123. super().__init__(locals())
  124. class CollapsingMergeTree(TableEngine):
  125. arg_names = ['sign']
  126. eng_params = MergeTree.eng_params
  127. # pylint: disable=unused-argument
  128. def __init__(self, sign: str = None, order_by: str = None, primary_key: str = None,
  129. partition_by: str = None, sample_by: str = None):
  130. if not order_by and not primary_key:
  131. raise ArgumentError(None, 'Either PRIMARY KEY or ORDER BY must be specified')
  132. super().__init__(locals())
  133. class VersionedCollapsingMergeTree(TableEngine):
  134. arg_names = ['sign', 'version']
  135. eng_params = MergeTree.eng_params
  136. # pylint: disable=unused-argument
  137. def __init__(self, sign: str = None, version: str = None, order_by: str = None, primary_key: str = None,
  138. partition_by: str = None, sample_by: str = None):
  139. if not order_by and not primary_key:
  140. raise ArgumentError(None, 'Either PRIMARY KEY or ORDER BY must be specified')
  141. super().__init__(locals())
  142. class GraphiteMergeTree(TableEngine):
  143. arg_names = ['config_section']
  144. eng_params = MergeTree.eng_params
  145. # pylint: disable=unused-argument
  146. def __init__(self, config_section: str = None, version: str = None, order_by: str = None, primary_key: str = None,
  147. partition_by: str = None, sample_by: str = None):
  148. if not order_by and not primary_key:
  149. raise ArgumentError(None, 'Either PRIMARY KEY or ORDER BY must be specified')
  150. super().__init__(locals())
  151. class ReplicatedMergeTree(TableEngine):
  152. arg_names = ['zk_path', 'replica']
  153. quoted_args = set(arg_names)
  154. optional_args = quoted_args
  155. eng_params = MergeTree.eng_params
  156. # pylint: disable=unused-argument
  157. def __init__(self, order_by: str = None, primary_key: str = None, partition_by: str = None, sample_by: str = None,
  158. zk_path: str = None, replica: str = None):
  159. if not order_by and not primary_key:
  160. raise ArgumentError(None, 'Either PRIMARY KEY or ORDER BY must be specified')
  161. super().__init__(locals())
  162. class ReplicatedAggregatingMergeTree(ReplicatedMergeTree):
  163. pass
  164. class ReplicatedSummingMergeTree(ReplicatedMergeTree):
  165. pass
  166. class SharedReplacingMergeTree(ReplacingMergeTree):
  167. pass
  168. class SharedAggregatingMergeTree(AggregatingMergeTree):
  169. pass
  170. class SharedSummingMergeTree(SummingMergeTree):
  171. pass
  172. class SharedVersionedCollapsingMergeTree(VersionedCollapsingMergeTree):
  173. pass
  174. class SharedGraphiteMergeTree(GraphiteMergeTree):
  175. pass
  176. def build_engine(full_engine: str) -> Optional[TableEngine]:
  177. """
  178. Factory function to create TableEngine class from ClickHouse full_engine expression
  179. :param full_engine
  180. :return: TableEngine DDL element
  181. """
  182. if not full_engine:
  183. return None
  184. name = full_engine.split(' ')[0].split('(')[0]
  185. try:
  186. engine_cls = engine_map[name]
  187. except KeyError:
  188. if not name.startswith('System'):
  189. logger.warning('Engine %s not found', name)
  190. return None
  191. engine = engine_cls.__new__(engine_cls)
  192. engine.name = name
  193. engine.full_engine = full_engine
  194. return engine