mkql_spiller_adapter.h 3.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. #pragma once
  2. #include "mkql_spiller.h"
  3. #include <yql/essentials/minikql/computation/mkql_computation_node_pack.h>
  4. namespace NKikimr::NMiniKQL {
  5. ///Stores and loads very long sequences of TMultiType UVs
  6. ///Can split sequences into chunks
  7. ///Sends chunks to ISplitter and keeps assigned keys
  8. ///When all data is written switches to read mode. Switching back to writing mode is not supported
  9. ///Provides an interface for sequential read (like forward iterator)
  10. ///When interaction with ISpiller is required, Write and Read operations return a Future
  11. class TWideUnboxedValuesSpillerAdapter {
  12. public:
  13. TWideUnboxedValuesSpillerAdapter(ISpiller::TPtr spiller, const TMultiType* type, size_t sizeLimit)
  14. : Spiller(spiller)
  15. , ItemType(type)
  16. , SizeLimit(sizeLimit)
  17. , Packer(type)
  18. {
  19. }
  20. /// Write wide UV item
  21. /// \returns
  22. /// - nullopt, if thee values are accumulated
  23. /// - TFeature, if the values are being stored asynchronously and a caller must wait until async operation ends
  24. /// In this case a caller must wait operation completion and call StoreCompleted.
  25. /// Design note: not using Subscribe on a Future here to avoid possible race condition
  26. std::optional<NThreading::TFuture<ISpiller::TKey>> WriteWideItem(const TArrayRef<NUdf::TUnboxedValuePod>& wideItem) {
  27. Packer.AddWideItem(wideItem.data(), wideItem.size());
  28. if (Packer.PackedSizeEstimate() > SizeLimit) {
  29. return Spiller->Put(std::move(Packer.Finish()));
  30. } else {
  31. return std::nullopt;
  32. }
  33. }
  34. std::optional<NThreading::TFuture<ISpiller::TKey>> FinishWriting() {
  35. if (Packer.IsEmpty())
  36. return std::nullopt;
  37. return Spiller->Put(std::move(Packer.Finish()));
  38. }
  39. void AsyncWriteCompleted(ISpiller::TKey key) {
  40. StoredChunks.push_back(key);
  41. }
  42. //Extracting interface
  43. bool Empty() const {
  44. return StoredChunks.empty() && !CurrentBatch;
  45. }
  46. std::optional<NThreading::TFuture<std::optional<NYql::TChunkedBuffer>>> ExtractWideItem(const TArrayRef<NUdf::TUnboxedValue>& wideItem) {
  47. MKQL_ENSURE(!Empty(), "Internal logic error");
  48. if (CurrentBatch) {
  49. auto row = CurrentBatch->Head();
  50. for (size_t i = 0; i != wideItem.size(); ++i) {
  51. wideItem[i] = row[i];
  52. }
  53. CurrentBatch->Pop();
  54. if (CurrentBatch->empty()) {
  55. CurrentBatch = std::nullopt;
  56. }
  57. return std::nullopt;
  58. } else {
  59. auto r = Spiller->Get(StoredChunks.front());
  60. StoredChunks.pop_front();
  61. return r;
  62. }
  63. }
  64. void AsyncReadCompleted(NYql::TChunkedBuffer&& rope,const THolderFactory& holderFactory ) {
  65. //Implementation detail: deserialization is performed in a processing thread
  66. TUnboxedValueBatch batch(ItemType);
  67. Packer.UnpackBatch(std::move(rope), holderFactory, batch);
  68. CurrentBatch = std::move(batch);
  69. }
  70. private:
  71. ISpiller::TPtr Spiller;
  72. const TMultiType* const ItemType;
  73. const size_t SizeLimit;
  74. TValuePackerTransport<false> Packer;
  75. std::deque<ISpiller::TKey> StoredChunks;
  76. std::optional<TUnboxedValueBatch> CurrentBatch;
  77. };
  78. }//namespace NKikimr::NMiniKQL