123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497 |
- #include <util/datetime/cputimer.h>
- #include <yql/essentials/minikql/comp_nodes/mkql_rh_hash.h>
- #include <yql/essentials/minikql/arrow/arrow_defs.h>
- #include <arrow/array/builder_primitive.h>
- #include <arrow/datum.h>
- #include <library/cpp/getopt/last_getopt.h>
- #include <util/digest/fnv.h>
- #include <util/digest/murmur.h>
- #include <util/digest/city.h>
- enum class EDistribution {
- Const,
- Linear,
- Random,
- Few,
- RandomFew
- };
- enum class EShape {
- Default,
- Sqrt,
- Log
- };
- arrow::Datum MakeIntColumn(ui32 len, EDistribution dist, EShape shape, ui32 buckets) {
- arrow::Int32Builder builder;
- ARROW_OK(builder.Reserve(len));
- for (ui32 i = 0; i < len; ++i) {
- ui32 val;
- switch (shape) {
- case EShape::Default:
- val = i;
- break;
- case EShape::Sqrt:
- val = (ui32)sqrt(i);
- break;
- case EShape::Log:
- val = (ui32)log(1 + i);
- break;
- }
-
- switch (dist) {
- case EDistribution::Const:
- builder.UnsafeAppend(0);
- break;
- case EDistribution::Few:
- builder.UnsafeAppend(val % buckets);
- break;
- case EDistribution::Linear:
- builder.UnsafeAppend(val);
- break;
- case EDistribution::Random:
- builder.UnsafeAppend(IntHash(val));
- break;
- case EDistribution::RandomFew:
- builder.UnsafeAppend(IntHash(val) % buckets);
- break;
- }
- }
- std::shared_ptr<arrow::ArrayData> result;
- ARROW_OK(builder.FinishInternal(&result));
- return arrow::Datum(result);
- }
- class IAggregator {
- public:
- virtual ~IAggregator() = default;
- virtual void Init(i64* state, i32 payload) = 0;
- virtual void Update(i64* state, i32 payload) = 0;
- };
- class TSumAggregator : public IAggregator {
- public:
- void Init(i64* state, i32 payload) final {
- *state = payload;
- }
- void Update(i64* state, i32 payload) final {
- *state += payload;
- }
- };
- template <typename T>
- struct TCityHasher {
- public:
- ui64 operator()(const T& x) const {
- return CityHash64(TStringBuf((char*)&x, sizeof(x)));
- }
- };
- // sum(payloads) group by keys
- template <bool CalculateHashStats, bool UseRH>
- class TAggregate {
- private:
- struct TOneCell {
- i32 Key = 0;
- bool IsEmpty = true;
- i64 State = 0;
- };
- struct TCell {
- i32 Key = 0;
- i32 PSL = -1;
- i64 State = 0;
- };
- public:
- TAggregate(const std::vector<IAggregator*>& aggs)
- : Aggs(aggs)
- , RH(sizeof(i64))
- {
- Cells.resize(1u << 8);
- }
- void AddBatch(arrow::Datum keys, arrow::Datum payloads) {
- auto arrKeys = keys.array();
- auto arrPayloads = payloads.array();
- auto len = arrKeys->length;
- const i32* ptrKeys = arrKeys->GetValues<i32>(1);
- const i32* ptrPayloads = arrPayloads->GetValues<i32>(1);
- for (int64_t i = 0; i < len; ++i) {
- auto key = ptrKeys[i];
- auto payload = ptrPayloads[i];
- if (!MoreThanOne) {
- if (One.IsEmpty) {
- One.IsEmpty = false;
- One.Key = key;
- for (const auto& a : Aggs) {
- a->Init(&One.State, payload);
- }
- Size = 1;
- continue;
- } else {
- if (key == One.Key) {
- for (const auto& a : Aggs) {
- a->Update(&One.State, payload);
- }
- continue;
- } else {
- MoreThanOne = true;
- if constexpr (UseRH) {
- bool isNew;
- auto iter = RH.Insert(One.Key, isNew);
- Y_ASSERT(isNew);
- *(i64*)RH.GetPayload(iter) = One.State;
- } else {
- bool isNew;
- ui64 bucket = AddBucketFromKeyImpl(One.Key, Cells, isNew);
- auto& c = Cells[bucket];
- c.PSL = 0;
- c.Key = One.Key;
- c.State = One.State;
- }
- }
- }
- }
- if constexpr (UseRH) {
- bool isNew = false;
- auto iter = RH.Insert(key, isNew);
- if (isNew) {
- for (const auto& a : Aggs) {
- a->Init((i64*)RH.GetPayload(iter), payload);
- }
- RH.CheckGrow();
- } else {
- for (const auto& a : Aggs) {
- a->Update((i64*)RH.GetPayload(iter), payload);
- }
- }
- } else {
- bool isNew = false;
- ui64 bucket = AddBucketFromKey(key, isNew);
- auto& c = Cells[bucket];
- if (isNew) {
- Size += 1;
- for (const auto& a : Aggs) {
- a->Init(&c.State, payload);
- }
- if (Size * 2 >= Cells.size()) {
- Grow();
- }
- } else {
- for (const auto& a : Aggs) {
- a->Update(&c.State, payload);
- }
- }
- }
- }
- }
- static ui64 MakeHash(i32 key) {
- //auto hash = FnvHash<ui64>(&key, sizeof(key));
- //auto hash = MurmurHash<ui64>(&key, sizeof(key));
- auto hash = CityHash64(TStringBuf((char*)&key, sizeof(key)));
- //auto hash = key;
- return hash;
- }
- Y_FORCE_INLINE ui64 AddBucketFromKey(i32 key, bool& isNew) {
- return AddBucketFromKeyImpl(key, Cells, isNew);
- }
- Y_FORCE_INLINE ui64 AddBucketFromKeyImpl(i32 key, std::vector<TCell>& cells, bool& isNew) {
- isNew = false;
- ui32 chainLen = 0;
- if constexpr (CalculateHashStats) {
- HashSearches++;
- }
- ui64 bucket = MakeHash(key) & (cells.size() - 1);
- i32 distance = 0;
- ui64 returnBucket;
- i64 oldState;
- for (;;) {
- if constexpr (CalculateHashStats) {
- HashProbes++;
- chainLen++;
- }
- if (cells[bucket].PSL < 0) {
- isNew = true;
- cells[bucket].Key = key;
- cells[bucket].PSL = distance;
- if constexpr (CalculateHashStats) {
- MaxHashChainLen = Max(MaxHashChainLen, chainLen);
- }
- return bucket;
- }
- if (cells[bucket].Key == key) {
- if constexpr (CalculateHashStats) {
- MaxHashChainLen = Max(MaxHashChainLen, chainLen);
- }
- return bucket;
- }
- if (distance > cells[bucket].PSL) {
- // swap keys & state
- returnBucket = bucket;
- oldState = cells[bucket].State;
- std::swap(key, cells[bucket].Key);
- std::swap(distance, cells[bucket].PSL);
- isNew = true;
- ++distance;
- bucket = (bucket + 1) & (cells.size() - 1);
- break;
- }
- ++distance;
- bucket = (bucket + 1) & (cells.size() - 1);
- }
- for (;;) {
- if constexpr (CalculateHashStats) {
- HashProbes++;
- chainLen++;
- }
- if (cells[bucket].PSL < 0) {
- if constexpr (CalculateHashStats) {
- MaxHashChainLen = Max(MaxHashChainLen, chainLen);
- }
- cells[bucket].Key = key;
- cells[bucket].State = oldState;
- cells[bucket].PSL = distance;
- return returnBucket; // for original key
- }
- Y_ENSURE(cells[bucket].Key != key);
- if (distance > cells[bucket].PSL) {
- // swap keys & state
- std::swap(key, cells[bucket].Key);
- std::swap(oldState, cells[bucket].State);
- std::swap(distance, cells[bucket].PSL);
- }
- ++distance;
- bucket = (bucket + 1) & (cells.size() - 1);
- }
- }
- void Grow() {
- std::vector<TCell> newCells;
- newCells.resize(Cells.size() * 2); // must be power of 2
- for (const auto& c : Cells) {
- if (c.PSL < 0) {
- continue;
- }
- bool isNew;
- auto newBucket = AddBucketFromKeyImpl(c.Key, newCells, isNew);
- auto& nc = newCells[newBucket];
- nc.State = c.State;
- }
- Cells.swap(newCells);
- }
- double GetAverageHashChainLen() {
- return 1.0*HashProbes/HashSearches;
- }
- ui32 GetMaxHashChainLen() {
- return MaxHashChainLen;
- }
- void GetResult(arrow::Datum& keys, arrow::Datum& sums) {
- arrow::Int32Builder keysBuilder;
- arrow::Int64Builder sumsBuilder;
- if (!MoreThanOne) {
- if (!One.IsEmpty) {
- ARROW_OK(keysBuilder.Reserve(1));
- ARROW_OK(sumsBuilder.Reserve(1));
- keysBuilder.UnsafeAppend(One.Key);
- sumsBuilder.UnsafeAppend(One.State);
- }
- } else {
- ui64 size;
- if constexpr (UseRH) {
- size = RH.GetSize();
- } else {
- size = Size;
- }
- ARROW_OK(keysBuilder.Reserve(size));
- ARROW_OK(sumsBuilder.Reserve(size));
- i32 maxPSL = 0;
- i64 sumPSL = 0;
- if constexpr (UseRH) {
- for (auto iter = RH.Begin(); iter != RH.End(); RH.Advance(iter)) {
- auto& psl = RH.GetPSL(iter);
- if (psl.Distance < 0) {
- continue;
- }
- keysBuilder.UnsafeAppend(RH.GetKey(iter));
- sumsBuilder.UnsafeAppend(*(i64*)RH.GetPayload(iter));
- maxPSL = Max(psl.Distance, maxPSL);
- sumPSL += psl.Distance;
- }
- } else {
- for (const auto& c : Cells) {
- if (c.PSL < 0) {
- continue;
- }
- keysBuilder.UnsafeAppend(c.Key);
- sumsBuilder.UnsafeAppend(c.State);
- maxPSL = Max(c.PSL, maxPSL);
- sumPSL += c.PSL;
- }
- }
- if constexpr (CalculateHashStats) {
- Cerr << "maxPSL = " << maxPSL << "\n";
- Cerr << "avgPSL = " << 1.0*sumPSL/size << "\n";
- }
- }
- std::shared_ptr<arrow::ArrayData> keysData;
- ARROW_OK(keysBuilder.FinishInternal(&keysData));
- keys = keysData;
- std::shared_ptr<arrow::ArrayData> sumsData;
- ARROW_OK(sumsBuilder.FinishInternal(&sumsData));
- sums = sumsData;
- }
- private:
- bool MoreThanOne = false;
- TOneCell One;
- std::vector<TCell> Cells;
- ui64 Size = 0;
- const std::vector<IAggregator*> Aggs;
- ui64 HashProbes = 0;
- ui64 HashSearches = 0;
- ui32 MaxHashChainLen = 0;
- NKikimr::NMiniKQL::TRobinHoodHashMap<i32> RH;
- NKikimr::NMiniKQL::TRobinHoodHashSet<i32> RHS;
- };
- int main(int argc, char** argv) {
- NLastGetopt::TOpts opts = NLastGetopt::TOpts::Default();
- TString keysDistributionStr;
- TString shapeStr="default";
- ui32 nIters = 100;
- ui32 nRows = 1000000;
- ui32 nBuckets = 16;
- ui32 nRepeats = 10;
- opts.AddLongOption('k', "keys", "distribution of keys (const, linear, random, few, randomfew)").StoreResult(&keysDistributionStr).Required();
- opts.AddLongOption('s', "shape", "shape of counter (default, sqrt, log)").StoreResult(&shapeStr);
- opts.AddLongOption('i', "iter", "# of iterations").StoreResult(&nIters);
- opts.AddLongOption('r', "rows", "# of rows").StoreResult(&nRows);
- opts.AddLongOption('b', "buckets", "modulo for few/randomfew").StoreResult(&nBuckets);
- opts.AddLongOption('t', "repeats", "# of repeats").StoreResult(&nRepeats);
- opts.SetFreeArgsMax(0);
- NLastGetopt::TOptsParseResult res(&opts, argc, argv);
- EDistribution keysDist;
- EShape shape = EShape::Default;
- if (keysDistributionStr == "const") {
- keysDist = EDistribution::Const;
- } else if (keysDistributionStr == "linear") {
- keysDist = EDistribution::Linear;
- } else if (keysDistributionStr == "random") {
- keysDist = EDistribution::Random;
- } else if (keysDistributionStr == "few") {
- keysDist = EDistribution::Few;
- } else if (keysDistributionStr == "randomfew") {
- keysDist = EDistribution::RandomFew;
- } else {
- ythrow yexception() << "Unsupported distribution: " << keysDistributionStr;
- }
- if (shapeStr == "default") {
- shape = EShape::Default;
- } else if (shapeStr == "sqrt") {
- shape = EShape::Sqrt;
- } else if (shapeStr == "log") {
- shape = EShape::Log;
- } else {
- ythrow yexception() << "Unsupported shape: " << shapeStr;
- }
- auto col1 = MakeIntColumn(nRows, keysDist, shape, nBuckets);
- auto col2 = MakeIntColumn(nRows, EDistribution::Linear, EShape::Default, nBuckets);
- Cerr << "col1.length: " << col1.length() << "\n";
- Cerr << "col2.length: " << col2.length() << "\n";
- TSumAggregator sum;
- std::vector<IAggregator*> aggs;
- aggs.push_back(&sum);
- TAggregate<true, true> agg(aggs);
- agg.AddBatch(col1, col2);
- arrow::Datum keys, sums;
- agg.GetResult(keys, sums);
- ui64 total1 = 0;
- for (ui32 i = 0; i < col2.length(); ++i) {
- total1 += col2.array()->GetValues<i32>(1)[i];
- }
- Cerr << "total1: " << total1 << "\n";
- ui64 total2 = 0;
- Cerr << "keys.length: " << keys.length() << "\n";
- Cerr << "sums.length: " << sums.length() << "\n";
- for (ui32 i = 0; i < sums.length(); ++i) {
- total2 += sums.array()->GetValues<i64>(1)[i];
- }
- Cerr << "total2: " << total2 << "\n";
- Y_ENSURE(total1 == total2);
- Cerr << "AverageHashChainLen: " << agg.GetAverageHashChainLen() << "\n";
- Cerr << "MaxHashChainLen: " << agg.GetMaxHashChainLen() << "\n";
- std::vector<double> durations;
- for (ui32 j = 0; j < nRepeats; ++j) {
- TSimpleTimer timer;
- for (ui32 i = 0; i < nIters; ++i) {
- TAggregate<false, true> agg(aggs);
- agg.AddBatch(col1, col2);
- arrow::Datum keys, sums;
- agg.GetResult(keys, sums);
- }
- auto duration = timer.Get();
- durations.push_back(1e-6*duration.MicroSeconds());
- }
- double sumDurations = 0.0, sumDurationsQ = 0.0;
- for (auto d : durations) {
- sumDurations += d;
- sumDurationsQ += d * d;
- }
- double avgDuration = sumDurations / nRepeats;
- double dispDuration = sqrt(sumDurationsQ / nRepeats - avgDuration * avgDuration);
- Cerr << "Elapsed: " << avgDuration << ", noise: " << 100*dispDuration/avgDuration << "%\n";
- Cerr << "Speed: " << 1e-6 * (ui64(nIters) * nRows / avgDuration) << " M rows/sec\n";
- Cerr << "Speed: " << 1e-6 * (2 * sizeof(i32) * ui64(nIters) * nRows / avgDuration) << " M bytes/sec\n";
- return 0;
- }
|