external.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. import logging
  2. from typing import Optional, Sequence, Dict, Union
  3. from pathlib import Path
  4. from clickhouse_connect.driver.exceptions import ProgrammingError
  5. logger = logging.getLogger(__name__)
  6. class ExternalFile:
  7. # pylint: disable=too-many-branches
  8. def __init__(self,
  9. file_path: Optional[str] = None,
  10. file_name: Optional[str] = None,
  11. data: Optional[bytes] = None,
  12. fmt: Optional[str] = None,
  13. types: Optional[Union[str, Sequence[str]]] = None,
  14. structure: Optional[Union[str, Sequence[str]]] = None,
  15. mime_type: Optional[str] = None):
  16. if file_path:
  17. if data:
  18. raise ProgrammingError('Only data or file_path should be specified for external data, not both')
  19. try:
  20. with open(file_path, 'rb') as file:
  21. self.data = file.read()
  22. except OSError as ex:
  23. raise ProgrammingError(f'Failed to open file {file_path} for external data') from ex
  24. path_name = Path(file_path).name
  25. path_base = path_name.rsplit('.', maxsplit=1)[0]
  26. if not file_name:
  27. self.name = path_base
  28. self.file_name = path_name
  29. else:
  30. self.name = file_name.rsplit('.', maxsplit=1)[0]
  31. self.file_name = file_name
  32. if file_name != path_name and path_base != self.name:
  33. logger.warning('External data name %s and file_path %s use different names', file_name, path_name)
  34. elif data:
  35. if not file_name:
  36. raise ProgrammingError('Name is required for query external data')
  37. self.data = data
  38. self.name = file_name.rsplit('.', maxsplit=1)[0]
  39. self.file_name = file_name
  40. else:
  41. raise ProgrammingError('Either data or file_path must be specified for external data')
  42. if types:
  43. if structure:
  44. raise ProgrammingError('Only types or structure should be specified for external data, not both')
  45. self.structure = None
  46. if isinstance(types, str):
  47. self.types = types
  48. else:
  49. self.types = ','.join(types)
  50. elif structure:
  51. self.types = None
  52. if isinstance(structure, str):
  53. self.structure = structure
  54. else:
  55. self.structure = ','.join(structure)
  56. self.fmt = fmt
  57. self.mime_type = mime_type or 'application/octet-stream'
  58. @property
  59. def form_data(self) -> tuple:
  60. return self.file_name, self.data, self.mime_type
  61. @property
  62. def query_params(self) -> Dict[str, str]:
  63. params = {}
  64. for name, value in (('format', self.fmt),
  65. ('structure', self.structure),
  66. ('types', self.types)):
  67. if value:
  68. params[f'{self.name}_{name}'] = value
  69. return params
  70. class ExternalData:
  71. def __init__(self,
  72. file_path: Optional[str] = None,
  73. file_name: Optional[str] = None,
  74. data: Optional[bytes] = None,
  75. fmt: Optional[str] = None,
  76. types: Optional[Union[str, Sequence[str]]] = None,
  77. structure: Optional[Union[str, Sequence[str]]] = None,
  78. mime_type: Optional[str] = None):
  79. self.files: list[ExternalFile] = []
  80. if file_path or data:
  81. first_file = ExternalFile(file_path=file_path,
  82. file_name=file_name,
  83. data=data,
  84. fmt=fmt,
  85. types=types,
  86. structure=structure,
  87. mime_type=mime_type)
  88. self.files.append(first_file)
  89. def add_file(self,
  90. file_path: Optional[str] = None,
  91. file_name: Optional[str] = None,
  92. data: Optional[bytes] = None,
  93. fmt: Optional[str] = None,
  94. types: Optional[Union[str, Sequence[str]]] = None,
  95. structure: Optional[Union[str, Sequence[str]]] = None,
  96. mime_type: Optional[str] = None):
  97. self.files.append(ExternalFile(file_path=file_path,
  98. file_name=file_name,
  99. data=data,
  100. fmt=fmt,
  101. types=types,
  102. structure=structure,
  103. mime_type=mime_type))
  104. @property
  105. def form_data(self) -> Dict[str, tuple]:
  106. if not self.files:
  107. raise ProgrammingError('No external files set for external data')
  108. return {file.name: file.form_data for file in self.files}
  109. @property
  110. def query_params(self) -> Dict[str, str]:
  111. if not self.files:
  112. raise ProgrammingError('No external files set for external data')
  113. params = {}
  114. for file in self.files:
  115. params.update(file.query_params)
  116. return params