mkql_computation_node.cpp 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. #include "mkql_computation_node_holders.h"
  2. #include "mkql_computation_node_impl.h"
  3. #include "mkql_computation_node_pack.h"
  4. #include "mkql_value_builder.h"
  5. #include "mkql_validate.h"
  6. #include <yql/essentials/minikql/mkql_node_builder.h>
  7. #include <yql/essentials/minikql/mkql_node_cast.h>
  8. #include <yql/essentials/minikql/mkql_node_printer.h>
  9. #include <yql/essentials/minikql/mkql_function_registry.h>
  10. #include <yql/essentials/minikql/mkql_type_builder.h>
  11. #include <yql/essentials/minikql/mkql_utils.h>
  12. #include <yql/essentials/minikql/mkql_alloc.h>
  13. #include <util/generic/set.h>
  14. #include <util/generic/algorithm.h>
  15. #include <util/random/mersenne.h>
  16. #include <util/random/random.h>
  17. #include <util/system/tempfile.h>
  18. #include <util/system/fstat.h>
  19. #include <util/system/rusage.h>
  20. #include <util/stream/file.h>
  21. #include <util/stream/output.h>
  22. #include <util/memory/pool.h>
  23. namespace NKikimr {
  24. namespace NMiniKQL {
  25. std::unique_ptr<IArrowKernelComputationNode> IComputationNode::PrepareArrowKernelComputationNode(TComputationContext& ctx) const {
  26. Y_UNUSED(ctx);
  27. return {};
  28. }
  29. TDatumProvider MakeDatumProvider(const arrow::Datum& datum) {
  30. return [datum]() {
  31. return datum;
  32. };
  33. }
  34. TDatumProvider MakeDatumProvider(const IComputationNode* node, TComputationContext& ctx) {
  35. return [node, &ctx]() {
  36. const auto& value = node->GetValue(ctx);
  37. return TArrowBlock::From(value).GetDatum();
  38. };
  39. }
  40. TComputationContext::TComputationContext(const THolderFactory& holderFactory,
  41. const NUdf::IValueBuilder* builder,
  42. const TComputationOptsFull& opts,
  43. const TComputationMutables& mutables,
  44. arrow::MemoryPool& arrowMemoryPool)
  45. : TComputationContextLLVM{holderFactory, opts.Stats, std::make_unique<NUdf::TUnboxedValue[]>(mutables.CurValueIndex), builder}
  46. , RandomProvider(opts.RandomProvider)
  47. , TimeProvider(opts.TimeProvider)
  48. , ArrowMemoryPool(arrowMemoryPool)
  49. , WideFields(mutables.CurWideFieldsIndex, nullptr)
  50. , TypeEnv(opts.TypeEnv)
  51. , Mutables(mutables)
  52. , TypeInfoHelper(new TTypeInfoHelper)
  53. , CountersProvider(opts.CountersProvider)
  54. , SecureParamsProvider(opts.SecureParamsProvider)
  55. {
  56. std::fill_n(MutableValues.get(), mutables.CurValueIndex, NUdf::TUnboxedValue(NUdf::TUnboxedValuePod::Invalid()));
  57. for (const auto& [mutableIdx, fieldIdx, used] : mutables.WideFieldInitialize) {
  58. for (ui32 i: used) {
  59. WideFields[fieldIdx + i] = &MutableValues[mutableIdx + i];
  60. }
  61. }
  62. }
  63. TComputationContext::~TComputationContext() {
  64. #ifndef NDEBUG
  65. if (RssCounter) {
  66. Cerr << "UsageOnFinish: graph=" << HolderFactory.GetPagePool().GetUsed()
  67. << ", rss=" << TRusage::Get().MaxRss
  68. << ", peakAlloc=" << HolderFactory.GetPagePool().GetPeakAllocated()
  69. << ", adjustor=" << UsageAdjustor
  70. << Endl;
  71. }
  72. #endif
  73. }
  74. void TComputationContext::UpdateUsageAdjustor(ui64 memLimit) {
  75. const auto rss = TRusage::Get().MaxRss;
  76. if (!InitRss) {
  77. LastRss = InitRss = rss;
  78. }
  79. #ifndef NDEBUG
  80. // Print first time and then each 30 seconds
  81. bool printUsage = LastPrintUsage == TInstant::Zero()
  82. || TInstant::Now() > TDuration::Seconds(30).ToDeadLine(LastPrintUsage);
  83. #endif
  84. if (auto peakAlloc = HolderFactory.GetPagePool().GetPeakAllocated()) {
  85. if (rss - InitRss > memLimit && rss - LastRss > (memLimit / 4)) {
  86. UsageAdjustor = std::max(1.f, float(rss - InitRss) / float(peakAlloc));
  87. LastRss = rss;
  88. #ifndef NDEBUG
  89. printUsage = UsageAdjustor > 1.f;
  90. #endif
  91. }
  92. }
  93. #ifndef NDEBUG
  94. if (printUsage) {
  95. Cerr << "Usage: graph=" << HolderFactory.GetPagePool().GetUsed()
  96. << ", rss=" << rss
  97. << ", peakAlloc=" << HolderFactory.GetPagePool().GetPeakAllocated()
  98. << ", adjustor=" << UsageAdjustor
  99. << Endl;
  100. LastPrintUsage = TInstant::Now();
  101. }
  102. #endif
  103. }
  104. class TSimpleSecureParamsProvider : public NUdf::ISecureParamsProvider {
  105. public:
  106. TSimpleSecureParamsProvider(const THashMap<TString, TString>& secureParams)
  107. : SecureParams(secureParams)
  108. {}
  109. bool GetSecureParam(NUdf::TStringRef key, NUdf::TStringRef& value) const override {
  110. auto found = SecureParams.FindPtr(TStringBuf(key));
  111. if (!found) {
  112. return false;
  113. }
  114. value = (TStringBuf)*found;
  115. return true;
  116. }
  117. private:
  118. const THashMap<TString, TString> SecureParams;
  119. };
  120. std::unique_ptr<NUdf::ISecureParamsProvider> MakeSimpleSecureParamsProvider(const THashMap<TString, TString>& secureParams) {
  121. return std::make_unique<TSimpleSecureParamsProvider>(secureParams);
  122. }
  123. }
  124. }