datashard_ut_stats.cpp 17 KB


  1. #include <ydb/core/tx/datashard/ut_common/datashard_ut_common.h>
  2. #include "datashard_ut_common_kqp.h"
  3. #include "ydb/core/tablet_flat/shared_sausagecache.h"
  4. namespace NKikimr {
  5. using namespace NKikimr::NDataShard;
  6. using namespace NSchemeShard;
  7. using namespace Tests;
  8. Y_UNIT_TEST_SUITE(DataShardStats) {
  9. NKikimrTableStats::TTableStats GetTableStats(TTestActorRuntime& runtime, ui64 tabletId, ui64 tableId) {
  10. auto sender = runtime.AllocateEdgeActor();
  11. auto request = MakeHolder<TEvDataShard::TEvGetTableStats>(tableId);
  12. runtime.SendToPipe(tabletId, sender, request.Release(), 0, GetPipeConfigWithRetries());
  13. auto ev = runtime.GrabEdgeEventRethrow<TEvDataShard::TEvGetTableStatsResult>(sender);
  14. return ev->Get()->Record.GetTableStats();
  15. }
  16. TVector<std::pair<ui64, ui64>> ReadHistogram(NKikimrTableStats::THistogram histogram) {
  17. TVector<std::pair<ui64, ui64>> result;
  18. for (auto b : histogram.GetBuckets()) {
  19. TSerializedCellVec key(b.GetKey());
  20. auto keyValue = key.GetCells()[0].AsValue<ui32>();
  21. result.push_back({keyValue, b.GetValue()});
  22. }
  23. return result;
  24. }
  25. Y_UNIT_TEST(OneChannelStatsCorrect) {
  26. TPortManager pm;
  27. TServerSettings serverSettings(pm.GetPort(2134));
  28. serverSettings.SetDomainName("Root")
  29. .SetUseRealThreads(false);
  30. TServer::TPtr server = new TServer(serverSettings);
  31. auto& runtime = *server->GetRuntime();
  32. auto sender = runtime.AllocateEdgeActor();
  33. runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
  34. runtime.SetLogPriority(NKikimrServices::TABLET_SAUSAGECACHE, NLog::PRI_TRACE);
  35. InitRoot(server, sender);
  36. auto [shards, tableId1] = CreateShardedTable(server, sender, "/Root", "table-1", 1);
  37. ui64 shard1 = shards.at(0);
  38. ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1), (2, 2), (3, 3)");
  39. {
  40. Cerr << "... waiting for stats after upsert" << Endl;
  41. auto stats = WaitTableStats(runtime, shard1);
  42. UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shard1);
  43. UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRowCount(), 3u);
  44. UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetPartCount(), 0u);
  45. UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetDataSize(), 704u);
  46. UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetIndexSize(), 0u);
  47. UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetImmediateTxCompleted(), 1u);
  48. }
  49. CompactTable(runtime, shard1, tableId1, false);
  50. {
  51. Cerr << "... waiting for stats after compaction" << Endl;
  52. auto stats = WaitTableStats(runtime, shard1, 1);
  53. UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shard1);
  54. UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRowCount(), 3u);
  55. UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetPartCount(), 1u);
  56. UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetDataSize(), 65u);
  57. UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetIndexSize(), 54u);
  58. UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetChannels().size(), 1);
  59. UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetChannels()[0].GetChannel(), 1u);
  60. UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetChannels()[0].GetDataSize(), 65u);
  61. UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetChannels()[0].GetIndexSize(), 54u);
  62. }
  63. Upsert(runtime, sender, shard1, tableId1, TShardedTableOptions().Columns_, 1, 100, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE);
  64. {
  65. Cerr << "... waiting for stats after write" << Endl;
  66. auto stats = WaitTableStats(runtime, shard1);
  67. UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shard1);
  68. UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRowCount(), 4u);
  69. UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetImmediateTxCompleted(), 2u);
  70. }
  71. }
  72. Y_UNIT_TEST(MultipleChannelsStatsCorrect) {
  73. TPortManager pm;
  74. TServerSettings serverSettings(pm.GetPort(2134));
  75. serverSettings.SetDomainName("Root")
  76. .SetUseRealThreads(false)
  77. .AddStoragePool("ssd")
  78. .AddStoragePool("hdd");
  79. TServer::TPtr server = new TServer(serverSettings);
  80. auto& runtime = *server->GetRuntime();
  81. auto sender = runtime.AllocateEdgeActor();
  82. runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
  83. InitRoot(server, sender);
  84. auto opts = TShardedTableOptions()
  85. .Shards(1)
  86. .Columns({{"key", "Uint32", true, false}, {"value", "Uint32", false, false}, {"value2", "Uint32", false, false, "hdd"}})
  87. .Families({{.Name = "default", .LogPoolKind = "ssd", .SysLogPoolKind = "ssd", .DataPoolKind = "ssd"}, {.Name = "hdd", .DataPoolKind = "hdd"}});
  88. CreateShardedTable(server, sender, "/Root", "table-1", opts);
  89. const auto shard1 = GetTableShards(server, sender, "/Root/table-1").at(0);
  90. const auto tableId1 = ResolveTableId(server, sender, "/Root/table-1");
  91. ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value, value2) VALUES (1, 1, 1), (2, 2, 2), (3, 3, 3)");
  92. {
  93. Cerr << "... waiting for stats after upsert" << Endl;
  94. auto stats = WaitTableStats(runtime, shard1);
  95. UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shard1);
  96. UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRowCount(), 3u);
  97. UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetPartCount(), 0u);
  98. UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetDataSize(), 752u);
  99. UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetIndexSize(), 0u);
  100. }
  101. CompactTable(runtime, shard1, tableId1, false);
  102. {
  103. Cerr << "... waiting for stats after compaction" << Endl;
  104. auto stats = WaitTableStats(runtime, shard1, 1);
  105. UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shard1);
  106. UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRowCount(), 3u);
  107. UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetPartCount(), 1u);
  108. UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetDataSize(), 115u);
  109. UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetIndexSize(), 82u);
  110. UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetChannels().size(), 2);
  111. UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetChannels()[0].GetChannel(), 1u); // ssd
  112. UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetChannels()[0].GetDataSize(), 65u);
  113. UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetChannels()[0].GetIndexSize(), 82u);
  114. UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetChannels()[1].GetChannel(), 2u); // hdd
  115. UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetChannels()[1].GetDataSize(), 50u);
  116. UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetChannels()[1].GetIndexSize(), 0u);
  117. }
  118. }
  119. Y_UNIT_TEST(HistogramStatsCorrect) {
  120. const auto gDbStatsDataSizeResolutionBefore = NDataShard::gDbStatsDataSizeResolution;
  121. const auto gDbStatsRowCountResolutionBefore = NDataShard::gDbStatsRowCountResolution;
  122. NDataShard::gDbStatsDataSizeResolution = 1; // by page stats
  123. NDataShard::gDbStatsRowCountResolution = 1;
  124. TPortManager pm;
  125. TServerSettings serverSettings(pm.GetPort(2134));
  126. serverSettings.SetDomainName("Root")
  127. .SetUseRealThreads(false);
  128. TServer::TPtr server = new TServer(serverSettings);
  129. auto& runtime = *server->GetRuntime();
  130. auto sender = runtime.AllocateEdgeActor();
  131. runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
  132. InitRoot(server, sender);
  133. CreateShardedTable(server, sender, "/Root", "table-1", 1);
  134. const auto shard1 = GetTableShards(server, sender, "/Root/table-1").at(0);
  135. const auto tableId1 = ResolveTableId(server, sender, "/Root/table-1");
  136. const int count = 2000;
  137. TString query = "UPSERT INTO `/Root/table-1` (key, value) VALUES ";
  138. for (auto times = 0; times < count; times++) {
  139. if (times != 0)
  140. query += ", ";
  141. query += "(" + ToString(times) + ", " + ToString(times) + ") ";
  142. }
  143. ExecSQL(server, sender, query);
  144. {
  145. Cerr << "... waiting for stats after upsert" << Endl;
  146. auto stats = WaitTableStats(runtime, shard1);
  147. UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shard1);
  148. UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRowCount(), count);
  149. UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetPartCount(), 0u);
  150. UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetDataSize(), 196096u);
  151. UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetIndexSize(), 0u);
  152. }
  153. CompactTable(runtime, shard1, tableId1, false);
  154. {
  155. Cerr << "... waiting for stats after compaction" << Endl;
  156. auto stats = WaitTableStats(runtime, shard1, 1);
  157. UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shard1);
  158. UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRowCount(), count);
  159. UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetPartCount(), 1u);
  160. UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetDataSize(), 30100u);
  161. UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetIndexSize(), 138u);
  162. UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetChannels()[0].GetChannel(), 1u);
  163. UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetChannels()[0].GetDataSize(), 30100u);
  164. UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetChannels()[0].GetIndexSize(), 138u);
  165. }
  166. {
  167. auto stats = GetTableStats(runtime, shard1, tableId1.PathId.LocalPathId);
  168. auto dataSizeHistogram = ReadHistogram(stats.GetDataSizeHistogram());
  169. TVector<std::pair<ui64, ui64>> expectedDataSizeHistogram = {{475, 7145}, {950, 14290}, {1425, 21435}, {1900, 28580}};
  170. UNIT_ASSERT_VALUES_EQUAL(expectedDataSizeHistogram, dataSizeHistogram);
  171. auto rowCountHistogram = ReadHistogram(stats.GetRowCountHistogram());
  172. TVector<std::pair<ui64, ui64>> expectedRowCountHistogram = {{475, 475}, {950, 950}, {1425, 1425}, {1900, 1900}};
  173. UNIT_ASSERT_VALUES_EQUAL(expectedRowCountHistogram, rowCountHistogram);
  174. }
  175. NDataShard::gDbStatsDataSizeResolution = gDbStatsDataSizeResolutionBefore;
  176. NDataShard::gDbStatsRowCountResolution = gDbStatsRowCountResolutionBefore;
  177. }
  178. Y_UNIT_TEST(BlobsStatsCorrect) {
  179. TPortManager pm;
  180. TServerSettings serverSettings(pm.GetPort(2134));
  181. serverSettings.SetDomainName("Root")
  182. .SetUseRealThreads(false)
  183. .AddStoragePool("ssd")
  184. .AddStoragePool("hdd")
  185. .AddStoragePool("ext");
  186. TServer::TPtr server = new TServer(serverSettings);
  187. auto& runtime = *server->GetRuntime();
  188. auto sender = runtime.AllocateEdgeActor();
  189. runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
  190. InitRoot(server, sender);
  191. auto opts = TShardedTableOptions()
  192. .Shards(1)
  193. .Columns({
  194. {"key", "Uint32", true, false},
  195. {"value", "String", false, false},
  196. {"value2", "String", false, false, "hdd"}})
  197. .Families({
  198. {.Name = "default", .LogPoolKind = "ssd", .SysLogPoolKind = "ssd", .DataPoolKind = "ssd",
  199. .ExternalPoolKind = "ext", .DataThreshold = 100u, .ExternalThreshold = 200u},
  200. {.Name = "hdd", .DataPoolKind = "hdd"}});
  201. CreateShardedTable(server, sender, "/Root", "table-1", opts);
  202. const auto shard1 = GetTableShards(server, sender, "/Root/table-1").at(0);
  203. const auto tableId1 = ResolveTableId(server, sender, "/Root/table-1");
  204. TString smallValue(150, 'S');
  205. TString largeValue(1500, 'L');
  206. ExecSQL(server, sender, (TString)"UPSERT INTO `/Root/table-1` (key, value, value2) VALUES " +
  207. "(1, \"AAA\", \"AAA\"), " +
  208. "(2, \"" + smallValue + "\", \"BBB\"), " +
  209. "(3, \"CCC\", \"" + smallValue + "\"), " +
  210. "(4, \"" + largeValue + "\", \"BBB\"), " +
  211. "(5, \"CCC\", \"" + largeValue + "\")");
  212. {
  213. Cerr << "... waiting for stats after upsert" << Endl;
  214. auto stats = WaitTableStats(runtime, shard1);
  215. UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shard1);
  216. UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRowCount(), 5u);
  217. UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetPartCount(), 0u);
  218. UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetDataSize(), 4232u);
  219. UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetIndexSize(), 0u);
  220. }
  221. CompactTable(runtime, shard1, tableId1, false);
  222. {
  223. Cerr << "... waiting for stats after compaction" << Endl;
  224. auto stats = WaitTableStats(runtime, shard1, 1);
  225. UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shard1);
  226. UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRowCount(), 5u);
  227. UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetPartCount(), 1u);
  228. UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetDataSize(), 3555u);
  229. UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetIndexSize(), 82u);
  230. UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetChannels().size(), 3);
  231. UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetChannels()[0].GetChannel(), 1u); // ssd
  232. UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetChannels()[0].GetDataSize(), 440u); // two small values
  233. UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetChannels()[0].GetIndexSize(), 82u);
  234. UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetChannels()[2].GetChannel(), 3u); // hdd
  235. UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetChannels()[2].GetDataSize(), 99u);
  236. UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetChannels()[2].GetIndexSize(), 0u);
  237. UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetChannels()[1].GetChannel(), 2u); // ext
  238. UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetChannels()[1].GetDataSize(), 3016u); // two large values
  239. UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetChannels()[1].GetIndexSize(), 0u);
  240. }
  241. }
  242. Y_UNIT_TEST(SharedCacheGarbage) {
  243. TPortManager pm;
  244. TServerSettings serverSettings(pm.GetPort(2134));
  245. serverSettings.SetDomainName("Root")
  246. .SetUseRealThreads(false);
  247. TServer::TPtr server = new TServer(serverSettings);
  248. auto& runtime = *server->GetRuntime();
  249. auto sender = runtime.AllocateEdgeActor();
  250. runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
  251. runtime.SetLogPriority(NKikimrServices::TABLET_SAUSAGECACHE, NLog::PRI_TRACE);
  252. InitRoot(server, sender);
  253. auto opts = TShardedTableOptions()
  254. .Shards(1)
  255. .Columns({
  256. {"key", "Uint32", true, false},
  257. {"value", "String", true, false}});
  258. CreateShardedTable(server, sender, "/Root", "table-1", opts);
  259. const auto shard1 = GetTableShards(server, sender, "/Root/table-1").at(0);
  260. const auto tableId1 = ResolveTableId(server, sender, "/Root/table-1");
  261. const int batches = 10;
  262. const int batchItems = 10;
  263. for (auto batch : xrange(batches)) {
  264. TString query = "UPSERT INTO `/Root/table-1` (key, value) VALUES ";
  265. for (auto item = 0; item < batchItems; item++) {
  266. if (item != 0)
  267. query += ", ";
  268. query += "(0, \"" + TString(7000, 'x') + ToString(batch * batchItems + item) + "\") ";
  269. }
  270. Cerr << query << Endl << Endl;
  271. ExecSQL(server, sender, query);
  272. CompactTable(runtime, shard1, tableId1, false);
  273. Cerr << "... waiting for stats after compaction" << Endl;
  274. auto stats = WaitTableStats(runtime, shard1, 1, (batch + 1) * batchItems);
  275. UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shard1);
  276. UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRowCount(), (batch + 1) * batchItems);
  277. UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetPartCount(), 1);
  278. }
  279. // each batch ~70KB, ~700KB in total
  280. auto counters = MakeIntrusive<TSharedPageCacheCounters>(runtime.GetDynamicCounters());
  281. Cerr << "ActiveBytes = " << counters->ActiveBytes->Val() << " PassiveBytes = " << counters->PassiveBytes->Val() << Endl;
  282. UNIT_ASSERT_LE(counters->ActiveBytes->Val(), 800*1024); // one index
  283. }
  284. } // Y_UNIT_TEST_SUITE(DataShardStats)
  285. } // namespace NKikimr