block_histogram.cpp 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607
  1. #include "block_histogram.h"
  2. #include <library/cpp/histogram/adaptive/protos/histo.pb.h>
  3. #include <util/generic/algorithm.h>
  4. #include <util/generic/yexception.h>
  5. #include <util/generic/intrlist.h>
  6. #include <util/generic/ptr.h>
  7. #include <util/generic/queue.h>
  8. #include <util/generic/ymath.h>
  9. #include <util/string/printf.h>
  10. #include <format>
  11. namespace {
  12. struct TEmpty {
  13. };
  14. class TSmartHeap {
  15. private:
  16. TVector<ui32> A;
  17. TVector<ui32> Pos;
  18. const TVector<double>& Weights;
  19. public:
  20. TSmartHeap(const TVector<double>& weights)
  21. : A(weights.size())
  22. , Pos(weights.size())
  23. , Weights(weights)
  24. {
  25. for (ui32 i = 0; i < weights.size(); ++i) {
  26. A[i] = i;
  27. Pos[i] = i;
  28. }
  29. for (ui32 i = weights.size() / 2; i > 0; --i) {
  30. Down(i - 1);
  31. }
  32. }
  33. ui32 IdOfMin() {
  34. return A[0];
  35. }
  36. void Pop() {
  37. A[0] = A.back();
  38. Pos[A[0]] = 0;
  39. A.pop_back();
  40. Down(0);
  41. }
  42. void DownElement(ui32 id) {
  43. Down(Pos[id]);
  44. }
  45. private:
  46. void SwapPositions(ui32 x, ui32 y) {
  47. std::swap(A[x], A[y]);
  48. Pos[A[x]] = x;
  49. Pos[A[y]] = y;
  50. }
  51. void Down(ui32 pos) {
  52. while (1) {
  53. ui32 left = pos * 2 + 1;
  54. ui32 right = pos * 2 + 2;
  55. ui32 min = pos;
  56. if (left < A.size() && Weights[A[min]] > Weights[A[left]])
  57. min = left;
  58. if (right < A.size() && Weights[A[min]] > Weights[A[right]])
  59. min = right;
  60. if (pos == min)
  61. break;
  62. SwapPositions(min, pos);
  63. pos = min;
  64. }
  65. }
  66. };
  67. }
  68. namespace NKiwiAggr {
  69. ///////////////////
  70. // TBlockHistogram
  71. ///////////////////
  72. TBlockHistogram::TBlockHistogram(EHistogramType type, TQualityFunction calcQuality,
  73. size_t intervals, ui64 id, size_t shrinkSize)
  74. : Type(type)
  75. , CalcQuality(calcQuality)
  76. , Intervals(intervals)
  77. , ShrinkSize(shrinkSize)
  78. , PrevSize(0)
  79. , Id(id)
  80. , Sum(0)
  81. , MinValue(0)
  82. , MaxValue(0)
  83. {
  84. CorrectShrinkSize();
  85. }
  86. void TBlockHistogram::Clear() {
  87. PrevSize = 0;
  88. Sum = 0.0;
  89. MinValue = 0.0;
  90. MaxValue = 0.0;
  91. Bins.clear();
  92. }
  93. void TBlockHistogram::Add(const THistoRec& rec) {
  94. if (!rec.HasId() || rec.GetId() == Id) {
  95. Add(rec.GetValue(), rec.GetWeight());
  96. }
  97. }
  98. void TBlockHistogram::Add(double value, double weight) {
  99. if (!IsValidFloat(value) || !IsValidFloat(weight)) {
  100. ythrow yexception() << Sprintf("Histogram id %lu: bad value %f weight %f", Id, value, weight);
  101. }
  102. if (weight <= 0.0) {
  103. return; // all zero-weighted values should be skipped because they don't affect the distribution, negative weights are forbidden
  104. }
  105. if (Bins.empty()) {
  106. MinValue = value;
  107. MaxValue = value;
  108. } else {
  109. MinValue = Min(MinValue, value);
  110. MaxValue = Max(MaxValue, value);
  111. }
  112. Sum += weight;
  113. if (Bins.size() > ShrinkSize) {
  114. SortAndShrink(Intervals * SHRINK_MULTIPLIER);
  115. }
  116. Bins.push_back(TWeightedValue(value, weight));
  117. }
  118. void TBlockHistogram::Merge(const THistogram& histo, double multiplier) {
  119. if (!IsValidFloat(histo.GetMinValue()) || !IsValidFloat(histo.GetMaxValue())) {
  120. Cerr << std::format(
  121. "Merging in histogram id {}: skip bad histo with minvalue {} maxvalue {}\n",
  122. Id, histo.GetMinValue(), histo.GetMaxValue()
  123. );
  124. return;
  125. }
  126. if (histo.FreqSize() == 0) {
  127. return; // skip empty histos
  128. }
  129. if (histo.GetType() == HT_ADAPTIVE_DISTANCE_HISTOGRAM ||
  130. histo.GetType() == HT_ADAPTIVE_WEIGHT_HISTOGRAM ||
  131. histo.GetType() == HT_ADAPTIVE_WARD_HISTOGRAM ||
  132. histo.GetType() == HT_ADAPTIVE_HISTOGRAM)
  133. {
  134. Y_ABORT_UNLESS(histo.FreqSize() == histo.PositionSize(), "Corrupted histo");
  135. for (size_t j = 0; j < histo.FreqSize(); ++j) {
  136. double value = histo.GetPosition(j);
  137. double weight = histo.GetFreq(j);
  138. if (!IsValidFloat(value) || !IsValidFloat(weight)) {
  139. Cerr << std::format(
  140. "Merging in histogram id {}: skip bad value {} weight {}\n",
  141. Id, value, weight
  142. );
  143. continue;
  144. }
  145. Add(value, weight * multiplier);
  146. }
  147. MinValue = Min(MinValue, histo.GetMinValue());
  148. MaxValue = Max(MaxValue, histo.GetMaxValue());
  149. } else if (histo.GetType() == HT_FIXED_BIN_HISTOGRAM) {
  150. double pos = histo.GetMinValue() + histo.GetBinRange() / 2.0;
  151. for (size_t j = 0; j < histo.FreqSize(); ++j) {
  152. double weight = histo.GetFreq(j);
  153. if (!IsValidFloat(pos) || !IsValidFloat(weight)) {
  154. Cerr << std::format(
  155. "Merging in histogram id {}: skip bad value {} weight {}\n",
  156. Id, pos, weight
  157. );
  158. pos += histo.GetBinRange();
  159. continue;
  160. }
  161. Add(pos, weight * multiplier);
  162. pos += histo.GetBinRange();
  163. }
  164. MinValue = Min(MinValue, histo.GetMinValue());
  165. MaxValue = Max(MaxValue, histo.GetMaxValue());
  166. } else {
  167. ythrow yexception() << "Unknown THistogram type";
  168. }
  169. }
  170. void TBlockHistogram::Merge(const TVector<THistogram>& histogramsToMerge) {
  171. for (size_t i = 0; i < histogramsToMerge.size(); ++i) {
  172. Merge(histogramsToMerge[i], 1.0);
  173. }
  174. }
  175. void TBlockHistogram::Merge(TVector<IHistogramPtr> histogramsToMerge) {
  176. Y_UNUSED(histogramsToMerge);
  177. ythrow yexception() << "IHistogram::Merge(TVector<IHistogramPtr>) is not defined for TBlockHistogram";
  178. }
  179. void TBlockHistogram::Multiply(double factor) {
  180. if (!IsValidFloat(factor) || factor <= 0) {
  181. ythrow yexception() << "Not valid factor in IHistogram::Multiply(): " << factor;
  182. }
  183. Sum *= factor;
  184. for (TVector<TWeightedValue>::iterator it = Bins.begin(); it != Bins.end(); ++it) {
  185. it->second *= factor;
  186. }
  187. }
  188. void TBlockHistogram::FromProto(const THistogram& histo) {
  189. Y_ABORT_UNLESS(histo.HasType(), "Attempt to parse TBlockHistogram from THistogram protobuf with no Type field set");
  190. ;
  191. switch (histo.GetType()) { // check that histogram type is correct
  192. case HT_ADAPTIVE_DISTANCE_HISTOGRAM:
  193. case HT_ADAPTIVE_WEIGHT_HISTOGRAM:
  194. case HT_ADAPTIVE_WARD_HISTOGRAM:
  195. case HT_ADAPTIVE_HISTOGRAM:
  196. break; // ok
  197. default: // not ok
  198. ythrow yexception() << "Attempt to parse TBlockHistogram from THistogram protobuf record of type = " << (ui32)histo.GetType();
  199. }
  200. if (histo.FreqSize() != histo.PositionSize()) {
  201. ythrow yexception() << "Attempt to parse TBlockHistogram from THistogram protobuf record where FreqSize != PositionSize. FreqSize == " << (ui32)histo.FreqSize() << ", PositionSize == " << (ui32)histo.PositionSize();
  202. }
  203. Id = histo.GetId();
  204. Sum = 0;
  205. Intervals = Max(Intervals, histo.FreqSize());
  206. CorrectShrinkSize();
  207. Bins.resize(histo.FreqSize());
  208. PrevSize = Bins.size();
  209. for (size_t i = 0; i < histo.FreqSize(); ++i) {
  210. double value = histo.GetPosition(i);
  211. double weight = histo.GetFreq(i);
  212. if (!IsValidFloat(value) || !IsValidFloat(weight)) {
  213. Cerr << std::format(
  214. "FromProto in histogram id {}: skip bad value {} weight {}\n",
  215. Id, value, weight
  216. );
  217. continue;
  218. }
  219. Bins[i].first = value;
  220. Bins[i].second = weight;
  221. Sum += Bins[i].second;
  222. }
  223. if (!IsValidFloat(histo.GetMinValue()) || !IsValidFloat(histo.GetMaxValue())) {
  224. ythrow yexception() << Sprintf("FromProto in histogram id %lu: skip bad histo with minvalue %f maxvalue %f", Id, histo.GetMinValue(), histo.GetMaxValue());
  225. }
  226. MinValue = histo.GetMinValue();
  227. MaxValue = histo.GetMaxValue();
  228. }
  229. void TBlockHistogram::ToProto(THistogram& histo) {
  230. histo.Clear();
  231. histo.SetType(Type);
  232. histo.SetId(Id);
  233. if (Empty()) {
  234. return;
  235. }
  236. SortAndShrink(Intervals, true);
  237. histo.SetMinValue(MinValue);
  238. histo.SetMaxValue(MaxValue);
  239. for (TVector<TWeightedValue>::const_iterator it = Bins.begin(); it != Bins.end(); ++it) {
  240. histo.AddFreq(it->second);
  241. histo.AddPosition(it->first);
  242. }
  243. }
  244. void TBlockHistogram::SetId(ui64 id) {
  245. Id = id;
  246. }
  247. ui64 TBlockHistogram::GetId() {
  248. return Id;
  249. }
  250. bool TBlockHistogram::Empty() {
  251. return Bins.empty();
  252. }
  253. double TBlockHistogram::GetMinValue() {
  254. return MinValue;
  255. }
  256. double TBlockHistogram::GetMaxValue() {
  257. return MaxValue;
  258. }
  259. double TBlockHistogram::GetSum() {
  260. return Sum;
  261. }
  262. void TBlockHistogram::SortAndShrink(size_t intervals, bool final) {
  263. Y_ABORT_UNLESS(intervals > 0);
  264. if (Bins.size() <= intervals) {
  265. return;
  266. }
  267. if (Bins.size() >= Intervals * GREEDY_SHRINK_MULTIPLIER) {
  268. SortBins();
  269. UniquifyBins();
  270. FastGreedyShrink(intervals);
  271. if (final) {
  272. SlowShrink(intervals);
  273. }
  274. } else {
  275. SortBins();
  276. UniquifyBins();
  277. SlowShrink(intervals);
  278. }
  279. }
  280. void TBlockHistogram::SortBins() {
  281. Sort(Bins.begin() + PrevSize, Bins.end());
  282. if (PrevSize != 0) {
  283. TVector<TWeightedValue> temp(Bins.begin(), Bins.begin() + PrevSize);
  284. std::merge(temp.begin(), temp.end(), Bins.begin() + PrevSize, Bins.end(), Bins.begin());
  285. }
  286. }
  287. void TBlockHistogram::UniquifyBins() {
  288. if (Bins.empty())
  289. return;
  290. auto it1 = Bins.begin();
  291. auto it2 = Bins.begin();
  292. while (++it2 != Bins.end()) {
  293. if (it1->first == it2->first) {
  294. it1->second += it2->second;
  295. } else {
  296. *(++it1) = *it2;
  297. }
  298. }
  299. Bins.erase(++it1, Bins.end());
  300. }
  301. void TBlockHistogram::CorrectShrinkSize() {
  302. ShrinkSize = Max(ShrinkSize, Intervals * (SHRINK_MULTIPLIER + GREEDY_SHRINK_MULTIPLIER));
  303. }
  304. void TBlockHistogram::SlowShrink(size_t intervals) {
  305. {
  306. size_t pos = 0;
  307. for (size_t i = 1; i < Bins.size(); ++i)
  308. if (Bins[i].first - Bins[pos].first < 1e-9) {
  309. Bins[pos].second += Bins[i].second;
  310. } else {
  311. ++pos;
  312. Bins[pos] = Bins[i];
  313. }
  314. Bins.resize(pos + 1);
  315. PrevSize = pos + 1;
  316. }
  317. if (Bins.size() <= intervals) {
  318. return;
  319. }
  320. typedef TIntrusiveListItem<TEmpty> TListItem;
  321. ui32 n = Bins.size() - 1;
  322. const ui32 end = (ui32)Bins.size();
  323. TArrayHolder<TListItem> listElementsHolder(new TListItem[end + 1]);
  324. TListItem* const bins = listElementsHolder.Get();
  325. for (ui32 i = 1; i <= end; ++i) {
  326. bins[i].LinkAfter(&bins[i - 1]);
  327. }
  328. TVector<double> pairWeights(n);
  329. for (ui32 i = 0; i < n; ++i) {
  330. pairWeights[i] = CalcQuality(Bins[i], Bins[i + 1]).first;
  331. }
  332. TSmartHeap heap(pairWeights);
  333. while (n + 1 > intervals) {
  334. ui32 b = heap.IdOfMin();
  335. heap.Pop();
  336. ui32 a = (ui32)(bins[b].Prev() - bins);
  337. ui32 c = (ui32)(bins[b].Next() - bins);
  338. ui32 d = (ui32)(bins[b].Next()->Next() - bins);
  339. Y_ABORT_UNLESS(Bins[c].second != -1);
  340. double mass = Bins[b].second + Bins[c].second;
  341. Bins[c].first = (Bins[b].first * Bins[b].second + Bins[c].first * Bins[c].second) / mass;
  342. Bins[c].second = mass;
  343. bins[b].Unlink();
  344. Bins[b].second = -1;
  345. if (a != end) {
  346. pairWeights[a] = CalcQuality(Bins[a], Bins[c]).first;
  347. heap.DownElement(a);
  348. }
  349. if (d != end && c + 1 != Bins.size()) {
  350. pairWeights[c] = CalcQuality(Bins[c], Bins[d]).first;
  351. heap.DownElement(c);
  352. }
  353. --n;
  354. }
  355. size_t pos = 0;
  356. for (TListItem* it = bins[end].Next(); it != &bins[end]; it = it->Next()) {
  357. Bins[pos++] = Bins[it - bins];
  358. }
  359. Bins.resize(pos);
  360. PrevSize = pos;
  361. Y_ABORT_UNLESS(pos == intervals);
  362. }
  363. double TBlockHistogram::GetSumInRange(double leftBound, double rightBound) {
  364. Y_UNUSED(leftBound);
  365. Y_UNUSED(rightBound);
  366. ythrow yexception() << "Method is not implemented for TBlockHistogram";
  367. return 0;
  368. }
  369. double TBlockHistogram::GetSumAboveBound(double bound) {
  370. Y_UNUSED(bound);
  371. ythrow yexception() << "Method is not implemented for TBlockHistogram";
  372. return 0;
  373. }
  374. double TBlockHistogram::GetSumBelowBound(double bound) {
  375. Y_UNUSED(bound);
  376. ythrow yexception() << "Method is not implemented for TBlockHistogram";
  377. return 0;
  378. }
  379. double TBlockHistogram::CalcUpperBound(double sum) {
  380. Y_UNUSED(sum);
  381. ythrow yexception() << "Method is not implemented for TBlockHistogram";
  382. return 0;
  383. }
  384. double TBlockHistogram::CalcLowerBound(double sum) {
  385. Y_UNUSED(sum);
  386. ythrow yexception() << "Method is not implemented for TBlockHistogram";
  387. return 0;
  388. }
  389. double TBlockHistogram::CalcUpperBoundSafe(double sum) {
  390. Y_UNUSED(sum);
  391. ythrow yexception() << "Method is not implemented for TBlockHistogram";
  392. return 0;
  393. }
  394. double TBlockHistogram::CalcLowerBoundSafe(double sum) {
  395. Y_UNUSED(sum);
  396. ythrow yexception() << "Method is not implemented for TBlockHistogram";
  397. return 0;
  398. }
  399. /////////////////////////
  400. // TBlockWeightHistogram
  401. /////////////////////////
  402. TBlockWeightHistogram::TBlockWeightHistogram(size_t intervals, ui64 id, size_t shrinkSize)
  403. : TBlockHistogram(HT_ADAPTIVE_WEIGHT_HISTOGRAM, CalcWeightQuality, intervals, id, shrinkSize)
  404. {
  405. }
  406. void TBlockWeightHistogram::FastGreedyShrink(size_t intervals) {
  407. if (Bins.size() <= intervals)
  408. return;
  409. double slab = Sum / intervals;
  410. size_t i = 0;
  411. size_t pos = 0;
  412. while (i < Bins.size()) {
  413. double curW = Bins[i].second;
  414. double curMul = Bins[i].first * Bins[i].second;
  415. ++i;
  416. while (i < Bins.size() && curW + Bins[i].second <= slab && pos + Bins.size() - i >= intervals) {
  417. curW += Bins[i].second;
  418. curMul += Bins[i].first * Bins[i].second;
  419. ++i;
  420. }
  421. Bins[pos++] = TWeightedValue(curMul / curW, curW);
  422. }
  423. Bins.resize(pos);
  424. PrevSize = pos;
  425. }
  426. ///////////////////////
  427. // TBlockWardHistogram
  428. ///////////////////////
  429. TBlockWardHistogram::TBlockWardHistogram(size_t intervals, ui64 id, size_t shrinkSize)
  430. : TBlockHistogram(HT_ADAPTIVE_WARD_HISTOGRAM, CalcWardQuality, intervals, id, shrinkSize)
  431. {
  432. }
  433. bool TBlockWardHistogram::CalcSplitInfo(
  434. const TCumulatives::const_iterator beg,
  435. const TCumulatives::const_iterator end, // (!) points to the final element
  436. TSplitInfo& splitInfo // out
  437. ) {
  438. if (end - beg < 2) {
  439. return false;
  440. }
  441. TCumulatives::const_iterator mid = LowerBound(beg, end + 1, TCumulative{(beg->first + end->first) / 2, 0.});
  442. if (mid == beg) {
  443. mid++;
  444. } else if (mid == end) {
  445. mid--;
  446. }
  447. // derived from Ward's minimum variance criterion
  448. double profit = 0.0;
  449. profit += (mid->second - beg->second) * (mid->second - beg->second) / (mid->first - beg->first);
  450. profit += (end->second - mid->second) * (end->second - mid->second) / (end->first - mid->first);
  451. profit -= (end->second - beg->second) * (end->second - beg->second) / (end->first - beg->first);
  452. splitInfo = {profit, beg, mid, end};
  453. return true;
  454. }
  455. void TBlockWardHistogram::FastGreedyShrink(size_t intervals) {
  456. Y_ABORT_UNLESS(intervals > 0);
  457. if (Bins.size() <= intervals) {
  458. return;
  459. }
  460. // fill cumulative sums
  461. // sum at index i equals to the sum of all values before i
  462. // sum at index i+1 equals to the sum of all values before i with the value at i added
  463. TCumulatives cumulatives;
  464. cumulatives.reserve(Bins.size() + 1);
  465. TCumulative cumulative = {0., 0.};
  466. cumulatives.push_back(cumulative);
  467. for (size_t i = 0; i < Bins.size(); i++) {
  468. cumulative.first += Bins[i].second;
  469. cumulative.second += Bins[i].second * Bins[i].first;
  470. cumulatives.push_back(cumulative);
  471. }
  472. TVector<TCumulatives::const_iterator> splits;
  473. splits.reserve(intervals + 1);
  474. splits.push_back(cumulatives.begin());
  475. splits.push_back(cumulatives.end() - 1);
  476. TPriorityQueue<TSplitInfo> candidates;
  477. // explicitly add first split
  478. TSplitInfo newSplitInfo;
  479. if (CalcSplitInfo(cumulatives.begin(), cumulatives.end() - 1, newSplitInfo)) {
  480. candidates.push(newSplitInfo);
  481. }
  482. // recursively split until done
  483. for (size_t split = 0; split < intervals - 1 && !candidates.empty(); split++) {
  484. TSplitInfo curSplitInfo = candidates.top();
  485. candidates.pop();
  486. splits.push_back(curSplitInfo.mid);
  487. if (CalcSplitInfo(curSplitInfo.beg, curSplitInfo.mid, newSplitInfo)) {
  488. candidates.push(newSplitInfo);
  489. }
  490. if (CalcSplitInfo(curSplitInfo.mid, curSplitInfo.end, newSplitInfo)) {
  491. candidates.push(newSplitInfo);
  492. }
  493. }
  494. // calclate new bin centers and weights
  495. Sort(splits.begin(), splits.end());
  496. Bins.clear();
  497. for (auto it = splits.begin(); it + 1 != splits.end(); ++it) {
  498. auto splitBeg = *it;
  499. auto splitEnd = *(it + 1);
  500. double cnt = (splitEnd->first - splitBeg->first);
  501. double mu = (splitEnd->second - splitBeg->second) / cnt;
  502. Bins.push_back(TWeightedValue(mu, cnt));
  503. }
  504. }
  505. }