block_histogram.cpp 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593
  1. #include "block_histogram.h"
  2. #include <library/cpp/histogram/adaptive/protos/histo.pb.h>
  3. #include <util/generic/algorithm.h>
  4. #include <util/generic/yexception.h>
  5. #include <util/generic/intrlist.h>
  6. #include <util/generic/ptr.h>
  7. #include <util/generic/queue.h>
  8. #include <util/generic/ymath.h>
  9. #include <util/string/printf.h>
  10. namespace {
  11. struct TEmpty {
  12. };
  13. class TSmartHeap {
  14. private:
  15. TVector<ui32> A;
  16. TVector<ui32> Pos;
  17. const TVector<double>& Weights;
  18. public:
  19. TSmartHeap(const TVector<double>& weights)
  20. : A(weights.size())
  21. , Pos(weights.size())
  22. , Weights(weights)
  23. {
  24. for (ui32 i = 0; i < weights.size(); ++i) {
  25. A[i] = i;
  26. Pos[i] = i;
  27. }
  28. for (ui32 i = weights.size() / 2; i > 0; --i) {
  29. Down(i - 1);
  30. }
  31. }
  32. ui32 IdOfMin() {
  33. return A[0];
  34. }
  35. void Pop() {
  36. A[0] = A.back();
  37. Pos[A[0]] = 0;
  38. A.pop_back();
  39. Down(0);
  40. }
  41. void DownElement(ui32 id) {
  42. Down(Pos[id]);
  43. }
  44. private:
  45. void SwapPositions(ui32 x, ui32 y) {
  46. std::swap(A[x], A[y]);
  47. Pos[A[x]] = x;
  48. Pos[A[y]] = y;
  49. }
  50. void Down(ui32 pos) {
  51. while (1) {
  52. ui32 left = pos * 2 + 1;
  53. ui32 right = pos * 2 + 2;
  54. ui32 min = pos;
  55. if (left < A.size() && Weights[A[min]] > Weights[A[left]])
  56. min = left;
  57. if (right < A.size() && Weights[A[min]] > Weights[A[right]])
  58. min = right;
  59. if (pos == min)
  60. break;
  61. SwapPositions(min, pos);
  62. pos = min;
  63. }
  64. }
  65. };
  66. }
  67. namespace NKiwiAggr {
  68. ///////////////////
  69. // TBlockHistogram
  70. ///////////////////
  71. TBlockHistogram::TBlockHistogram(EHistogramType type, TQualityFunction calcQuality,
  72. size_t intervals, ui64 id, size_t shrinkSize)
  73. : Type(type)
  74. , CalcQuality(calcQuality)
  75. , Intervals(intervals)
  76. , ShrinkSize(shrinkSize)
  77. , PrevSize(0)
  78. , Id(id)
  79. , Sum(0)
  80. , MinValue(0)
  81. , MaxValue(0)
  82. {
  83. CorrectShrinkSize();
  84. }
  85. void TBlockHistogram::Clear() {
  86. PrevSize = 0;
  87. Sum = 0.0;
  88. MinValue = 0.0;
  89. MaxValue = 0.0;
  90. Bins.clear();
  91. }
  92. void TBlockHistogram::Add(const THistoRec& rec) {
  93. if (!rec.HasId() || rec.GetId() == Id) {
  94. Add(rec.GetValue(), rec.GetWeight());
  95. }
  96. }
  97. void TBlockHistogram::Add(double value, double weight) {
  98. if (!IsValidFloat(value) || !IsValidFloat(weight)) {
  99. ythrow yexception() << Sprintf("Histogram id %lu: bad value %f weight %f", Id, value, weight);
  100. }
  101. if (weight <= 0.0) {
  102. return; // all zero-weighted values should be skipped because they don't affect the distribution, negative weights are forbidden
  103. }
  104. if (Bins.empty()) {
  105. MinValue = value;
  106. MaxValue = value;
  107. } else {
  108. MinValue = Min(MinValue, value);
  109. MaxValue = Max(MaxValue, value);
  110. }
  111. Sum += weight;
  112. if (Bins.size() > ShrinkSize) {
  113. SortAndShrink(Intervals * SHRINK_MULTIPLIER);
  114. }
  115. Bins.push_back(TWeightedValue(value, weight));
  116. }
  117. void TBlockHistogram::Merge(const THistogram& histo, double multiplier) {
  118. if (!IsValidFloat(histo.GetMinValue()) || !IsValidFloat(histo.GetMaxValue())) {
  119. fprintf(stderr, "Merging in histogram id %lu: skip bad histo with minvalue %f maxvalue %f\n", Id, histo.GetMinValue(), histo.GetMaxValue());
  120. return;
  121. }
  122. if (histo.FreqSize() == 0) {
  123. return; // skip empty histos
  124. }
  125. if (histo.GetType() == HT_ADAPTIVE_DISTANCE_HISTOGRAM ||
  126. histo.GetType() == HT_ADAPTIVE_WEIGHT_HISTOGRAM ||
  127. histo.GetType() == HT_ADAPTIVE_WARD_HISTOGRAM ||
  128. histo.GetType() == HT_ADAPTIVE_HISTOGRAM)
  129. {
  130. Y_VERIFY(histo.FreqSize() == histo.PositionSize(), "Corrupted histo");
  131. for (size_t j = 0; j < histo.FreqSize(); ++j) {
  132. double value = histo.GetPosition(j);
  133. double weight = histo.GetFreq(j);
  134. if (!IsValidFloat(value) || !IsValidFloat(weight)) {
  135. fprintf(stderr, "Merging in histogram id %lu: skip bad value %f weight %f\n", Id, value, weight);
  136. continue;
  137. }
  138. Add(value, weight * multiplier);
  139. }
  140. MinValue = Min(MinValue, histo.GetMinValue());
  141. MaxValue = Max(MaxValue, histo.GetMaxValue());
  142. } else if (histo.GetType() == HT_FIXED_BIN_HISTOGRAM) {
  143. double pos = histo.GetMinValue() + histo.GetBinRange() / 2.0;
  144. for (size_t j = 0; j < histo.FreqSize(); ++j) {
  145. double weight = histo.GetFreq(j);
  146. if (!IsValidFloat(pos) || !IsValidFloat(weight)) {
  147. fprintf(stderr, "Merging in histogram id %lu: skip bad value %f weight %f\n", Id, pos, weight);
  148. pos += histo.GetBinRange();
  149. continue;
  150. }
  151. Add(pos, weight * multiplier);
  152. pos += histo.GetBinRange();
  153. }
  154. MinValue = Min(MinValue, histo.GetMinValue());
  155. MaxValue = Max(MaxValue, histo.GetMaxValue());
  156. } else {
  157. ythrow yexception() << "Unknown THistogram type";
  158. }
  159. }
  160. void TBlockHistogram::Merge(const TVector<THistogram>& histogramsToMerge) {
  161. for (size_t i = 0; i < histogramsToMerge.size(); ++i) {
  162. Merge(histogramsToMerge[i], 1.0);
  163. }
  164. }
  165. void TBlockHistogram::Merge(TVector<IHistogramPtr> histogramsToMerge) {
  166. Y_UNUSED(histogramsToMerge);
  167. ythrow yexception() << "IHistogram::Merge(TVector<IHistogramPtr>) is not defined for TBlockHistogram";
  168. }
  169. void TBlockHistogram::Multiply(double factor) {
  170. if (!IsValidFloat(factor) || factor <= 0) {
  171. ythrow yexception() << "Not valid factor in IHistogram::Multiply(): " << factor;
  172. }
  173. Sum *= factor;
  174. for (TVector<TWeightedValue>::iterator it = Bins.begin(); it != Bins.end(); ++it) {
  175. it->second *= factor;
  176. }
  177. }
  178. void TBlockHistogram::FromProto(const THistogram& histo) {
  179. Y_VERIFY(histo.HasType(), "Attempt to parse TBlockHistogram from THistogram protobuf with no Type field set");
  180. ;
  181. switch (histo.GetType()) { // check that histogram type is correct
  182. case HT_ADAPTIVE_DISTANCE_HISTOGRAM:
  183. case HT_ADAPTIVE_WEIGHT_HISTOGRAM:
  184. case HT_ADAPTIVE_WARD_HISTOGRAM:
  185. case HT_ADAPTIVE_HISTOGRAM:
  186. break; // ok
  187. default: // not ok
  188. ythrow yexception() << "Attempt to parse TBlockHistogram from THistogram protobuf record of type = " << (ui32)histo.GetType();
  189. }
  190. if (histo.FreqSize() != histo.PositionSize()) {
  191. ythrow yexception() << "Attempt to parse TBlockHistogram from THistogram protobuf record where FreqSize != PositionSize. FreqSize == " << (ui32)histo.FreqSize() << ", PositionSize == " << (ui32)histo.PositionSize();
  192. }
  193. Id = histo.GetId();
  194. Sum = 0;
  195. Intervals = Max(Intervals, histo.FreqSize());
  196. CorrectShrinkSize();
  197. Bins.resize(histo.FreqSize());
  198. PrevSize = Bins.size();
  199. for (size_t i = 0; i < histo.FreqSize(); ++i) {
  200. double value = histo.GetPosition(i);
  201. double weight = histo.GetFreq(i);
  202. if (!IsValidFloat(value) || !IsValidFloat(weight)) {
  203. fprintf(stderr, "FromProto in histogram id %lu: skip bad value %f weight %f\n", Id, value, weight);
  204. continue;
  205. }
  206. Bins[i].first = value;
  207. Bins[i].second = weight;
  208. Sum += Bins[i].second;
  209. }
  210. if (!IsValidFloat(histo.GetMinValue()) || !IsValidFloat(histo.GetMaxValue())) {
  211. ythrow yexception() << Sprintf("FromProto in histogram id %lu: skip bad histo with minvalue %f maxvalue %f", Id, histo.GetMinValue(), histo.GetMaxValue());
  212. }
  213. MinValue = histo.GetMinValue();
  214. MaxValue = histo.GetMaxValue();
  215. }
  216. void TBlockHistogram::ToProto(THistogram& histo) {
  217. histo.Clear();
  218. histo.SetType(Type);
  219. histo.SetId(Id);
  220. if (Empty()) {
  221. return;
  222. }
  223. SortAndShrink(Intervals, true);
  224. histo.SetMinValue(MinValue);
  225. histo.SetMaxValue(MaxValue);
  226. for (TVector<TWeightedValue>::const_iterator it = Bins.begin(); it != Bins.end(); ++it) {
  227. histo.AddFreq(it->second);
  228. histo.AddPosition(it->first);
  229. }
  230. }
  231. void TBlockHistogram::SetId(ui64 id) {
  232. Id = id;
  233. }
  234. ui64 TBlockHistogram::GetId() {
  235. return Id;
  236. }
  237. bool TBlockHistogram::Empty() {
  238. return Bins.empty();
  239. }
  240. double TBlockHistogram::GetMinValue() {
  241. return MinValue;
  242. }
  243. double TBlockHistogram::GetMaxValue() {
  244. return MaxValue;
  245. }
  246. double TBlockHistogram::GetSum() {
  247. return Sum;
  248. }
  249. void TBlockHistogram::SortAndShrink(size_t intervals, bool final) {
  250. Y_VERIFY(intervals > 0);
  251. if (Bins.size() <= intervals) {
  252. return;
  253. }
  254. if (Bins.size() >= Intervals * GREEDY_SHRINK_MULTIPLIER) {
  255. SortBins();
  256. UniquifyBins();
  257. FastGreedyShrink(intervals);
  258. if (final) {
  259. SlowShrink(intervals);
  260. }
  261. } else {
  262. SortBins();
  263. UniquifyBins();
  264. SlowShrink(intervals);
  265. }
  266. }
  267. void TBlockHistogram::SortBins() {
  268. Sort(Bins.begin() + PrevSize, Bins.end());
  269. if (PrevSize != 0) {
  270. TVector<TWeightedValue> temp(Bins.begin(), Bins.begin() + PrevSize);
  271. std::merge(temp.begin(), temp.end(), Bins.begin() + PrevSize, Bins.end(), Bins.begin());
  272. }
  273. }
  274. void TBlockHistogram::UniquifyBins() {
  275. if (Bins.empty())
  276. return;
  277. auto it1 = Bins.begin();
  278. auto it2 = Bins.begin();
  279. while (++it2 != Bins.end()) {
  280. if (it1->first == it2->first) {
  281. it1->second += it2->second;
  282. } else {
  283. *(++it1) = *it2;
  284. }
  285. }
  286. Bins.erase(++it1, Bins.end());
  287. }
  288. void TBlockHistogram::CorrectShrinkSize() {
  289. ShrinkSize = Max(ShrinkSize, Intervals * (SHRINK_MULTIPLIER + GREEDY_SHRINK_MULTIPLIER));
  290. }
  291. void TBlockHistogram::SlowShrink(size_t intervals) {
  292. {
  293. size_t pos = 0;
  294. for (size_t i = 1; i < Bins.size(); ++i)
  295. if (Bins[i].first - Bins[pos].first < 1e-9) {
  296. Bins[pos].second += Bins[i].second;
  297. } else {
  298. ++pos;
  299. Bins[pos] = Bins[i];
  300. }
  301. Bins.resize(pos + 1);
  302. PrevSize = pos + 1;
  303. }
  304. if (Bins.size() <= intervals) {
  305. return;
  306. }
  307. typedef TIntrusiveListItem<TEmpty> TListItem;
  308. ui32 n = Bins.size() - 1;
  309. const ui32 end = (ui32)Bins.size();
  310. TArrayHolder<TListItem> listElementsHolder(new TListItem[end + 1]);
  311. TListItem* const bins = listElementsHolder.Get();
  312. for (ui32 i = 1; i <= end; ++i) {
  313. bins[i].LinkAfter(&bins[i - 1]);
  314. }
  315. TVector<double> pairWeights(n);
  316. for (ui32 i = 0; i < n; ++i) {
  317. pairWeights[i] = CalcQuality(Bins[i], Bins[i + 1]).first;
  318. }
  319. TSmartHeap heap(pairWeights);
  320. while (n + 1 > intervals) {
  321. ui32 b = heap.IdOfMin();
  322. heap.Pop();
  323. ui32 a = (ui32)(bins[b].Prev() - bins);
  324. ui32 c = (ui32)(bins[b].Next() - bins);
  325. ui32 d = (ui32)(bins[b].Next()->Next() - bins);
  326. Y_VERIFY(Bins[c].second != -1);
  327. double mass = Bins[b].second + Bins[c].second;
  328. Bins[c].first = (Bins[b].first * Bins[b].second + Bins[c].first * Bins[c].second) / mass;
  329. Bins[c].second = mass;
  330. bins[b].Unlink();
  331. Bins[b].second = -1;
  332. if (a != end) {
  333. pairWeights[a] = CalcQuality(Bins[a], Bins[c]).first;
  334. heap.DownElement(a);
  335. }
  336. if (d != end && c + 1 != Bins.size()) {
  337. pairWeights[c] = CalcQuality(Bins[c], Bins[d]).first;
  338. heap.DownElement(c);
  339. }
  340. --n;
  341. }
  342. size_t pos = 0;
  343. for (TListItem* it = bins[end].Next(); it != &bins[end]; it = it->Next()) {
  344. Bins[pos++] = Bins[it - bins];
  345. }
  346. Bins.resize(pos);
  347. PrevSize = pos;
  348. Y_VERIFY(pos == intervals);
  349. }
  350. double TBlockHistogram::GetSumInRange(double leftBound, double rightBound) {
  351. Y_UNUSED(leftBound);
  352. Y_UNUSED(rightBound);
  353. ythrow yexception() << "Method is not implemented for TBlockHistogram";
  354. return 0;
  355. }
  356. double TBlockHistogram::GetSumAboveBound(double bound) {
  357. Y_UNUSED(bound);
  358. ythrow yexception() << "Method is not implemented for TBlockHistogram";
  359. return 0;
  360. }
  361. double TBlockHistogram::GetSumBelowBound(double bound) {
  362. Y_UNUSED(bound);
  363. ythrow yexception() << "Method is not implemented for TBlockHistogram";
  364. return 0;
  365. }
  366. double TBlockHistogram::CalcUpperBound(double sum) {
  367. Y_UNUSED(sum);
  368. ythrow yexception() << "Method is not implemented for TBlockHistogram";
  369. return 0;
  370. }
  371. double TBlockHistogram::CalcLowerBound(double sum) {
  372. Y_UNUSED(sum);
  373. ythrow yexception() << "Method is not implemented for TBlockHistogram";
  374. return 0;
  375. }
  376. double TBlockHistogram::CalcUpperBoundSafe(double sum) {
  377. Y_UNUSED(sum);
  378. ythrow yexception() << "Method is not implemented for TBlockHistogram";
  379. return 0;
  380. }
  381. double TBlockHistogram::CalcLowerBoundSafe(double sum) {
  382. Y_UNUSED(sum);
  383. ythrow yexception() << "Method is not implemented for TBlockHistogram";
  384. return 0;
  385. }
  386. /////////////////////////
  387. // TBlockWeightHistogram
  388. /////////////////////////
  389. TBlockWeightHistogram::TBlockWeightHistogram(size_t intervals, ui64 id, size_t shrinkSize)
  390. : TBlockHistogram(HT_ADAPTIVE_WEIGHT_HISTOGRAM, CalcWeightQuality, intervals, id, shrinkSize)
  391. {
  392. }
  393. void TBlockWeightHistogram::FastGreedyShrink(size_t intervals) {
  394. if (Bins.size() <= intervals)
  395. return;
  396. double slab = Sum / intervals;
  397. size_t i = 0;
  398. size_t pos = 0;
  399. while (i < Bins.size()) {
  400. double curW = Bins[i].second;
  401. double curMul = Bins[i].first * Bins[i].second;
  402. ++i;
  403. while (i < Bins.size() && curW + Bins[i].second <= slab && pos + Bins.size() - i >= intervals) {
  404. curW += Bins[i].second;
  405. curMul += Bins[i].first * Bins[i].second;
  406. ++i;
  407. }
  408. Bins[pos++] = TWeightedValue(curMul / curW, curW);
  409. }
  410. Bins.resize(pos);
  411. PrevSize = pos;
  412. }
  413. ///////////////////////
  414. // TBlockWardHistogram
  415. ///////////////////////
  416. TBlockWardHistogram::TBlockWardHistogram(size_t intervals, ui64 id, size_t shrinkSize)
  417. : TBlockHistogram(HT_ADAPTIVE_WARD_HISTOGRAM, CalcWardQuality, intervals, id, shrinkSize)
  418. {
  419. }
  420. bool TBlockWardHistogram::CalcSplitInfo(
  421. const TCumulatives::const_iterator beg,
  422. const TCumulatives::const_iterator end, // (!) points to the final element
  423. TSplitInfo& splitInfo // out
  424. ) {
  425. if (end - beg < 2) {
  426. return false;
  427. }
  428. TCumulatives::const_iterator mid = LowerBound(beg, end + 1, TCumulative{(beg->first + end->first) / 2, 0.});
  429. if (mid == beg) {
  430. mid++;
  431. } else if (mid == end) {
  432. mid--;
  433. }
  434. // derived from Ward's minimum variance criterion
  435. double profit = 0.0;
  436. profit += (mid->second - beg->second) * (mid->second - beg->second) / (mid->first - beg->first);
  437. profit += (end->second - mid->second) * (end->second - mid->second) / (end->first - mid->first);
  438. profit -= (end->second - beg->second) * (end->second - beg->second) / (end->first - beg->first);
  439. splitInfo = {profit, beg, mid, end};
  440. return true;
  441. }
  442. void TBlockWardHistogram::FastGreedyShrink(size_t intervals) {
  443. Y_VERIFY(intervals > 0);
  444. if (Bins.size() <= intervals) {
  445. return;
  446. }
  447. // fill cumulative sums
  448. // sum at index i equals to the sum of all values before i
  449. // sum at index i+1 equals to the sum of all values before i with the value at i added
  450. TCumulatives cumulatives;
  451. cumulatives.reserve(Bins.size() + 1);
  452. TCumulative cumulative = {0., 0.};
  453. cumulatives.push_back(cumulative);
  454. for (size_t i = 0; i < Bins.size(); i++) {
  455. cumulative.first += Bins[i].second;
  456. cumulative.second += Bins[i].second * Bins[i].first;
  457. cumulatives.push_back(cumulative);
  458. }
  459. TVector<TCumulatives::const_iterator> splits;
  460. splits.reserve(intervals + 1);
  461. splits.push_back(cumulatives.begin());
  462. splits.push_back(cumulatives.end() - 1);
  463. TPriorityQueue<TSplitInfo> candidates;
  464. // explicitly add first split
  465. TSplitInfo newSplitInfo;
  466. if (CalcSplitInfo(cumulatives.begin(), cumulatives.end() - 1, newSplitInfo)) {
  467. candidates.push(newSplitInfo);
  468. }
  469. // recursively split until done
  470. for (size_t split = 0; split < intervals - 1 && !candidates.empty(); split++) {
  471. TSplitInfo curSplitInfo = candidates.top();
  472. candidates.pop();
  473. splits.push_back(curSplitInfo.mid);
  474. if (CalcSplitInfo(curSplitInfo.beg, curSplitInfo.mid, newSplitInfo)) {
  475. candidates.push(newSplitInfo);
  476. }
  477. if (CalcSplitInfo(curSplitInfo.mid, curSplitInfo.end, newSplitInfo)) {
  478. candidates.push(newSplitInfo);
  479. }
  480. }
  481. // calclate new bin centers and weights
  482. Sort(splits.begin(), splits.end());
  483. Bins.clear();
  484. for (auto it = splits.begin(); it + 1 != splits.end(); ++it) {
  485. auto splitBeg = *it;
  486. auto splitEnd = *(it + 1);
  487. double cnt = (splitEnd->first - splitBeg->first);
  488. double mu = (splitEnd->second - splitBeg->second) / cnt;
  489. Bins.push_back(TWeightedValue(mu, cnt));
  490. }
  491. }
  492. }