import.cpp 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842
  1. #include "import.h"
  2. #include <ydb/public/sdk/cpp/client/ydb_driver/driver.h>
  3. #include <ydb/public/sdk/cpp/client/ydb_operation/operation.h>
  4. #include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
  5. #include <ydb/public/sdk/cpp/client/ydb_scheme/scheme.h>
  6. #include <ydb/public/sdk/cpp/client/ydb_table/table.h>
  7. #include <ydb/public/api/protos/ydb_formats.pb.h>
  8. #include <ydb/public/api/protos/ydb_table.pb.h>
  9. #include <ydb/public/lib/json_value/ydb_json_value.h>
  10. #include <ydb/public/lib/ydb_cli/common/csv_parser.h>
  11. #include <ydb/public/lib/ydb_cli/common/recursive_list.h>
  12. #include <ydb/public/lib/ydb_cli/common/interactive.h>
  13. #include <ydb/public/lib/ydb_cli/common/progress_bar.h>
  14. #include <ydb/public/lib/ydb_cli/dump/util/util.h>
  15. #include <ydb/public/lib/ydb_cli/import/cli_arrow_helpers.h>
  16. #include <library/cpp/string_utils/csv/csv.h>
  17. #include <library/cpp/threading/future/async.h>
  18. #include <util/folder/path.h>
  19. #include <util/generic/vector.h>
  20. #include <util/stream/file.h>
  21. #include <util/stream/length.h>
  22. #include <util/string/builder.h>
  23. #include <util/system/thread.h>
  24. #include <contrib/libs/apache/arrow/cpp/src/arrow/api.h>
  25. #include <contrib/libs/apache/arrow/cpp/src/arrow/io/api.h>
  26. #include <contrib/libs/apache/arrow/cpp/src/arrow/ipc/api.h>
  27. #include <contrib/libs/apache/arrow/cpp/src/arrow/result.h>
  28. #include <contrib/libs/apache/arrow/cpp/src/parquet/arrow/reader.h>
  29. #include <contrib/libs/apache/arrow/cpp/src/parquet/arrow/reader.h>
  30. #include <contrib/libs/apache/arrow/cpp/src/parquet/file_reader.h>
  31. #include <stack>
  32. #if defined(_win32_)
  33. #include <windows.h>
  34. #include <io.h>
  35. #elif defined(_unix_)
  36. #include <unistd.h>
  37. #endif
  38. namespace NYdb {
  39. namespace NConsoleClient {
  40. namespace {
  41. inline
  42. TStatus MakeStatus(EStatus code = EStatus::SUCCESS, const TString& error = {}) {
  43. NYql::TIssues issues;
  44. if (error) {
  45. issues.AddIssue(NYql::TIssue(error));
  46. }
  47. return TStatus(code, std::move(issues));
  48. }
  49. TStatus WaitForQueue(const size_t maxQueueSize, std::vector<TAsyncStatus>& inFlightRequests) {
  50. while (!inFlightRequests.empty() && inFlightRequests.size() >= maxQueueSize) {
  51. NThreading::WaitAny(inFlightRequests).Wait();
  52. ui32 delta = 0;
  53. for (ui32 i = 0; i + delta < inFlightRequests.size();) {
  54. if (inFlightRequests[i].HasValue() || inFlightRequests[i].HasException()) {
  55. auto status = inFlightRequests[i].ExtractValueSync();
  56. if (!status.IsSuccess()) {
  57. return status;
  58. }
  59. ++delta;
  60. inFlightRequests[i] = inFlightRequests[inFlightRequests.size() - delta];
  61. } else {
  62. ++i;
  63. }
  64. }
  65. inFlightRequests.resize(inFlightRequests.size() - delta);
  66. }
  67. return MakeStatus();
  68. }
  69. void InitCsvParser(TCsvParser& parser,
  70. bool& removeLastDelimiter,
  71. TString&& defaultHeader,
  72. const TImportFileSettings& settings,
  73. const std::map<TString, TType>* columnTypes,
  74. const NTable::TTableDescription* dbTableInfo) {
  75. if (settings.Header_ || settings.HeaderRow_) {
  76. TString headerRow;
  77. if (settings.Header_) {
  78. headerRow = std::move(defaultHeader);
  79. }
  80. if (settings.HeaderRow_) {
  81. headerRow = settings.HeaderRow_;
  82. }
  83. if (headerRow.EndsWith("\r\n")) {
  84. headerRow.erase(headerRow.Size() - 2);
  85. }
  86. if (headerRow.EndsWith("\n")) {
  87. headerRow.erase(headerRow.Size() - 1);
  88. }
  89. if (headerRow.EndsWith(settings.Delimiter_)) {
  90. removeLastDelimiter = true;
  91. headerRow.erase(headerRow.Size() - settings.Delimiter_.Size());
  92. }
  93. parser = TCsvParser(std::move(headerRow), settings.Delimiter_[0], settings.NullValue_, columnTypes);
  94. return;
  95. }
  96. TVector<TString> columns;
  97. Y_ENSURE_BT(dbTableInfo);
  98. for (const auto& column : dbTableInfo->GetColumns()) {
  99. columns.push_back(column.Name);
  100. }
  101. parser = TCsvParser(std::move(columns), settings.Delimiter_[0], settings.NullValue_, columnTypes);
  102. return;
  103. }
  104. FHANDLE GetStdinFileno() {
  105. #if defined(_win32_)
  106. return GetStdHandle(STD_INPUT_HANDLE);
  107. #elif defined(_unix_)
  108. return STDIN_FILENO;
  109. #endif
  110. }
  111. class TMaxInflightGetter {
  112. public:
  113. TMaxInflightGetter(ui64 totalMaxInFlight, std::atomic<ui64>& filesCount)
  114. : TotalMaxInFlight(totalMaxInFlight)
  115. , FilesCount(filesCount) {
  116. }
  117. ~TMaxInflightGetter() {
  118. --FilesCount;
  119. }
  120. ui64 GetCurrentMaxInflight() const {
  121. return (TotalMaxInFlight - 1) / FilesCount + 1; // round up
  122. }
  123. private:
  124. ui64 TotalMaxInFlight;
  125. std::atomic<ui64>& FilesCount;
  126. };
  127. class TCsvFileReader {
  128. private:
  129. class TFileChunk {
  130. public:
  131. TFileChunk(TFile file, THolder<IInputStream>&& stream, ui64 size = std::numeric_limits<ui64>::max())
  132. : File(file)
  133. , Stream(std::move(stream))
  134. , CountStream(MakeHolder<TCountingInput>(Stream.Get()))
  135. , Size(size) {
  136. }
  137. bool ConsumeLine(TString& line) {
  138. ui64 prevCount = CountStream->Counter();
  139. line = NCsvFormat::TLinesSplitter(*CountStream).ConsumeLine();
  140. if (prevCount == CountStream->Counter() || prevCount >= Size) {
  141. return false;
  142. }
  143. return true;
  144. }
  145. ui64 GetReadCount() const {
  146. return CountStream->Counter();
  147. }
  148. private:
  149. TFile File;
  150. THolder<IInputStream> Stream;
  151. THolder<TCountingInput> CountStream;
  152. ui64 Size;
  153. };
  154. public:
  155. TCsvFileReader(const TString& filePath, const TImportFileSettings& settings, TString& headerRow,
  156. TMaxInflightGetter& inFlightGetter) {
  157. TFile file;
  158. if (filePath) {
  159. file = TFile(filePath, RdOnly);
  160. } else {
  161. file = TFile(GetStdinFileno());
  162. }
  163. auto input = MakeHolder<TFileInput>(file);
  164. TCountingInput countInput(input.Get());
  165. if (settings.Header_) {
  166. headerRow = NCsvFormat::TLinesSplitter(countInput).ConsumeLine();
  167. }
  168. for (ui32 i = 0; i < settings.SkipRows_; ++i) {
  169. NCsvFormat::TLinesSplitter(countInput).ConsumeLine();
  170. }
  171. i64 skipSize = countInput.Counter();
  172. MaxInFlight = inFlightGetter.GetCurrentMaxInflight();
  173. i64 fileSize = file.GetLength();
  174. if (filePath.empty() || fileSize == -1) {
  175. SplitCount = 1;
  176. Chunks.emplace_back(file, std::move(input));
  177. return;
  178. }
  179. SplitCount = Min(MaxInFlight, (fileSize - skipSize) / settings.BytesPerRequest_ + 1);
  180. i64 chunkSize = (fileSize - skipSize) / SplitCount;
  181. if (chunkSize == 0) {
  182. SplitCount = 1;
  183. chunkSize = fileSize - skipSize;
  184. }
  185. i64 curPos = skipSize;
  186. i64 seekPos = skipSize;
  187. Chunks.reserve(SplitCount);
  188. TString temp;
  189. file = TFile(filePath, RdOnly);
  190. file.Seek(seekPos, sSet);
  191. THolder<TFileInput> stream = MakeHolder<TFileInput>(file);
  192. for (size_t i = 0; i < SplitCount; ++i) {
  193. seekPos += chunkSize;
  194. i64 nextPos = seekPos;
  195. auto nextFile = TFile(filePath, RdOnly);
  196. auto nextStream = MakeHolder<TFileInput>(nextFile);
  197. nextFile.Seek(seekPos, sSet);
  198. if (seekPos > 0) {
  199. nextFile.Seek(-1, sCur);
  200. nextPos += nextStream->ReadLine(temp);
  201. }
  202. Chunks.emplace_back(file, std::move(stream), nextPos - curPos);
  203. file = std::move(nextFile);
  204. stream = std::move(nextStream);
  205. curPos = nextPos;
  206. }
  207. }
  208. TFileChunk& GetChunk(size_t threadId) {
  209. if (threadId >= Chunks.size()) {
  210. throw yexception() << "File chunk number is too big";
  211. }
  212. return Chunks[threadId];
  213. }
  214. size_t GetThreadLimit(size_t thread_id) const {
  215. return MaxInFlight / SplitCount + (thread_id < MaxInFlight % SplitCount ? 1 : 0);
  216. }
  217. size_t GetSplitCount() const {
  218. return SplitCount;
  219. }
  220. private:
  221. TVector<TFileChunk> Chunks;
  222. size_t SplitCount;
  223. size_t MaxInFlight;
  224. };
  225. } // namespace
  226. TImportFileClient::TImportFileClient(const TDriver& driver, const TClientCommand::TConfig& rootConfig)
  227. : TableClient(std::make_shared<NTable::TTableClient>(driver))
  228. , SchemeClient(std::make_shared<NScheme::TSchemeClient>(driver))
  229. {
  230. RetrySettings
  231. .MaxRetries(TImportFileSettings::MaxRetries)
  232. .Idempotent(true)
  233. .Verbose(rootConfig.IsVerbose());
  234. }
  235. TStatus TImportFileClient::Import(const TVector<TString>& filePaths, const TString& dbPath, const TImportFileSettings& settings) {
  236. FilesCount = filePaths.size();
  237. if (settings.Format_ == EOutputFormat::Tsv && settings.Delimiter_ != "\t") {
  238. return MakeStatus(EStatus::BAD_REQUEST,
  239. TStringBuilder() << "Illegal delimiter for TSV format, only tab is allowed");
  240. }
  241. auto resultStatus = TableClient->RetryOperationSync(
  242. [this, dbPath](NTable::TSession session) {
  243. auto result = session.DescribeTable(dbPath).ExtractValueSync();
  244. if (result.IsSuccess()) {
  245. DbTableInfo = std::make_unique<const NTable::TTableDescription>(result.GetTableDescription());
  246. }
  247. return result;
  248. }, NTable::TRetryOperationSettings{RetrySettings}.MaxRetries(10));
  249. if (!resultStatus.IsSuccess()) {
  250. /// TODO: Remove this after server fix: https://github.com/ydb-platform/ydb/issues/7791
  251. if (resultStatus.GetStatus() == EStatus::SCHEME_ERROR) {
  252. auto describePathResult = NDump::DescribePath(*SchemeClient, dbPath);
  253. if (describePathResult.GetStatus() != EStatus::SUCCESS) {
  254. return MakeStatus(EStatus::SCHEME_ERROR,
  255. TStringBuilder() << describePathResult.GetIssues().ToString() << dbPath);
  256. }
  257. }
  258. return resultStatus;
  259. }
  260. UpsertSettings
  261. .OperationTimeout(settings.OperationTimeout_)
  262. .ClientTimeout(settings.ClientTimeout_);
  263. bool isStdoutInteractive = IsStdoutInteractive();
  264. size_t filePathsSize = filePaths.size();
  265. std::mutex progressWriteLock;
  266. std::atomic<ui64> globalProgress{0};
  267. TProgressBar progressBar(100);
  268. auto writeProgress = [&]() {
  269. ui64 globalProgressValue = globalProgress.load();
  270. std::lock_guard<std::mutex> lock(progressWriteLock);
  271. progressBar.SetProcess(globalProgressValue / filePathsSize);
  272. };
  273. auto start = TInstant::Now();
  274. auto pool = CreateThreadPool(filePathsSize);
  275. TVector<NThreading::TFuture<TStatus>> asyncResults;
  276. // If the single empty filename passed, read from stdin, else from the file
  277. for (const auto& filePath : filePaths) {
  278. auto func = [&, this] {
  279. std::unique_ptr<TFileInput> fileInput;
  280. std::optional<ui64> fileSizeHint;
  281. if (!filePath.empty()) {
  282. const TFsPath dataFile(filePath);
  283. if (!dataFile.Exists()) {
  284. return MakeStatus(EStatus::BAD_REQUEST,
  285. TStringBuilder() << "File does not exist: " << filePath);
  286. }
  287. if (!dataFile.IsFile()) {
  288. return MakeStatus(EStatus::BAD_REQUEST,
  289. TStringBuilder() << "Not a file: " << filePath);
  290. }
  291. TFile file(filePath, OpenExisting | RdOnly | Seq);
  292. i64 fileLength = file.GetLength();
  293. if (fileLength && fileLength >= 0) {
  294. fileSizeHint = fileLength;
  295. }
  296. fileInput = std::make_unique<TFileInput>(file, settings.FileBufferSize_);
  297. }
  298. ProgressCallbackFunc progressCallback;
  299. if (isStdoutInteractive)
  300. {
  301. ui64 oldProgress = 0;
  302. progressCallback = [&, oldProgress](ui64 current, ui64 total) mutable {
  303. ui64 progress = static_cast<ui64>((static_cast<double>(current) / total) * 100.0);
  304. ui64 progressDiff = progress - oldProgress;
  305. globalProgress.fetch_add(progressDiff);
  306. oldProgress = progress;
  307. writeProgress();
  308. };
  309. }
  310. IInputStream& input = fileInput ? *fileInput : Cin;
  311. switch (settings.Format_) {
  312. case EOutputFormat::Default:
  313. case EOutputFormat::Csv:
  314. case EOutputFormat::Tsv:
  315. if (settings.NewlineDelimited_) {
  316. return UpsertCsvByBlocks(filePath, dbPath, settings);
  317. } else {
  318. return UpsertCsv(input, dbPath, settings, fileSizeHint, progressCallback);
  319. }
  320. case EOutputFormat::Json:
  321. case EOutputFormat::JsonUnicode:
  322. case EOutputFormat::JsonBase64:
  323. return UpsertJson(input, dbPath, settings, fileSizeHint, progressCallback);
  324. case EOutputFormat::Parquet:
  325. return UpsertParquet(filePath, dbPath, settings, progressCallback);
  326. default: ;
  327. }
  328. return MakeStatus(EStatus::BAD_REQUEST,
  329. TStringBuilder() << "Unsupported format #" << (int) settings.Format_);
  330. };
  331. asyncResults.push_back(NThreading::Async(std::move(func), *pool));
  332. }
  333. NThreading::WaitAll(asyncResults).GetValueSync();
  334. for (const auto& asyncResult : asyncResults) {
  335. auto result = asyncResult.GetValueSync();
  336. if (!result.IsSuccess()) {
  337. return result;
  338. }
  339. }
  340. auto finish = TInstant::Now();
  341. auto duration = finish - start;
  342. progressBar.SetProcess(100);
  343. Cerr << "Elapsed: " << duration.SecondsFloat() << " sec\n";
  344. return MakeStatus(EStatus::SUCCESS);
  345. }
  346. inline
  347. TAsyncStatus TImportFileClient::UpsertTValueBuffer(const TString& dbPath, TValueBuilder& builder) {
  348. auto upsert = [this, dbPath, rows = builder.Build()]
  349. (NYdb::NTable::TTableClient& tableClient) mutable -> TAsyncStatus {
  350. NYdb::TValue rowsCopy(rows.GetType(), rows.GetProto());
  351. return tableClient.BulkUpsert(dbPath, std::move(rowsCopy), UpsertSettings)
  352. .Apply([](const NYdb::NTable::TAsyncBulkUpsertResult& bulkUpsertResult) {
  353. NYdb::TStatus status = bulkUpsertResult.GetValueSync();
  354. return NThreading::MakeFuture(status);
  355. });
  356. };
  357. return TableClient->RetryOperation(upsert, RetrySettings);
  358. }
  359. TStatus TImportFileClient::UpsertCsv(IInputStream& input, const TString& dbPath, const TImportFileSettings& settings,
  360. std::optional<ui64> inputSizeHint, ProgressCallbackFunc & progressCallback) {
  361. TMaxInflightGetter inFlightGetter(settings.MaxInFlightRequests_, FilesCount);
  362. TCountingInput countInput(&input);
  363. NCsvFormat::TLinesSplitter splitter(countInput);
  364. auto columnTypes = GetColumnTypes();
  365. ValidateTValueUpsertTable();
  366. TCsvParser parser;
  367. bool removeLastDelimiter = false;
  368. InitCsvParser(parser, removeLastDelimiter, splitter.ConsumeLine(), settings, &columnTypes, DbTableInfo.get());
  369. for (ui32 i = 0; i < settings.SkipRows_; ++i) {
  370. splitter.ConsumeLine();
  371. }
  372. TType lineType = parser.GetColumnsType();
  373. THolder<IThreadPool> pool = CreateThreadPool(settings.Threads_);
  374. ui32 row = settings.SkipRows_;
  375. ui64 nextBorder = VerboseModeReadSize;
  376. ui64 batchBytes = 0;
  377. ui64 readBytes = 0;
  378. TString line;
  379. std::vector<TAsyncStatus> inFlightRequests;
  380. std::vector<TString> buffer;
  381. auto upsertCsv = [&](std::vector<TString>&& buffer) {
  382. TValueBuilder builder;
  383. builder.BeginList();
  384. for (auto&& line : buffer) {
  385. builder.AddListItem();
  386. parser.GetValue(std::move(line), builder, lineType);
  387. }
  388. builder.EndList();
  389. return UpsertTValueBuffer(dbPath, builder).ExtractValueSync();
  390. };
  391. while (TString line = splitter.ConsumeLine()) {
  392. ++row;
  393. if (line.empty()) {
  394. continue;
  395. }
  396. readBytes += line.Size();
  397. batchBytes += line.Size();
  398. if (removeLastDelimiter) {
  399. if (!line.EndsWith(settings.Delimiter_)) {
  400. return MakeStatus(EStatus::BAD_REQUEST,
  401. "According to the header, lines should end with a delimiter");
  402. }
  403. line.erase(line.Size() - settings.Delimiter_.Size());
  404. }
  405. buffer.push_back(line);
  406. if (readBytes >= nextBorder && RetrySettings.Verbose_) {
  407. nextBorder += VerboseModeReadSize;
  408. Cerr << "Processed " << 1.0 * readBytes / (1 << 20) << "Mb and " << row << " records" << Endl;
  409. }
  410. if (batchBytes < settings.BytesPerRequest_) {
  411. continue;
  412. }
  413. if (inputSizeHint && progressCallback) {
  414. progressCallback(readBytes, *inputSizeHint);
  415. }
  416. auto asyncUpsertCSV = [&, buffer = std::move(buffer)]() mutable {
  417. return upsertCsv(std::move(buffer));
  418. };
  419. batchBytes = 0;
  420. buffer.clear();
  421. inFlightRequests.push_back(NThreading::Async(asyncUpsertCSV, *pool));
  422. auto status = WaitForQueue(inFlightGetter.GetCurrentMaxInflight(), inFlightRequests);
  423. if (!status.IsSuccess()) {
  424. return status;
  425. }
  426. }
  427. if (!buffer.empty() && countInput.Counter() > 0) {
  428. upsertCsv(std::move(buffer));
  429. }
  430. return WaitForQueue(0, inFlightRequests);
  431. }
  432. TStatus TImportFileClient::UpsertCsvByBlocks(const TString& filePath, const TString& dbPath, const TImportFileSettings& settings) {
  433. TMaxInflightGetter inFlightGetter(settings.MaxInFlightRequests_, FilesCount);
  434. TString headerRow;
  435. TCsvFileReader splitter(filePath, settings, headerRow, inFlightGetter);
  436. auto columnTypes = GetColumnTypes();
  437. ValidateTValueUpsertTable();
  438. TCsvParser parser;
  439. bool removeLastDelimiter = false;
  440. InitCsvParser(parser, removeLastDelimiter, std::move(headerRow), settings, &columnTypes, DbTableInfo.get());
  441. TType lineType = parser.GetColumnsType();
  442. TVector<TAsyncStatus> threadResults(splitter.GetSplitCount());
  443. THolder<IThreadPool> pool = CreateThreadPool(splitter.GetSplitCount());
  444. for (size_t threadId = 0; threadId < splitter.GetSplitCount(); ++threadId) {
  445. auto loadCsv = [&, threadId] () {
  446. auto upsertCsv = [&](std::vector<TString>&& buffer) {
  447. TValueBuilder builder;
  448. builder.BeginList();
  449. for (auto&& line : buffer) {
  450. builder.AddListItem();
  451. parser.GetValue(std::move(line), builder, lineType);
  452. }
  453. builder.EndList();
  454. return UpsertTValueBuffer(dbPath, builder);
  455. };
  456. std::vector<TAsyncStatus> inFlightRequests;
  457. std::vector<TString> buffer;
  458. ui32 idx = settings.SkipRows_;
  459. ui64 readBytes = 0;
  460. ui64 batchBytes = 0;
  461. ui64 nextBorder = VerboseModeReadSize;
  462. TAsyncStatus status;
  463. TString line;
  464. while (splitter.GetChunk(threadId).ConsumeLine(line)) {
  465. if (line.empty()) {
  466. continue;
  467. }
  468. readBytes += line.size();
  469. batchBytes += line.size();
  470. if (removeLastDelimiter) {
  471. if (!line.EndsWith(settings.Delimiter_)) {
  472. return MakeStatus(EStatus::BAD_REQUEST,
  473. "According to the header, lines should end with a delimiter");
  474. }
  475. line.erase(line.Size() - settings.Delimiter_.Size());
  476. }
  477. buffer.push_back(line);
  478. ++idx;
  479. if (readBytes >= nextBorder && RetrySettings.Verbose_) {
  480. nextBorder += VerboseModeReadSize;
  481. TStringBuilder builder;
  482. builder << "Processed " << 1.0 * readBytes / (1 << 20) << "Mb and " << idx << " records" << Endl;
  483. Cerr << builder;
  484. }
  485. if (batchBytes >= settings.BytesPerRequest_) {
  486. batchBytes = 0;
  487. auto status = WaitForQueue(splitter.GetThreadLimit(threadId), inFlightRequests);
  488. if (!status.IsSuccess()) {
  489. return status;
  490. }
  491. inFlightRequests.push_back(upsertCsv(std::move(buffer)));
  492. buffer.clear();
  493. }
  494. }
  495. if (!buffer.empty() && splitter.GetChunk(threadId).GetReadCount() != 0) {
  496. inFlightRequests.push_back(upsertCsv(std::move(buffer)));
  497. }
  498. return WaitForQueue(0, inFlightRequests);
  499. };
  500. threadResults[threadId] = NThreading::Async(loadCsv, *pool);
  501. }
  502. NThreading::WaitAll(threadResults).Wait();
  503. for (size_t i = 0; i < splitter.GetSplitCount(); ++i) {
  504. if (!threadResults[i].GetValueSync().IsSuccess()) {
  505. return threadResults[i].GetValueSync();
  506. }
  507. }
  508. return MakeStatus();
  509. }
  510. TStatus TImportFileClient::UpsertJson(IInputStream& input, const TString& dbPath, const TImportFileSettings& settings,
  511. std::optional<ui64> inputSizeHint, ProgressCallbackFunc & progressCallback) {
  512. const TType tableType = GetTableType();
  513. ValidateTValueUpsertTable();
  514. const NYdb::EBinaryStringEncoding stringEncoding =
  515. (settings.Format_ == EOutputFormat::JsonBase64) ? NYdb::EBinaryStringEncoding::Base64 :
  516. NYdb::EBinaryStringEncoding::Unicode;
  517. TMaxInflightGetter inFlightGetter(settings.MaxInFlightRequests_, FilesCount);
  518. THolder<IThreadPool> pool = CreateThreadPool(settings.Threads_);
  519. ui64 readBytes = 0;
  520. ui64 batchBytes = 0;
  521. TString line;
  522. std::vector<TString> batchLines;
  523. std::vector<TAsyncStatus> inFlightRequests;
  524. auto upsertJson = [&](const std::vector<TString>& batchLines) {
  525. TValueBuilder batch;
  526. batch.BeginList();
  527. for (auto &line : batchLines) {
  528. batch.AddListItem(JsonToYdbValue(line, tableType, stringEncoding));
  529. }
  530. batch.EndList();
  531. auto value = UpsertTValueBuffer(dbPath, batch);
  532. return value.ExtractValueSync();
  533. };
  534. while (size_t size = input.ReadLine(line)) {
  535. batchLines.push_back(line);
  536. batchBytes += size;
  537. readBytes += size;
  538. if (inputSizeHint && progressCallback) {
  539. progressCallback(readBytes, *inputSizeHint);
  540. }
  541. if (batchBytes < settings.BytesPerRequest_) {
  542. continue;
  543. }
  544. batchBytes = 0;
  545. auto asyncUpsertJson = [&, batchLines = std::move(batchLines)]() {
  546. return upsertJson(batchLines);
  547. };
  548. batchLines.clear();
  549. inFlightRequests.push_back(NThreading::Async(asyncUpsertJson, *pool));
  550. auto status = WaitForQueue(inFlightGetter.GetCurrentMaxInflight(), inFlightRequests);
  551. if (!status.IsSuccess()) {
  552. return status;
  553. }
  554. }
  555. if (!batchLines.empty()) {
  556. upsertJson(std::move(batchLines));
  557. }
  558. return WaitForQueue(0, inFlightRequests);
  559. }
  560. TStatus TImportFileClient::UpsertParquet([[maybe_unused]] const TString& filename,
  561. [[maybe_unused]] const TString& dbPath,
  562. [[maybe_unused]] const TImportFileSettings& settings,
  563. [[maybe_unused]] ProgressCallbackFunc & progressCallback) {
  564. #if defined(_WIN64) || defined(_WIN32) || defined(__WIN32__)
  565. return MakeStatus(EStatus::BAD_REQUEST, TStringBuilder() << "Not supported on Windows");
  566. #else
  567. std::shared_ptr<arrow::io::ReadableFile> infile;
  568. arrow::Result<std::shared_ptr<arrow::io::ReadableFile>> fileResult = arrow::io::ReadableFile::Open(filename);
  569. if (!fileResult.ok()) {
  570. return MakeStatus(EStatus::BAD_REQUEST, TStringBuilder() << "Unable to open parquet file:" << fileResult.status().ToString());
  571. }
  572. std::shared_ptr<arrow::io::ReadableFile> readableFile = *fileResult;
  573. std::unique_ptr<parquet::arrow::FileReader> fileReader;
  574. arrow::Status st;
  575. st = parquet::arrow::OpenFile(readableFile, arrow::default_memory_pool(), &fileReader);
  576. if (!st.ok()) {
  577. return MakeStatus(EStatus::BAD_REQUEST, TStringBuilder() << "Error while initializing arrow FileReader: " << st.ToString());
  578. }
  579. auto metadata = parquet::ReadMetaData(readableFile);
  580. const i64 numRows = metadata->num_rows();
  581. const i64 numRowGroups = metadata->num_row_groups();
  582. std::vector<int> row_group_indices(numRowGroups);
  583. for (i64 i = 0; i < numRowGroups; i++) {
  584. row_group_indices[i] = i;
  585. }
  586. std::unique_ptr<arrow::RecordBatchReader> reader;
  587. st = fileReader->GetRecordBatchReader(row_group_indices, &reader);
  588. if (!st.ok()) {
  589. return MakeStatus(EStatus::BAD_REQUEST, TStringBuilder() << "Error while getting RecordBatchReader: " << st.ToString());
  590. }
  591. THolder<IThreadPool> pool = CreateThreadPool(settings.Threads_);
  592. std::atomic<ui64> uploadedRows = 0;
  593. auto uploadedRowsCallback = [&](ui64 rows) {
  594. ui64 uploadedRowsValue = uploadedRows.fetch_add(rows);
  595. if (progressCallback) {
  596. progressCallback(uploadedRowsValue + rows, numRows);
  597. }
  598. };
  599. std::vector<TAsyncStatus> inFlightRequests;
  600. while (true) {
  601. std::shared_ptr<arrow::RecordBatch> batch;
  602. st = reader->ReadNext(&batch);
  603. if (!st.ok()) {
  604. return MakeStatus(EStatus::BAD_REQUEST, TStringBuilder() << "Error while reading next RecordBatch" << st.ToString());
  605. }
  606. // The read function will return null at the end of data stream.
  607. if (!batch) {
  608. break;
  609. }
  610. auto upsertParquetBatch = [&, batch = std::move(batch)]() {
  611. const TString strSchema = NYdb_cli::NArrow::SerializeSchema(*batch->schema());
  612. const size_t totalSize = NYdb_cli::NArrow::GetBatchDataSize(batch);
  613. const size_t sliceCount =
  614. (totalSize / (size_t)settings.BytesPerRequest_) + (totalSize % settings.BytesPerRequest_ != 0 ? 1 : 0);
  615. const i64 rowsInSlice = batch->num_rows() / sliceCount;
  616. for (i64 currentRow = 0; currentRow < batch->num_rows(); currentRow += rowsInSlice) {
  617. std::stack<std::shared_ptr<arrow::RecordBatch>> rowsToSendBatches;
  618. if (currentRow + rowsInSlice < batch->num_rows()) {
  619. rowsToSendBatches.push(batch->Slice(currentRow, rowsInSlice));
  620. } else {
  621. rowsToSendBatches.push(batch->Slice(currentRow));
  622. }
  623. do {
  624. auto rowsBatch = std::move(rowsToSendBatches.top());
  625. rowsToSendBatches.pop();
  626. // Nothing to send. Continue.
  627. if (rowsBatch->num_rows() == 0) {
  628. continue;
  629. }
  630. // Logarithmic approach to find number of rows fit into the byte limit.
  631. if (rowsBatch->num_rows() == 1 || NYdb_cli::NArrow::GetBatchDataSize(rowsBatch) < settings.BytesPerRequest_) {
  632. // Single row or fits into the byte limit.
  633. auto value = UpsertParquetBuffer(dbPath, NYdb_cli::NArrow::SerializeBatchNoCompression(rowsBatch), strSchema);
  634. auto status = value.ExtractValueSync();
  635. if (!status.IsSuccess())
  636. return status;
  637. uploadedRowsCallback(rowsBatch->num_rows());
  638. } else {
  639. // Split current slice.
  640. i64 halfLen = rowsBatch->num_rows() / 2;
  641. rowsToSendBatches.push(rowsBatch->Slice(halfLen));
  642. rowsToSendBatches.push(rowsBatch->Slice(0, halfLen));
  643. }
  644. } while (!rowsToSendBatches.empty());
  645. };
  646. return MakeStatus();
  647. };
  648. inFlightRequests.push_back(NThreading::Async(upsertParquetBatch, *pool));
  649. auto status = WaitForQueue(settings.MaxInFlightRequests_, inFlightRequests);
  650. if (!status.IsSuccess()) {
  651. return status;
  652. }
  653. }
  654. return WaitForQueue(0, inFlightRequests);
  655. #endif
  656. }
  657. inline
  658. TAsyncStatus TImportFileClient::UpsertParquetBuffer(const TString& dbPath, const TString& buffer, const TString& strSchema) {
  659. auto upsert = [this, dbPath, buffer, strSchema](NYdb::NTable::TTableClient& tableClient) -> TAsyncStatus {
  660. return tableClient.BulkUpsert(dbPath, NTable::EDataFormat::ApacheArrow, buffer, strSchema, UpsertSettings)
  661. .Apply([](const NYdb::NTable::TAsyncBulkUpsertResult& bulkUpsertResult) {
  662. NYdb::TStatus status = bulkUpsertResult.GetValueSync();
  663. return NThreading::MakeFuture(status);
  664. });
  665. };
  666. return TableClient->RetryOperation(upsert, RetrySettings);
  667. }
  668. TType TImportFileClient::GetTableType() {
  669. TTypeBuilder typeBuilder;
  670. typeBuilder.BeginStruct();
  671. Y_ENSURE_BT(DbTableInfo);
  672. const auto& columns = DbTableInfo->GetTableColumns();
  673. for (auto it = columns.begin(); it != columns.end(); it++) {
  674. typeBuilder.AddMember((*it).Name, (*it).Type);
  675. }
  676. typeBuilder.EndStruct();
  677. return typeBuilder.Build();
  678. }
  679. std::map<TString, TType> TImportFileClient::GetColumnTypes() {
  680. std::map<TString, TType> columnTypes;
  681. Y_ENSURE_BT(DbTableInfo);
  682. const auto& columns = DbTableInfo->GetTableColumns();
  683. for (auto it = columns.begin(); it != columns.end(); it++) {
  684. columnTypes.insert({(*it).Name, (*it).Type});
  685. }
  686. return columnTypes;
  687. }
  688. void TImportFileClient::ValidateTValueUpsertTable() {
  689. auto columnTypes = GetColumnTypes();
  690. bool hasPgType = false;
  691. for (const auto& [_, type] : columnTypes) {
  692. if (TTypeParser(type).GetKind() == TTypeParser::ETypeKind::Pg) {
  693. hasPgType = true;
  694. break;
  695. }
  696. }
  697. Y_ENSURE_BT(DbTableInfo);
  698. if (DbTableInfo->GetStoreType() == NTable::EStoreType::Column && hasPgType) {
  699. throw TMisuseException() << "Import into column table with Pg type columns in not supported";
  700. }
  701. }
  702. }
  703. }