mkql_vector_spiller_adapter.h 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225
  1. #pragma once
  2. #include <queue>
  3. #include <yql/essentials/minikql/defs.h>
  4. #include <yql/essentials/minikql/computation/mkql_spiller.h>
  5. #include <yql/essentials/minikql/mkql_alloc.h>
  6. namespace NKikimr::NMiniKQL {
  7. ///Stores long vectors of any type (excepting pointers). Tested on int and chars.
  8. ///The adapter places moved vectors into buffer and flushes buffer on disk when sizeLimit is reached.
  9. ///Returns vectors in FIFO order.
  10. template<class T, class Alloc>
  11. class TVectorSpillerAdapter {
  12. public:
  13. enum class EState {
  14. AcceptingData,
  15. SpillingData,
  16. AcceptingDataRequests,
  17. RestoringData,
  18. DataReady
  19. };
  20. TVectorSpillerAdapter(ISpiller::TPtr spiller, size_t sizeLimit)
  21. : Spiller(spiller)
  22. , SizeLimit(sizeLimit)
  23. {
  24. }
  25. ///Returns current stete of the adapter
  26. EState GetState() const {
  27. return State;
  28. }
  29. ///Is adapter ready to spill next vector via AddData method.
  30. ///Returns false in case when there are async operations in progress.
  31. bool IsAcceptingData() {
  32. return State == EState::AcceptingData;
  33. }
  34. ///When data is ready ExtractVector() is expected to be called.
  35. bool IsDataReady() {
  36. return State == EState::DataReady;
  37. }
  38. ///Adapter is ready to accept requests for vectors.
  39. bool IsAcceptingDataRequests() {
  40. return State == EState::AcceptingDataRequests;
  41. }
  42. ///Adds new vector to storage. Will not launch real disk operation if case of small vectors
  43. ///(if inner buffer is not full).
  44. void AddData(std::vector<T, Alloc>&& vec) {
  45. MKQL_ENSURE(CurrentVector.empty(), "Internal logic error");
  46. MKQL_ENSURE(State == EState::AcceptingData, "Internal logic error");
  47. StoredChunksElementsCount.push(vec.size());
  48. CurrentVector = std::move(vec);
  49. NextVectorPositionToSave = 0;
  50. SaveNextPartOfVector();
  51. }
  52. ///Should be used to update async operatrions statuses.
  53. ///For SpillingData state it will try to spill more content of inner buffer.
  54. ///ForRestoringData state it will try to load more content of requested vector.
  55. void Update() {
  56. switch (State) {
  57. case EState::SpillingData:
  58. MKQL_ENSURE(WriteOperation.has_value(), "Internal logic error");
  59. if (!WriteOperation->HasValue()) return;
  60. StoredChunks.push(WriteOperation->ExtractValue());
  61. WriteOperation = std::nullopt;
  62. if (IsFinalizing) {
  63. State = EState::AcceptingDataRequests;
  64. return;
  65. }
  66. if (CurrentVector.empty()) {
  67. State = EState::AcceptingData;
  68. return;
  69. }
  70. SaveNextPartOfVector();
  71. return;
  72. case EState::RestoringData:
  73. MKQL_ENSURE(ReadOperation.has_value(), "Internal logic error");
  74. if (!ReadOperation->HasValue()) return;
  75. Buffer = std::move(ReadOperation->ExtractValue().value());
  76. ReadOperation = std::nullopt;
  77. StoredChunks.pop();
  78. LoadNextVector();
  79. return;
  80. default:
  81. return;
  82. }
  83. }
  84. ///Get requested vector.
  85. std::vector<T, Alloc>&& ExtractVector() {
  86. StoredChunksElementsCount.pop();
  87. State = EState::AcceptingDataRequests;
  88. return std::move(CurrentVector);
  89. }
  90. ///Start restoring next vector. If th eentire contents of the vector are in memory
  91. ///State will be changed to DataREady without any async read operation. ExtractVector is expected
  92. ///to be called immediately.
  93. void RequestNextVector() {
  94. MKQL_ENSURE(State == EState::AcceptingDataRequests, "Internal logic error");
  95. MKQL_ENSURE(CurrentVector.empty(), "Internal logic error");
  96. MKQL_ENSURE(!StoredChunksElementsCount.empty(), "Internal logic error");
  97. CurrentVector.reserve(StoredChunksElementsCount.front());
  98. State = EState::RestoringData;
  99. LoadNextVector();
  100. }
  101. ///Finalize will spill all the contents of inner buffer if any.
  102. ///Is case if buffer is not ready async write operation will be started.
  103. void Finalize() {
  104. MKQL_ENSURE(CurrentVector.empty(), "Internal logic error");
  105. if (Buffer.Empty()) {
  106. State = EState::AcceptingDataRequests;
  107. return;
  108. }
  109. SaveBuffer();
  110. IsFinalizing = true;
  111. }
  112. private:
  113. class TVectorStream : public IOutputStream {
  114. public:
  115. explicit TVectorStream(std::vector<T, Alloc>& vec)
  116. : Dst_(vec)
  117. {
  118. }
  119. private:
  120. virtual void DoWrite(const void* buf, size_t len) override {
  121. MKQL_ENSURE(len % sizeof(T) == 0, "size should always by multiple of sizeof(T)");
  122. const T* data = reinterpret_cast<const T*>(buf);
  123. Dst_.insert(Dst_.end(), data, data + len / sizeof(T));
  124. }
  125. std::vector<T, Alloc>& Dst_;
  126. };
  127. void CopyRopeToTheEndOfVector(std::vector<T, Alloc>& vec, const NYql::TChunkedBuffer& rope, size_t toCopy = std::numeric_limits<size_t>::max()) {
  128. TVectorStream out(vec);
  129. rope.CopyTo(out, toCopy);
  130. }
  131. void LoadNextVector() {
  132. auto requestedVectorSize= StoredChunksElementsCount.front();
  133. MKQL_ENSURE(requestedVectorSize >= CurrentVector.size(), "Internal logic error");
  134. size_t sizeToLoad = (requestedVectorSize - CurrentVector.size()) * sizeof(T);
  135. if (Buffer.Size() >= sizeToLoad) {
  136. // if all the data for requested vector is ready
  137. CopyRopeToTheEndOfVector(CurrentVector, Buffer, sizeToLoad);
  138. Buffer.Erase(sizeToLoad);
  139. State = EState::DataReady;
  140. } else {
  141. CopyRopeToTheEndOfVector(CurrentVector, Buffer);
  142. ReadOperation = Spiller->Extract(StoredChunks.front());
  143. }
  144. }
  145. void SaveBuffer() {
  146. State = EState::SpillingData;
  147. WriteOperation = Spiller->Put(std::move(Buffer));
  148. }
  149. void AddDataToRope(const T* data, size_t count) {
  150. auto owner = std::make_shared<std::vector<T>>(data, data + count);
  151. TStringBuf buf(reinterpret_cast<const char *>(owner->data()), count * sizeof(T));
  152. Buffer.Append(buf, owner);
  153. }
  154. void SaveNextPartOfVector() {
  155. size_t maxFittingElemets = (SizeLimit - Buffer.Size()) / sizeof(T);
  156. size_t remainingElementsInVector = CurrentVector.size() - NextVectorPositionToSave;
  157. size_t elementsToCopyFromVector = std::min(maxFittingElemets, remainingElementsInVector);
  158. AddDataToRope(CurrentVector.data() + NextVectorPositionToSave, elementsToCopyFromVector);
  159. NextVectorPositionToSave += elementsToCopyFromVector;
  160. if (NextVectorPositionToSave >= CurrentVector.size()) {
  161. CurrentVector.resize(0);
  162. NextVectorPositionToSave = 0;
  163. }
  164. if (SizeLimit - Buffer.Size() < sizeof(T)) {
  165. SaveBuffer();
  166. return;
  167. }
  168. State = EState::AcceptingData;
  169. }
  170. private:
  171. EState State = EState::AcceptingData;
  172. ISpiller::TPtr Spiller;
  173. const size_t SizeLimit;
  174. NYql::TChunkedBuffer Buffer;
  175. // Used to store vector while spilling and also used while restoring the data
  176. std::vector<T, Alloc> CurrentVector;
  177. size_t NextVectorPositionToSave = 0;
  178. std::queue<ISpiller::TKey> StoredChunks;
  179. std::queue<size_t> StoredChunksElementsCount;
  180. std::optional<NThreading::TFuture<ISpiller::TKey>> WriteOperation = std::nullopt;
  181. std::optional<NThreading::TFuture<std::optional<NYql::TChunkedBuffer>>> ReadOperation = std::nullopt;
  182. bool IsFinalizing = false;
  183. };
  184. }//namespace NKikimr::NMiniKQL