columnshard_ut_common.cpp 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464
  1. #include "columnshard_ut_common.h"
  2. #include "columnshard__stats_scan.h"
  3. #include "common/tests/shard_reader.h"
  4. #include <ydb/core/base/tablet.h>
  5. #include <ydb/core/base/tablet_resolver.h>
  6. #include <ydb/core/scheme/scheme_types_proto.h>
  7. #include <library/cpp/testing/unittest/registar.h>
  8. namespace NKikimr::NTxUT {
  9. using namespace NColumnShard;
  10. using namespace Tests;
  11. void TTester::Setup(TTestActorRuntime& runtime) {
  12. runtime.SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NActors::NLog::PRI_DEBUG);
  13. runtime.SetLogPriority(NKikimrServices::BLOB_CACHE, NActors::NLog::PRI_INFO);
  14. runtime.SetLogPriority(NKikimrServices::TX_COLUMNSHARD_SCAN, NActors::NLog::PRI_DEBUG);
  15. runtime.SetLogPriority(NKikimrServices::S3_WRAPPER, NLog::PRI_DEBUG);
  16. ui32 domainId = 0;
  17. ui32 planResolution = 500;
  18. TAppPrepare app;
  19. auto domain = TDomainsInfo::TDomain::ConstructDomainWithExplicitTabletIds(
  20. "dc-1", domainId, FAKE_SCHEMESHARD_TABLET_ID,
  21. domainId, domainId, std::vector<ui32>{domainId},
  22. domainId, std::vector<ui32>{domainId},
  23. planResolution,
  24. std::vector<ui64>{TDomainsInfo::MakeTxCoordinatorIDFixed(domainId, 1)},
  25. std::vector<ui64>{},
  26. std::vector<ui64>{TDomainsInfo::MakeTxAllocatorIDFixed(domainId, 1)});
  27. TVector<ui64> ids = runtime.GetTxAllocatorTabletIds();
  28. ids.insert(ids.end(), domain->TxAllocators.begin(), domain->TxAllocators.end());
  29. runtime.SetTxAllocatorTabletIds(ids);
  30. app.AddDomain(domain.Release());
  31. SetupTabletServices(runtime, &app);
  32. runtime.UpdateCurrentTime(TInstant::Now());
  33. }
  34. void ProvideTieringSnapshot(TTestBasicRuntime& runtime, const TActorId& sender, NMetadata::NFetcher::ISnapshot::TPtr snapshot) {
  35. auto event = std::make_unique<NMetadata::NProvider::TEvRefreshSubscriberData>(snapshot);
  36. ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, event.release());
  37. }
  38. bool ProposeSchemaTx(TTestBasicRuntime& runtime, TActorId& sender, const TString& txBody, NOlap::TSnapshot snap) {
  39. auto event = std::make_unique<TEvColumnShard::TEvProposeTransaction>(
  40. NKikimrTxColumnShard::TX_KIND_SCHEMA, 0, sender, snap.GetTxId(), txBody);
  41. ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, event.release());
  42. auto ev = runtime.GrabEdgeEvent<TEvColumnShard::TEvProposeTransactionResult>(sender);
  43. const auto& res = ev->Get()->Record;
  44. UNIT_ASSERT_EQUAL(res.GetTxId(), snap.GetTxId());
  45. UNIT_ASSERT_EQUAL(res.GetTxKind(), NKikimrTxColumnShard::TX_KIND_SCHEMA);
  46. return (res.GetStatus() == NKikimrTxColumnShard::PREPARED);
  47. }
  48. void PlanSchemaTx(TTestBasicRuntime& runtime, TActorId& sender, NOlap::TSnapshot snap) {
  49. auto plan = std::make_unique<TEvTxProcessing::TEvPlanStep>(snap.GetPlanStep(), 0, TTestTxConfig::TxTablet0);
  50. auto tx = plan->Record.AddTransactions();
  51. tx->SetTxId(snap.GetTxId());
  52. ActorIdToProto(sender, tx->MutableAckTo());
  53. ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, plan.release());
  54. UNIT_ASSERT(runtime.GrabEdgeEvent<TEvTxProcessing::TEvPlanStepAck>(sender));
  55. auto ev = runtime.GrabEdgeEvent<TEvColumnShard::TEvProposeTransactionResult>(sender);
  56. const auto& res = ev->Get()->Record;
  57. UNIT_ASSERT_EQUAL(res.GetTxId(), snap.GetTxId());
  58. UNIT_ASSERT_EQUAL(res.GetTxKind(), NKikimrTxColumnShard::TX_KIND_SCHEMA);
  59. UNIT_ASSERT_EQUAL(res.GetStatus(), NKikimrTxColumnShard::SUCCESS);
  60. }
  61. void PlanWriteTx(TTestBasicRuntime& runtime, TActorId& sender, NOlap::TSnapshot snap, bool waitResult) {
  62. auto plan = std::make_unique<TEvTxProcessing::TEvPlanStep>(snap.GetPlanStep(), 0, TTestTxConfig::TxTablet0);
  63. auto tx = plan->Record.AddTransactions();
  64. tx->SetTxId(snap.GetTxId());
  65. ActorIdToProto(sender, tx->MutableAckTo());
  66. ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, plan.release());
  67. UNIT_ASSERT(runtime.GrabEdgeEvent<TEvTxProcessing::TEvPlanStepAck>(sender));
  68. if (waitResult) {
  69. auto ev = runtime.GrabEdgeEvent<NEvents::TDataEvents::TEvWriteResult>(sender);
  70. const auto& res = ev->Get()->Record;
  71. UNIT_ASSERT_EQUAL(res.GetTxId(), snap.GetTxId());
  72. UNIT_ASSERT_EQUAL(res.GetStatus(), NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED);
  73. }
  74. }
  75. ui32 WaitWriteResult(TTestBasicRuntime& runtime, ui64 shardId, std::vector<ui64>* writeIds) {
  76. TAutoPtr<IEventHandle> handle;
  77. auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvWriteResult>(handle);
  78. UNIT_ASSERT(event);
  79. auto& resWrite = Proto(event);
  80. UNIT_ASSERT_EQUAL(resWrite.GetOrigin(), shardId);
  81. UNIT_ASSERT_EQUAL(resWrite.GetTxInitiator(), 0);
  82. if (writeIds && resWrite.GetStatus() == NKikimrTxColumnShard::EResultStatus::SUCCESS) {
  83. writeIds->push_back(resWrite.GetWriteId());
  84. }
  85. return resWrite.GetStatus();
  86. }
  87. bool WriteDataImpl(TTestBasicRuntime& runtime, TActorId& sender, const ui64 shardId, const ui64 tableId,
  88. const NLongTxService::TLongTxId& longTxId, const ui64 writeId,
  89. const TString& data, const std::shared_ptr<arrow::Schema>& schema, std::vector<ui64>* writeIds) {
  90. const TString dedupId = ToString(writeId);
  91. auto write = std::make_unique<TEvColumnShard::TEvWrite>(sender, longTxId, tableId, dedupId, data, writeId);
  92. Y_ABORT_UNLESS(schema);
  93. write->SetArrowSchema(NArrow::SerializeSchema(*schema));
  94. ForwardToTablet(runtime, shardId, sender, write.release());
  95. if (writeIds) {
  96. return WaitWriteResult(runtime, shardId, writeIds) == NKikimrTxColumnShard::EResultStatus::SUCCESS;
  97. }
  98. return true;
  99. }
  100. bool WriteData(TTestBasicRuntime& runtime, TActorId& sender, const ui64 shardId, const ui64 writeId, const ui64 tableId, const TString& data,
  101. const std::vector<std::pair<TString, TTypeInfo>>& ydbSchema, std::vector<ui64>* writeIds) {
  102. NLongTxService::TLongTxId longTxId;
  103. UNIT_ASSERT(longTxId.ParseString("ydb://long-tx/01ezvvxjdk2hd4vdgjs68knvp8?node_id=1"));
  104. return WriteDataImpl(runtime, sender, shardId, tableId, longTxId, writeId, data, NArrow::MakeArrowSchema(ydbSchema), writeIds);
  105. }
  106. bool WriteData(TTestBasicRuntime& runtime, TActorId& sender, const ui64 writeId, const ui64 tableId, const TString& data,
  107. const std::vector<std::pair<TString, TTypeInfo>>& ydbSchema, bool waitResult, std::vector<ui64>* writeIds) {
  108. NLongTxService::TLongTxId longTxId;
  109. UNIT_ASSERT(longTxId.ParseString("ydb://long-tx/01ezvvxjdk2hd4vdgjs68knvp8?node_id=1"));
  110. if (writeIds) {
  111. return WriteDataImpl(runtime, sender, TTestTxConfig::TxTablet0, tableId, longTxId, writeId, data, NArrow::MakeArrowSchema(ydbSchema), writeIds);
  112. }
  113. std::vector<ui64> ids;
  114. return WriteDataImpl(runtime, sender, TTestTxConfig::TxTablet0, tableId, longTxId, writeId, data, NArrow::MakeArrowSchema(ydbSchema), waitResult ? &ids : nullptr);
  115. }
  116. std::optional<ui64> WriteData(TTestBasicRuntime& runtime, TActorId& sender, const NLongTxService::TLongTxId& longTxId,
  117. ui64 tableId, const ui64 writePartId, const TString& data,
  118. const std::vector<std::pair<TString, TTypeInfo>>& ydbSchema)
  119. {
  120. auto write = std::make_unique<TEvColumnShard::TEvWrite>(sender, longTxId, tableId, "0", data, writePartId);
  121. write->SetArrowSchema(NArrow::SerializeSchema(*NArrow::MakeArrowSchema(ydbSchema)));
  122. ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, write.release());
  123. TAutoPtr<IEventHandle> handle;
  124. auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvWriteResult>(handle);
  125. UNIT_ASSERT(event);
  126. auto& resWrite = Proto(event);
  127. UNIT_ASSERT_EQUAL(resWrite.GetOrigin(), TTestTxConfig::TxTablet0);
  128. UNIT_ASSERT_EQUAL(resWrite.GetTxInitiator(), 0);
  129. if (resWrite.GetStatus() == NKikimrTxColumnShard::EResultStatus::SUCCESS) {
  130. return resWrite.GetWriteId();
  131. }
  132. return {};
  133. }
  134. void ScanIndexStats(TTestBasicRuntime& runtime, TActorId& sender, const std::vector<ui64>& pathIds,
  135. NOlap::TSnapshot snap, ui64 scanId) {
  136. auto scan = std::make_unique<TEvColumnShard::TEvScan>();
  137. auto& record = scan->Record;
  138. record.SetTxId(snap.GetPlanStep());
  139. record.SetScanId(scanId);
  140. // record.SetLocalPathId(0);
  141. record.SetTablePath(NOlap::TIndexInfo::STORE_INDEX_STATS_TABLE);
  142. // Schema: pathId, kind, rows, bytes, rawBytes. PK: {pathId, kind}
  143. //record.SetSchemaVersion(0);
  144. auto ydbSchema = PrimaryIndexStatsSchema;
  145. for (const auto& col : ydbSchema.Columns) {
  146. record.AddColumnTags(col.second.Id);
  147. auto columnType = NScheme::ProtoColumnTypeFromTypeInfoMod(col.second.PType, col.second.PTypeMod);
  148. record.AddColumnTypes(columnType.TypeId);
  149. if (columnType.TypeInfo) {
  150. *record.AddColumnTypeInfos() = *columnType.TypeInfo;
  151. } else {
  152. *record.AddColumnTypeInfos() = NKikimrProto::TTypeInfo();
  153. }
  154. }
  155. for (ui64 pathId : pathIds) {
  156. std::vector<TCell> pk{TCell::Make<ui64>(pathId)};
  157. TSerializedTableRange range(TConstArrayRef<TCell>(pk), true, TConstArrayRef<TCell>(pk), true);
  158. auto newRange = record.MutableRanges()->Add();
  159. range.Serialize(*newRange);
  160. }
  161. record.MutableSnapshot()->SetStep(snap.GetPlanStep());
  162. record.MutableSnapshot()->SetTxId(snap.GetTxId());
  163. record.SetDataFormat(NKikimrDataEvents::FORMAT_ARROW);
  164. ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, scan.release());
  165. }
  166. void ProposeCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 shardId, ui64 txId, const std::vector<ui64>& writeIds) {
  167. NKikimrTxColumnShard::ETransactionKind txKind = NKikimrTxColumnShard::ETransactionKind::TX_KIND_COMMIT;
  168. TString txBody = TTestSchema::CommitTxBody(0, writeIds);
  169. ForwardToTablet(runtime, shardId, sender,
  170. new TEvColumnShard::TEvProposeTransaction(txKind, sender, txId, txBody));
  171. TAutoPtr<IEventHandle> handle;
  172. auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvProposeTransactionResult>(handle);
  173. UNIT_ASSERT(event);
  174. auto& res = Proto(event);
  175. UNIT_ASSERT_EQUAL(res.GetTxKind(), txKind);
  176. UNIT_ASSERT_EQUAL(res.GetTxId(), txId);
  177. UNIT_ASSERT_EQUAL(res.GetStatus(), NKikimrTxColumnShard::EResultStatus::PREPARED);
  178. }
  179. void ProposeCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 txId, const std::vector<ui64>& writeIds) {
  180. ProposeCommit(runtime, sender, TTestTxConfig::TxTablet0, txId, writeIds);
  181. }
  182. void PlanCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 planStep, const TSet<ui64>& txIds) {
  183. PlanCommit(runtime, sender, TTestTxConfig::TxTablet0, planStep, txIds);
  184. }
  185. void PlanCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 shardId, ui64 planStep, const TSet<ui64>& txIds) {
  186. auto plan = std::make_unique<TEvTxProcessing::TEvPlanStep>(planStep, 0, shardId);
  187. for (ui64 txId : txIds) {
  188. auto tx = plan->Record.AddTransactions();
  189. tx->SetTxId(txId);
  190. ActorIdToProto(sender, tx->MutableAckTo());
  191. }
  192. ForwardToTablet(runtime, shardId, sender, plan.release());
  193. TAutoPtr<IEventHandle> handle;
  194. for (ui32 i = 0; i < txIds.size(); ++i) {
  195. auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvProposeTransactionResult>(handle);
  196. UNIT_ASSERT(event);
  197. auto& res = Proto(event);
  198. UNIT_ASSERT(txIds.contains(res.GetTxId()));
  199. UNIT_ASSERT_EQUAL(res.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS);
  200. }
  201. }
  202. TCell MakeTestCell(const TTypeInfo& typeInfo, ui32 value, std::vector<TString>& mem) {
  203. auto type = typeInfo.GetTypeId();
  204. if (type == NTypeIds::Utf8 ||
  205. type == NTypeIds::String ||
  206. type == NTypeIds::String4k ||
  207. type == NTypeIds::String2m) {
  208. mem.push_back(ToString(value));
  209. const TString& str = mem.back();
  210. return TCell(str.data(), str.size());
  211. } else if (type == NTypeIds::JsonDocument || type == NTypeIds::Json) {
  212. mem.push_back("{}");
  213. const TString& str = mem.back();
  214. return TCell(str.data(), str.size());
  215. } else if (type == NTypeIds::Yson) {
  216. mem.push_back("{ \"a\" = [ { \"b\" = 1; } ]; }");
  217. const TString& str = mem.back();
  218. return TCell(str.data(), str.size());
  219. } else if (type == NTypeIds::Timestamp || type == NTypeIds::Interval ||
  220. type == NTypeIds::Uint64 || type == NTypeIds::Int64) {
  221. return TCell::Make<ui64>(value);
  222. } else if (type == NTypeIds::Uint32 || type == NTypeIds::Int32 || type == NTypeIds::Datetime) {
  223. return TCell::Make<ui32>(value);
  224. } else if (type == NTypeIds::Uint16 || type == NTypeIds::Int16 || type == NTypeIds::Date) {
  225. return TCell::Make<ui16>(value);
  226. } else if (type == NTypeIds::Uint8 || type == NTypeIds::Int8 || type == NTypeIds::Byte ||
  227. type == NTypeIds::Bool) {
  228. return TCell::Make<ui8>(value);
  229. } else if (type == NTypeIds::Float) {
  230. return TCell::Make<float>(value);
  231. } else if (type == NTypeIds::Double) {
  232. return TCell::Make<double>(value);
  233. }
  234. UNIT_ASSERT(false);
  235. return {};
  236. }
  237. std::vector<TCell> MakeTestCells(const std::vector<TTypeInfo>& types, ui32 value, std::vector<TString>& mem) {
  238. std::vector<TCell> cells;
  239. cells.reserve(types.size());
  240. for (const auto& typeInfo : types) {
  241. cells.push_back(MakeTestCell(typeInfo, value, mem));
  242. }
  243. return cells;
  244. }
  245. TString MakeTestBlob(std::pair<ui64, ui64> range, const std::vector<std::pair<TString, TTypeInfo>>& columns,
  246. const TTestBlobOptions& options, const std::set<std::string>& notNullColumns) {
  247. TString err;
  248. NArrow::TArrowBatchBuilder batchBuilder(arrow::Compression::LZ4_FRAME, notNullColumns);
  249. batchBuilder.Start(columns, 0, 0, err);
  250. std::vector<ui32> nullPositions;
  251. std::vector<ui32> samePositions;
  252. for (size_t i = 0; i < columns.size(); ++i) {
  253. if (options.NullColumns.contains(columns[i].first)) {
  254. nullPositions.push_back(i);
  255. } else if (options.SameValueColumns.contains(columns[i].first)) {
  256. samePositions.push_back(i);
  257. }
  258. }
  259. std::vector<TString> mem;
  260. std::vector<TTypeInfo> types = TTestSchema::ExtractTypes(columns);
  261. // insert, not ordered
  262. for (size_t i = range.first; i < range.second; i += 2) {
  263. std::vector<TCell> cells = MakeTestCells(types, i, mem);
  264. for (auto& pos : nullPositions) {
  265. cells[pos] = TCell();
  266. }
  267. for (auto& pos : samePositions) {
  268. cells[pos] = MakeTestCell(types[pos], options.SameValue, mem);
  269. }
  270. NKikimr::TDbTupleRef unused;
  271. batchBuilder.AddRow(unused, NKikimr::TDbTupleRef(types.data(), cells.data(), types.size()));
  272. }
  273. for (size_t i = range.first + 1; i < range.second; i += 2) {
  274. std::vector<TCell> cells = MakeTestCells(types, i, mem);
  275. for (auto& pos : nullPositions) {
  276. cells[pos] = TCell();
  277. }
  278. for (auto& pos : samePositions) {
  279. cells[pos] = MakeTestCell(types[pos], options.SameValue, mem);
  280. }
  281. NKikimr::TDbTupleRef unused;
  282. batchBuilder.AddRow(unused, NKikimr::TDbTupleRef(types.data(), cells.data(), types.size()));
  283. }
  284. auto batch = batchBuilder.FlushBatch(true);
  285. UNIT_ASSERT(batch);
  286. auto status = batch->ValidateFull();
  287. UNIT_ASSERT(status.ok());
  288. TString blob = batchBuilder.Finish();
  289. UNIT_ASSERT(!blob.empty());
  290. return blob;
  291. }
  292. TSerializedTableRange MakeTestRange(std::pair<ui64, ui64> range, bool inclusiveFrom, bool inclusiveTo,
  293. const std::vector<std::pair<TString, TTypeInfo>>& columns) {
  294. std::vector<TString> mem;
  295. std::vector<TTypeInfo> types = TTestSchema::ExtractTypes(columns);
  296. std::vector<TCell> cellsFrom = MakeTestCells(types, range.first, mem);
  297. std::vector<TCell> cellsTo = MakeTestCells(types, range.second, mem);
  298. return TSerializedTableRange(TConstArrayRef<TCell>(cellsFrom), inclusiveFrom,
  299. TConstArrayRef<TCell>(cellsTo), inclusiveTo);
  300. }
  301. NMetadata::NFetcher::ISnapshot::TPtr TTestSchema::BuildSnapshot(const TTableSpecials& specials) {
  302. std::unique_ptr<NColumnShard::NTiers::TConfigsSnapshot> cs(new NColumnShard::NTiers::TConfigsSnapshot(Now()));
  303. if (specials.Tiers.empty()) {
  304. return cs;
  305. }
  306. NColumnShard::NTiers::TTieringRule tRule;
  307. tRule.SetTieringRuleId("Tiering1");
  308. for (auto&& tier : specials.Tiers) {
  309. if (!tRule.GetDefaultColumn()) {
  310. tRule.SetDefaultColumn(tier.TtlColumn);
  311. }
  312. UNIT_ASSERT(tRule.GetDefaultColumn() == tier.TtlColumn);
  313. {
  314. NKikimrSchemeOp::TStorageTierConfig cProto;
  315. cProto.SetName(tier.Name);
  316. *cProto.MutableObjectStorage() = tier.S3;
  317. if (tier.Codec) {
  318. cProto.MutableCompression()->SetCompressionCodec(tier.GetCodecId());
  319. }
  320. if (tier.CompressionLevel) {
  321. cProto.MutableCompression()->SetCompressionLevel(*tier.CompressionLevel);
  322. }
  323. NColumnShard::NTiers::TTierConfig tConfig(tier.Name, cProto);
  324. cs->MutableTierConfigs().emplace(tConfig.GetTierName(), tConfig);
  325. }
  326. tRule.AddInterval(tier.Name, TDuration::Seconds((*tier.EvictAfter).Seconds()));
  327. }
  328. cs->MutableTableTierings().emplace(tRule.GetTieringRuleId(), tRule);
  329. return cs;
  330. }
  331. }
  332. namespace NKikimr::NColumnShard {
  333. NOlap::TIndexInfo BuildTableInfo(const std::vector<std::pair<TString, NScheme::TTypeInfo>>& ydbSchema,
  334. const std::vector<std::pair<TString, NScheme::TTypeInfo>>& key) {
  335. NOlap::TIndexInfo indexInfo = NOlap::TIndexInfo::BuildDefault();
  336. for (ui32 i = 0; i < ydbSchema.size(); ++i) {
  337. ui32 id = i + 1;
  338. auto& name = ydbSchema[i].first;
  339. auto& type = ydbSchema[i].second;
  340. indexInfo.Columns[id] = NTable::TColumn(name, id, type, "");
  341. indexInfo.ColumnNames[name] = id;
  342. }
  343. for (const auto& [keyName, keyType] : key) {
  344. indexInfo.KeyColumns.push_back(indexInfo.ColumnNames[keyName]);
  345. }
  346. indexInfo.SetAllKeys();
  347. return indexInfo;
  348. }
  349. void SetupSchema(TTestBasicRuntime& runtime, TActorId& sender, ui64 pathId,
  350. const TestTableDescription& table, TString codec) {
  351. using namespace NTxUT;
  352. NOlap::TSnapshot snap(10, 10);
  353. TString txBody;
  354. auto specials = TTestSchema::TTableSpecials().WithCodec(codec);
  355. if (table.InStore) {
  356. txBody = TTestSchema::CreateTableTxBody(pathId, table.Schema, table.Pk, specials);
  357. } else {
  358. txBody = TTestSchema::CreateStandaloneTableTxBody(pathId, table.Schema, table.Pk, specials);
  359. }
  360. bool ok = ProposeSchemaTx(runtime, sender, txBody, snap);
  361. UNIT_ASSERT(ok);
  362. PlanSchemaTx(runtime, sender, snap);
  363. }
  364. void PrepareTablet(TTestBasicRuntime& runtime, const ui64 tableId, const std::vector<std::pair<TString, NScheme::TTypeInfo>>& schema, const ui32 keySize) {
  365. using namespace NTxUT;
  366. CreateTestBootstrapper(runtime, CreateTestTabletInfo(TTestTxConfig::TxTablet0, TTabletTypes::ColumnShard), &CreateColumnShard);
  367. TDispatchOptions options;
  368. options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvBoot));
  369. runtime.DispatchEvents(options);
  370. TestTableDescription tableDescription;
  371. tableDescription.Schema = schema;
  372. tableDescription.Pk = {};
  373. for (ui64 i = 0; i < keySize; ++i) {
  374. Y_ABORT_UNLESS(i < schema.size());
  375. tableDescription.Pk.push_back(schema[i]);
  376. }
  377. TActorId sender = runtime.AllocateEdgeActor();
  378. SetupSchema(runtime, sender, tableId, tableDescription);
  379. }
  380. std::shared_ptr<arrow::RecordBatch> ReadAllAsBatch(TTestBasicRuntime& runtime, const ui64 tableId, const NOlap::TSnapshot& snapshot, const std::vector<std::pair<TString, NScheme::TTypeInfo>>& schema) {
  381. std::vector<TString> fields;
  382. for (auto&& f : schema) {
  383. fields.emplace_back(f.first);
  384. }
  385. NOlap::NTests::TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, snapshot);
  386. reader.SetReplyColumns(fields);
  387. auto rb = reader.ReadAll();
  388. UNIT_ASSERT(reader.IsCorrectlyFinished());
  389. return rb ? rb : NArrow::MakeEmptyBatch(NArrow::MakeArrowSchema(schema));
  390. }
  391. }