Browse Source

Properly calculate size of cached files (#2445)

Alexey Ozeritskiy 1 year ago
parent
commit
ffcca3e7f7
1 changed files with 37 additions and 11 deletions
  1. 37 11
      ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp

+ 37 - 11
ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp

@@ -91,6 +91,29 @@ void LoadRopeFromPipe(IInputStream& input, TRope& rope) {
     } while (size != 0);
 }
 
+class TFilesHolder {
+public:
+    using TPtr = std::unique_ptr<TFilesHolder>;
+
+    TFilesHolder(const IFileCache::TPtr& fileCache)
+        : FileCache(fileCache)
+    { }
+
+    ~TFilesHolder() {
+        for (const auto& file : Files) {
+            FileCache->ReleaseFile(file);
+        }
+    }
+
+    void Add(const TString& objectId) {
+        Files.emplace(objectId);
+    }
+
+private:
+    std::unordered_set<TString> Files;
+    const IFileCache::TPtr FileCache;
+};
+
 class TChildProcess: private TNonCopyable {
 public:
     TChildProcess(const TString& exeName, const TVector<TString>& args, const THashMap<TString, TString>& env, const TString& workDir)
@@ -1265,11 +1288,13 @@ class TTaskRunner: public IPipeTaskRunner {
 public:
     TTaskRunner(
         const NDqProto::TDqTask& task,
+        TFilesHolder::TPtr&& filesHolder,
         THolder<TChildProcess>&& command,
         ui64 stageId,
         const TString& traceId)
         : TraceId(traceId)
         , Task(task)
+        , FilesHolder(std::move(filesHolder))
         , Alloc(new NKikimr::NMiniKQL::TScopedAlloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), true),
             [](NKikimr::NMiniKQL::TScopedAlloc* ptr) { ptr->Acquire(); delete ptr; })
         , AllocatedHolder(std::make_optional<TAllocatedHolder>(*Alloc, "TDqTaskRunnerProxy"))
@@ -1557,6 +1582,7 @@ private:
 private:
     const TString TraceId;
     NDqProto::TDqTask Task;
+    TFilesHolder::TPtr FilesHolder;
     THashMap<TString, TString> SecureParams;
     THashMap<TString, TString> TaskParams;
     TVector<TString> ReadRanges;
@@ -1600,10 +1626,11 @@ class TDqTaskRunner: public NDq::IDqTaskRunner {
 public:
     TDqTaskRunner(
         const NDqProto::TDqTask& task,
+        TFilesHolder::TPtr&& filesHolder,
         THolder<TChildProcess>&& command,
         ui64 stageId,
         const TString& traceId)
-        : Delegate(new TTaskRunner(task, std::move(command), stageId, traceId))
+        : Delegate(new TTaskRunner(task, std::move(filesHolder), std::move(command), stageId, traceId))
         , Task(task)
     { }
 
@@ -1825,7 +1852,6 @@ private:
 
 /*______________________________________________________________________________________________*/
 
-
 class TPipeFactory: public IProxyFactory {
     struct TJob: public TTaskScheduler::ITask {
         TJob(NThreading::TPromise<void> p)
@@ -1887,8 +1913,8 @@ public:
         tmp.GetMeta().UnpackTo(&taskMeta);
         ui64 stageId = taskMeta.GetStageId();
         auto result = GetExecutorForTask(taskMeta.GetFiles(), taskMeta.GetSettings());
-        auto task = PrepareTask(tmp, result.Get());
-        return new TTaskRunner(task, std::move(result), stageId, traceId);
+        auto [task, filesHolder] = PrepareTask(tmp, result.Get());
+        return new TTaskRunner(task, std::move(filesHolder), std::move(result), stageId, traceId);
     }
 
     TIntrusivePtr<NDq::IDqTaskRunner> Get(NKikimr::NMiniKQL::TScopedAlloc& alloc, const NDq::TDqTaskSettings& tmp, NDqProto::EDqStatsMode statsMode, const TString& traceId) override
@@ -1899,8 +1925,8 @@ public:
         tmp.GetMeta().UnpackTo(&taskMeta);
 
         auto result = GetExecutorForTask(taskMeta.GetFiles(), taskMeta.GetSettings());
-        auto task = PrepareTask(tmp, result.Get());
-        return new TDqTaskRunner(task, std::move(result), taskMeta.GetStageId(), traceId);
+        auto [task, filesHolder] = PrepareTask(tmp, result.Get());
+        return new TDqTaskRunner(task, std::move(filesHolder), std::move(result), taskMeta.GetStageId(), traceId);
     }
 
 private:
@@ -1935,9 +1961,10 @@ private:
         return exePath + "," + settings.ToString();
     }
 
-    NDqProto::TDqTask PrepareTask(const NDq::TDqTaskSettings& tmp, TChildProcess* result) {
+    std::tuple<NDqProto::TDqTask, TFilesHolder::TPtr> PrepareTask(const NDq::TDqTaskSettings& tmp, TChildProcess* result) {
         // get files from fileCache
         auto task = tmp.GetSerializedTask();
+        auto filesHolder = std::make_unique<TFilesHolder>(FileCache);
         Yql::DqsProto::TTaskMeta taskMeta;
 
         task.GetMeta().UnpackTo(&taskMeta);
@@ -1946,15 +1973,14 @@ private:
 
         for (auto& file : *files) {
             if (file.GetObjectType() != Yql::DqsProto::TFile::EEXE_FILE) {
-                auto maybeFile = FileCache->FindFile(file.GetObjectId());
+                auto maybeFile = FileCache->AcquireFile(file.GetObjectId());
                 if (!maybeFile) {
                     throw std::runtime_error("Cannot find object `" + file.GetObjectId() + "' in cache");
                 }
+                filesHolder->Add(file.GetObjectId());
                 auto name = file.GetName();
 
                 switch (file.GetObjectType()) {
-                    case Yql::DqsProto::TFile::EEXE_FILE:
-                        break;
                     case Yql::DqsProto::TFile::EUDF_FILE:
                     case Yql::DqsProto::TFile::EUSER_FILE:
                         file.SetLocalPath(InitializeLocalFile(result->ExternalWorkDir(), *maybeFile, name));
@@ -1970,7 +1996,7 @@ private:
         taskParams[WorkingDirectoryDontInitParamName] = "true";
         task.MutableMeta()->PackFrom(taskMeta);
 
-        return task;
+        return std::make_tuple(task, std::move(filesHolder));
     }
 
     TFsPath MakeLocalPath(TString fileName) {