mkql_queue.cpp 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485
  1. #include "mkql_queue.h"
  2. #include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
  3. #include <yql/essentials/minikql/mkql_node_cast.h>
  4. #include <yql/essentials/minikql/mkql_program_builder.h>
  5. #include <yql/essentials/public/udf/udf_string.h>
  6. namespace NKikimr {
  7. using namespace NUdf;
  8. namespace NMiniKQL {
  9. namespace {
  10. class TQueueResource : public TComputationValue<TQueueResource> {
  11. public:
  12. TQueueResource(TMemoryUsageInfo* memInfo, const TStringBuf& tag, TMaybe<ui64> capacity, ui64 initSize)
  13. : TComputationValue(memInfo)
  14. , ResourceTag(tag)
  15. , Buffer(capacity, TUnboxedValue(), initSize)
  16. , BufferBytes(CurrentMemUsage())
  17. {
  18. MKQL_MEM_TAKE(memInfo, &Buffer, BufferBytes);
  19. }
  20. ~TQueueResource() {
  21. Y_DEBUG_ABORT_UNLESS(BufferBytes == CurrentMemUsage());
  22. MKQL_MEM_RETURN(GetMemInfo(), &Buffer, CurrentMemUsage());
  23. Buffer.Clear();
  24. }
  25. void UpdateBufferStats() {
  26. MKQL_MEM_RETURN(GetMemInfo(), &Buffer, BufferBytes);
  27. BufferBytes = CurrentMemUsage();
  28. MKQL_MEM_TAKE(GetMemInfo(), &Buffer, BufferBytes);
  29. }
  30. TSafeCircularBuffer<TUnboxedValue>& GetBuffer() {
  31. return Buffer;
  32. }
  33. private:
  34. NUdf::TStringRef GetResourceTag() const override {
  35. return NUdf::TStringRef(ResourceTag);
  36. }
  37. void* GetResource() override {
  38. return this;
  39. }
  40. size_t CurrentMemUsage() const {
  41. return Buffer.Size() * sizeof(TUnboxedValue);
  42. }
  43. const TStringBuf ResourceTag;
  44. TSafeCircularBuffer<TUnboxedValue> Buffer;
  45. size_t BufferBytes;
  46. };
  47. class TQueueResource;
  48. class TQueueResourceUser {
  49. public:
  50. TQueueResourceUser(TStringBuf&& tag, IComputationNode* resource);
  51. TSafeCircularBuffer<NUdf::TUnboxedValue>& CheckAndGetBuffer(const NUdf::TUnboxedValuePod& resource) const;
  52. void UpdateBufferStats(const NUdf::TUnboxedValuePod& resource) const;
  53. protected:
  54. const TStringBuf Tag;
  55. IComputationNode* const Resource;
  56. TQueueResource& GetResource(const NUdf::TUnboxedValuePod& resource) const;
  57. };
  58. TQueueResourceUser::TQueueResourceUser(TStringBuf&& tag, IComputationNode* resource)
  59. : Tag(tag)
  60. , Resource(resource)
  61. {}
  62. TSafeCircularBuffer<TUnboxedValue>& TQueueResourceUser::CheckAndGetBuffer(const TUnboxedValuePod& resource) const {
  63. return GetResource(resource).GetBuffer();
  64. }
  65. void TQueueResourceUser::UpdateBufferStats(const TUnboxedValuePod& resource) const {
  66. GetResource(resource).UpdateBufferStats();
  67. }
  68. TQueueResource& TQueueResourceUser::GetResource(const TUnboxedValuePod& resource) const {
  69. const TStringBuf tag = resource.GetResourceTag();
  70. Y_DEBUG_ABORT_UNLESS(tag == Tag, "Expected correct Queue resource");
  71. return *static_cast<TQueueResource*>(resource.GetResource());
  72. }
  73. class TQueueCreateWrapper : public TMutableComputationNode<TQueueCreateWrapper> {
  74. typedef TMutableComputationNode<TQueueCreateWrapper> TBaseComputation;
  75. public:
  76. TQueueCreateWrapper(TComputationMutables& mutables, TComputationNodePtrVector&& dependentNodes, const TString& name, TMaybe<ui64> capacity, ui64 initSize)
  77. : TBaseComputation(mutables)
  78. , DependentNodes(std::move(dependentNodes))
  79. , Name(name)
  80. , Capacity(capacity)
  81. , InitSize(initSize)
  82. {}
  83. NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
  84. return NUdf::TUnboxedValuePod(new TQueueResource(&ctx.HolderFactory.GetMemInfo(), Name, Capacity, InitSize));
  85. }
  86. private:
  87. void RegisterDependencies() const final {
  88. std::for_each(DependentNodes.cbegin(), DependentNodes.cend(), std::bind(&TQueueCreateWrapper::DependsOn, this, std::placeholders::_1));
  89. }
  90. const TComputationNodePtrVector DependentNodes;
  91. const TString Name;
  92. const TMaybe<ui64> Capacity;
  93. const ui64 InitSize;
  94. };
  95. class TQueuePushWrapper : public TMutableComputationNode<TQueuePushWrapper>, public TQueueResourceUser {
  96. typedef TMutableComputationNode<TQueuePushWrapper> TBaseComputation;
  97. public:
  98. TQueuePushWrapper(TComputationMutables& mutables, const TResourceType* resourceType, IComputationNode* resource, IComputationNode* value)
  99. : TBaseComputation(mutables)
  100. , TQueueResourceUser(resourceType->GetTag(), resource)
  101. , Value(value)
  102. {}
  103. NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
  104. auto resource = Resource->GetValue(ctx);
  105. auto& buffer = CheckAndGetBuffer(resource);
  106. buffer.PushBack(Value->GetValue(ctx));
  107. if (buffer.IsUnbounded()) {
  108. UpdateBufferStats(resource);
  109. }
  110. return resource.Release();
  111. }
  112. private:
  113. void RegisterDependencies() const final {
  114. DependsOn(Resource);
  115. DependsOn(Value);
  116. }
  117. IComputationNode* const Value;
  118. };
  119. class TQueuePopWrapper : public TMutableComputationNode<TQueuePopWrapper>, public TQueueResourceUser {
  120. typedef TMutableComputationNode<TQueuePopWrapper> TBaseComputation;
  121. public:
  122. TQueuePopWrapper(TComputationMutables& mutables, const TResourceType* resourceType, IComputationNode* resource)
  123. : TBaseComputation(mutables)
  124. , TQueueResourceUser(resourceType->GetTag(), resource)
  125. {}
  126. NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
  127. auto resource = Resource->GetValue(ctx);
  128. CheckAndGetBuffer(resource).PopFront();
  129. return resource.Release();
  130. }
  131. private:
  132. void RegisterDependencies() const final {
  133. DependsOn(Resource);
  134. }
  135. };
  136. class TQueuePeekWrapper : public TMutableComputationNode<TQueuePeekWrapper>, public TQueueResourceUser {
  137. typedef TMutableComputationNode<TQueuePeekWrapper> TBaseComputation;
  138. public:
  139. TQueuePeekWrapper(TComputationMutables& mutables, TComputationNodePtrVector&& dependentNodes, const TResourceType* resourceType, IComputationNode* resource, IComputationNode* index)
  140. : TBaseComputation(mutables)
  141. , TQueueResourceUser(resourceType->GetTag(), resource)
  142. , Index(index), DependentNodes(std::move(dependentNodes))
  143. {}
  144. NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
  145. auto resource = Resource->GetValue(ctx);
  146. auto index = Index->GetValue(ctx);
  147. const auto& valRef = CheckAndGetBuffer(resource).Get(index.Get<ui64>());
  148. return !valRef ? NUdf::TUnboxedValuePod() : valRef.MakeOptional();
  149. }
  150. private:
  151. void RegisterDependencies() const final {
  152. DependsOn(Resource);
  153. DependsOn(Index);
  154. std::for_each(DependentNodes.cbegin(), DependentNodes.cend(), std::bind(&TQueuePeekWrapper::DependsOn, this, std::placeholders::_1));
  155. }
  156. IComputationNode* const Index;
  157. const TComputationNodePtrVector DependentNodes;
  158. };
  159. class TQueueRangeWrapper : public TMutableComputationNode<TQueueRangeWrapper>, public TQueueResourceUser {
  160. typedef TMutableComputationNode<TQueueRangeWrapper> TBaseComputation;
  161. public:
  162. class TValue : public TComputationValue<TValue>, public TQueueResourceUser {
  163. public:
  164. class TIterator : public TComputationValue<TIterator>, public TQueueResourceUser {
  165. public:
  166. TIterator(TMemoryUsageInfo* memInfo, TUnboxedValue queue, size_t begin, size_t end, ui64 generation, TStringBuf tag, IComputationNode* resource)
  167. : TComputationValue<TIterator>(memInfo)
  168. , TQueueResourceUser(std::move(tag), resource)
  169. , Queue(queue)
  170. , Buffer(CheckAndGetBuffer(queue))
  171. , Current(begin)
  172. , End(end)
  173. , Generation(generation)
  174. {
  175. }
  176. private:
  177. bool Next(NUdf::TUnboxedValue& value) override {
  178. MKQL_ENSURE(Generation == Buffer.Generation(),
  179. "Queue generation changed while doing QueueRange: expected " << Generation << ", got: " << Buffer.Generation());
  180. if (Current >= End) {
  181. return false;
  182. }
  183. const auto& valRef = Buffer.Get(Current++);
  184. value = !valRef ? NUdf::TUnboxedValuePod() : valRef.MakeOptional();
  185. return true;
  186. }
  187. bool Skip() override {
  188. if (Current >= End) {
  189. return false;
  190. }
  191. Current++;
  192. return true;
  193. }
  194. const TUnboxedValue Queue;
  195. const TSafeCircularBuffer<TUnboxedValue>& Buffer;
  196. size_t Current;
  197. const size_t End;
  198. const ui64 Generation;
  199. };
  200. TValue(TMemoryUsageInfo* memInfo, TComputationContext& compCtx, TUnboxedValue queue, size_t begin, size_t end, TStringBuf tag, IComputationNode* resource)
  201. : TComputationValue<TValue>(memInfo)
  202. , TQueueResourceUser(std::move(tag), resource)
  203. , CompCtx(compCtx)
  204. , Queue(queue)
  205. , Begin(begin)
  206. , End(std::min(end, CheckAndGetBuffer(Queue).UsedSize()))
  207. , Generation(CheckAndGetBuffer(Queue).Generation())
  208. {
  209. }
  210. private:
  211. ui64 GetListLength() const final {
  212. return Begin < End ? (End - Begin) : 0;
  213. }
  214. bool HasListItems() const final {
  215. return GetListLength() != 0;
  216. }
  217. bool HasFastListLength() const final {
  218. return true;
  219. }
  220. NUdf::TUnboxedValue GetListIterator() const final {
  221. return CompCtx.HolderFactory.Create<TIterator>(Queue, Begin, End, Generation, Tag, Resource);
  222. }
  223. TComputationContext& CompCtx;
  224. const TUnboxedValue Queue;
  225. const size_t Begin;
  226. const size_t End;
  227. const ui64 Generation;
  228. };
  229. TQueueRangeWrapper(TComputationMutables& mutables, TComputationNodePtrVector&& dependentNodes, const TResourceType* resourceType, IComputationNode* resource,
  230. IComputationNode* begin, IComputationNode* end)
  231. : TBaseComputation(mutables)
  232. , TQueueResourceUser(resourceType->GetTag(), resource)
  233. , Begin(begin)
  234. , End(end)
  235. , DependentNodes(std::move(dependentNodes))
  236. {}
  237. NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
  238. auto queue = Resource->GetValue(ctx);
  239. auto begin = Begin->GetValue(ctx).Get<ui64>();
  240. auto end = End->GetValue(ctx).Get<ui64>();
  241. return ctx.HolderFactory.Create<TValue>(ctx, queue, begin, end, Tag, Resource);
  242. }
  243. private:
  244. void RegisterDependencies() const final {
  245. DependsOn(Resource);
  246. DependsOn(Begin);
  247. DependsOn(End);
  248. std::for_each(DependentNodes.cbegin(), DependentNodes.cend(), std::bind(&TQueueRangeWrapper::DependsOn, this, std::placeholders::_1));
  249. }
  250. IComputationNode* const Begin;
  251. IComputationNode* const End;
  252. const TComputationNodePtrVector DependentNodes;
  253. };
/// Stream value that copies every item fetched from the input stream into the
/// queue resource and emits queued items with a look-ahead.
///
/// An Ok result is the queue element at FrontIndex. It is returned once
/// OutpaceGoal further input items have been pushed behind it, or earlier once
/// the input stream has finished. That element is popped from the queue at the
/// start of the next Fetch call.
class TPreserveStreamValue : public TComputationValue<TPreserveStreamValue>, public TQueueResourceUser {
public:
    using TBase = TComputationValue<TPreserveStreamValue>;
    /// @param memInfo  memory tracker for this value.
    /// @param stream   input stream whose items are pushed into the queue.
    /// @param queue    Queue resource value that receives the items.
    /// @param tag      expected resource tag of @p queue.
    /// @param resource node that produces the queue resource.
    /// @param outpace  how many items to keep buffered behind the one being emitted.
    TPreserveStreamValue(TMemoryUsageInfo* memInfo, NUdf::TUnboxedValue&& stream
        , NUdf::TUnboxedValue&& queue, TStringBuf tag, IComputationNode* resource, ui64 outpace)
        : TBase(memInfo)
        , TQueueResourceUser(std::move(tag), resource)
        , Stream(std::move(stream))
        , Queue(std::move(queue))
        , OutpaceGoal(outpace)
        , Buffer(CheckAndGetBuffer(Queue))
        // Position of the first element this value pushes. Assumes elements already
        // present in the queue at construction stay in front of it.
        , FrontIndex(Buffer.UsedSize())
    {}
private:
    /// Returns Ok with the current front item, Yield when the input stream yields,
    /// or Finish once the input is exhausted and every pushed item has been emitted.
    NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& value) override {
        switch (State) {
            case EPreserveState::Done:
                return NUdf::EFetchStatus::Finish;
            case EPreserveState::Feed:
            case EPreserveState::Yield:
                // First call, or the previous call returned Yield: no item was
                // handed out, so there is nothing to pop.
                break;
            default:
                // GoOn / Emit: the previous call returned the front item; drop it now.
                Y_ABORT_UNLESS(Outpace > 0);
                Buffer.PopFront();
                --Outpace;
        }
        // Pull input until OutpaceGoal items sit behind the front one, or the input ends.
        for (NUdf::TUnboxedValue item; State != EPreserveState::Emit && Outpace <= OutpaceGoal;) {
            switch (Stream.Fetch(item)) {
            case NUdf::EFetchStatus::Yield:
                State = EPreserveState::Yield;
                return NUdf::EFetchStatus::Yield;
            case NUdf::EFetchStatus::Finish:
                // Input exhausted: from now on only drain what is already queued.
                State = EPreserveState::Emit;
                break;
            case NUdf::EFetchStatus::Ok:
                Buffer.PushBack(std::move(item));
                // Memory accounting is refreshed only for unbounded buffers.
                if (Buffer.IsUnbounded()) {
                    UpdateBufferStats(Queue);
                }
                ++Outpace;
                if (Outpace > OutpaceGoal) {
                    State = EPreserveState::GoOn;
                } else {
                    State = EPreserveState::Feed;
                }
            }
        }
        if (!Outpace) {
            // Every pushed item has been emitted and popped.
            // NOTE(review): assumes Clean() resets the buffer's used slots — confirm in TSafeCircularBuffer.
            Buffer.Clean();
            State = EPreserveState::Done;
            return NUdf::EFetchStatus::Finish;
        }
        value = Buffer.Get(FrontIndex);
        return NUdf::EFetchStatus::Ok;
    }
    enum class EPreserveState {
        Feed,   // initial / intermediate: keep pulling input
        GoOn,   // front item returned with a full look-ahead; pop it on the next call
        Yield,  // input stream yielded; resume pulling without popping
        Emit,   // input finished; drain queued items one per call
        Done    // all items emitted; always report Finish
    };
    const NUdf::TUnboxedValue Stream;
    const NUdf::TUnboxedValue Queue;  // holds the queue resource that Buffer refers into
    const ui64 OutpaceGoal;
    TSafeCircularBuffer<TUnboxedValue>& Buffer;
    const size_t FrontIndex;
    EPreserveState State = EPreserveState::Feed;
    ui64 Outpace = 0;  // items pushed by this value and not yet popped
};
  324. class TPreserveStreamWrapper : public TMutableComputationNode<TPreserveStreamWrapper>, public TQueueResourceUser {
  325. typedef TMutableComputationNode<TPreserveStreamWrapper> TBaseComputation;
  326. public:
  327. TPreserveStreamWrapper(TComputationMutables& mutables, IComputationNode* stream, const TResourceType* resourceType, IComputationNode* resource, ui64 outpace)
  328. : TBaseComputation(mutables)
  329. , TQueueResourceUser(resourceType->GetTag(), resource)
  330. , Stream(stream)
  331. , Outpace(outpace)
  332. {}
  333. NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
  334. return ctx.HolderFactory.Create<TPreserveStreamValue>(Stream->GetValue(ctx), Resource->GetValue(ctx), Tag, Resource, Outpace);
  335. }
  336. private:
  337. void RegisterDependencies() const final {
  338. DependsOn(Resource);
  339. DependsOn(Stream);
  340. }
  341. IComputationNode* const Stream;
  342. const ui64 Outpace;
  343. };
  344. template<class T, class...Args>
  345. IComputationNode* MakeNodeWithDeps(TCallable& callable, const TComputationNodeFactoryContext& ctx, unsigned reqArgs, Args...args) {
  346. TComputationNodePtrVector dependentNodes(callable.GetInputsCount() - reqArgs);
  347. for (ui32 i = reqArgs; i < callable.GetInputsCount(); ++i) {
  348. dependentNodes[i - reqArgs] = LocateNode(ctx.NodeLocator, callable, i);
  349. }
  350. return new T(ctx.Mutables, std::move(dependentNodes), std::forward<Args>(args)...);
  351. }
  352. }
  353. IComputationNode* WrapQueueCreate(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
  354. const unsigned reqArgs = 3;
  355. MKQL_ENSURE(callable.GetInputsCount() >= reqArgs, "QueueCreate: Expected at least " << reqArgs << " arg");
  356. auto queueNameValue = AS_VALUE(TDataLiteral, callable.GetInput(0));
  357. TMaybe<ui64> capacity;
  358. if (!callable.GetInput(1).GetStaticType()->IsVoid()) {
  359. auto queueCapacityValue = AS_VALUE(TDataLiteral, callable.GetInput(1));
  360. capacity = queueCapacityValue->AsValue().Get<ui64>();
  361. }
  362. auto queueInitSizeValue = AS_VALUE(TDataLiteral, callable.GetInput(2));
  363. const TString name(queueNameValue->AsValue().AsStringRef());
  364. const auto initSize = queueInitSizeValue->AsValue().Get<ui64>();
  365. return MakeNodeWithDeps<TQueueCreateWrapper>(callable, ctx, reqArgs, name, capacity, initSize);
  366. }
  367. IComputationNode* WrapQueuePush(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
  368. MKQL_ENSURE(callable.GetInputsCount() == 2, "QueuePush: Expected 2 arg");
  369. auto resourceType = AS_TYPE(TResourceType, callable.GetInput(0));
  370. auto resource = LocateNode(ctx.NodeLocator, callable, 0);
  371. auto value = LocateNode(ctx.NodeLocator, callable, 1);
  372. return new TQueuePushWrapper(ctx.Mutables, resourceType, resource, value);
  373. }
  374. IComputationNode* WrapQueuePop(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
  375. MKQL_ENSURE(callable.GetInputsCount() == 1, "QueuePop: Expected 1 arg");
  376. auto resourceType = AS_TYPE(TResourceType, callable.GetInput(0));
  377. auto resource = LocateNode(ctx.NodeLocator, callable, 0);
  378. return new TQueuePopWrapper(ctx.Mutables, resourceType, resource);
  379. }
  380. IComputationNode* WrapQueuePeek(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
  381. const unsigned reqArgs = 2;
  382. MKQL_ENSURE(callable.GetInputsCount() >= reqArgs, "QueuePeek: Expected at least " << reqArgs << " arg");
  383. auto resourceType = AS_TYPE(TResourceType, callable.GetInput(0));
  384. TDataType* indexType = AS_TYPE(TDataType, callable.GetInput(1));
  385. MKQL_ENSURE(indexType->GetSchemeType() == NUdf::TDataType<ui64>::Id, "Expected ui64 as queue index");
  386. auto resource = LocateNode(ctx.NodeLocator, callable, 0);
  387. auto index = LocateNode(ctx.NodeLocator, callable, 1);
  388. return MakeNodeWithDeps<TQueuePeekWrapper>(callable, ctx, reqArgs, resourceType, resource, index);
  389. }
  390. IComputationNode* WrapQueueRange(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
  391. const unsigned reqArgs = 3;
  392. MKQL_ENSURE(callable.GetInputsCount() >= reqArgs, "QueueRange: Expected at least " << reqArgs << " arg");
  393. auto resourceType = AS_TYPE(TResourceType, callable.GetInput(0));
  394. TDataType* beginIndexType = AS_TYPE(TDataType, callable.GetInput(1));
  395. MKQL_ENSURE(beginIndexType->GetSchemeType() == NUdf::TDataType<ui64>::Id, "Expected ui64 as queue begin index");
  396. TDataType* endIndexType = AS_TYPE(TDataType, callable.GetInput(2));
  397. MKQL_ENSURE(endIndexType->GetSchemeType() == NUdf::TDataType<ui64>::Id, "Expected ui64 as queue end index");
  398. auto resource = LocateNode(ctx.NodeLocator, callable, 0);
  399. auto beginIndex = LocateNode(ctx.NodeLocator, callable, 1);
  400. auto endIndex = LocateNode(ctx.NodeLocator, callable, 2);
  401. return MakeNodeWithDeps<TQueueRangeWrapper>(callable, ctx, reqArgs, resourceType, resource, beginIndex, endIndex);
  402. }
  403. IComputationNode* WrapPreserveStream(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
  404. Y_UNUSED(ctx);
  405. MKQL_ENSURE(callable.GetInputsCount() == 3, "PreserveStream: Expected 3 arg");
  406. auto stream = LocateNode(ctx.NodeLocator, callable, 0);
  407. auto resource = LocateNode(ctx.NodeLocator, callable, 1);
  408. auto resourceType = AS_TYPE(TResourceType, callable.GetInput(1));
  409. auto outpaceValue = AS_VALUE(TDataLiteral, callable.GetInput(2));
  410. const auto outpace = outpaceValue->AsValue().Get<ui64>();
  411. return new TPreserveStreamWrapper(ctx.Mutables, stream, resourceType, resource, outpace);
  412. }
  413. }
  414. }