#include "mkql_computation_node_holders.h" #include "mkql_computation_node_impl.h" #include "mkql_computation_node_pack.h" #include "mkql_value_builder.h" #include "mkql_validate.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace NKikimr { namespace NMiniKQL { std::unique_ptr IComputationNode::PrepareArrowKernelComputationNode(TComputationContext& ctx) const { Y_UNUSED(ctx); return {}; } TDatumProvider MakeDatumProvider(const arrow::Datum& datum) { return [datum]() { return datum; }; } TDatumProvider MakeDatumProvider(const IComputationNode* node, TComputationContext& ctx) { return [node, &ctx]() { const auto& value = node->GetValue(ctx); return TArrowBlock::From(value).GetDatum(); }; } TComputationContext::TComputationContext(const THolderFactory& holderFactory, const NUdf::IValueBuilder* builder, const TComputationOptsFull& opts, const TComputationMutables& mutables, arrow::MemoryPool& arrowMemoryPool) : TComputationContextLLVM{holderFactory, opts.Stats, std::make_unique(mutables.CurValueIndex), builder} , RandomProvider(opts.RandomProvider) , TimeProvider(opts.TimeProvider) , ArrowMemoryPool(arrowMemoryPool) , WideFields(mutables.CurWideFieldsIndex, nullptr) , TypeEnv(opts.TypeEnv) , Mutables(mutables) , TypeInfoHelper(new TTypeInfoHelper) , CountersProvider(opts.CountersProvider) , SecureParamsProvider(opts.SecureParamsProvider) { std::fill_n(MutableValues.get(), mutables.CurValueIndex, NUdf::TUnboxedValue(NUdf::TUnboxedValuePod::Invalid())); for (const auto& [mutableIdx, fieldIdx, used] : mutables.WideFieldInitialize) { for (ui32 i: used) { WideFields[fieldIdx + i] = &MutableValues[mutableIdx + i]; } } } TComputationContext::~TComputationContext() { #ifndef NDEBUG if (RssCounter) { Cerr << "UsageOnFinish: graph=" << HolderFactory.GetPagePool().GetUsed() << ", rss=" << TRusage::Get().MaxRss << ", peakAlloc=" << HolderFactory.GetPagePool().GetPeakAllocated() << ", adjustor=" << UsageAdjustor << Endl; } #endif } void TComputationContext::UpdateUsageAdjustor(ui64 memLimit) { const auto rss = TRusage::Get().MaxRss; if (!InitRss) { LastRss = InitRss = rss; } #ifndef NDEBUG // Print first time and then each 30 seconds bool printUsage = LastPrintUsage == TInstant::Zero() || TInstant::Now() > TDuration::Seconds(30).ToDeadLine(LastPrintUsage); #endif if (auto peakAlloc = HolderFactory.GetPagePool().GetPeakAllocated()) { if (rss - InitRss > memLimit && rss - LastRss > (memLimit / 4)) { UsageAdjustor = std::max(1.f, float(rss - InitRss) / float(peakAlloc)); LastRss = rss; #ifndef NDEBUG printUsage = UsageAdjustor > 1.f; #endif } } #ifndef NDEBUG if (printUsage) { Cerr << "Usage: graph=" << HolderFactory.GetPagePool().GetUsed() << ", rss=" << rss << ", peakAlloc=" << HolderFactory.GetPagePool().GetPeakAllocated() << ", adjustor=" << UsageAdjustor << Endl; LastPrintUsage = TInstant::Now(); } #endif } class TSimpleSecureParamsProvider : public NUdf::ISecureParamsProvider { public: TSimpleSecureParamsProvider(const THashMap& secureParams) : SecureParams(secureParams) {} bool GetSecureParam(NUdf::TStringRef key, NUdf::TStringRef& value) const override { auto found = SecureParams.FindPtr(TStringBuf(key)); if (!found) { return false; } value = (TStringBuf)*found; return true; } private: const THashMap SecureParams; }; std::unique_ptr MakeSimpleSecureParamsProvider(const THashMap& secureParams) { return std::make_unique(secureParams); } } }