mkql_saveload.h 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346
  1. #pragma once
  2. #include <yql/essentials/minikql/defs.h>
  3. #include <yql/essentials/minikql/pack_num.h>
  4. #include <yql/essentials/minikql/computation/mkql_computation_node_pack.h>
  5. #include <yql/essentials/minikql/mkql_string_util.h>
  6. #include <util/generic/strbuf.h>
  7. #include <util/generic/maybe.h>
  8. #include <string_view>
  9. namespace NKikimr {
  10. namespace NMiniKQL {
  11. Y_FORCE_INLINE void WriteByte(TString& out, ui8 value) {
  12. out.append((char)value);
  13. }
  14. Y_FORCE_INLINE void WriteBool(TString& out, bool value) {
  15. out.append((char)value);
  16. }
  17. Y_FORCE_INLINE void WriteUi32(TString& out, ui32 value) {
  18. char buf[MAX_PACKED32_SIZE];
  19. out.AppendNoAlias(buf, Pack32(value, buf));
  20. }
  21. Y_FORCE_INLINE void WriteUi64(TString& out, ui64 value) {
  22. char buf[MAX_PACKED64_SIZE];
  23. out.AppendNoAlias(buf, Pack64(value, buf));
  24. }
  25. Y_FORCE_INLINE bool ReadBool(TStringBuf& in) {
  26. MKQL_ENSURE(in.size(), "Serialized state is corrupted");
  27. bool result = (bool)*in.data();
  28. in.Skip(1);
  29. return result;
  30. }
  31. Y_FORCE_INLINE ui8 ReadByte(TStringBuf& in) {
  32. MKQL_ENSURE(in.size(), "Serialized state is corrupted");
  33. ui8 result = *in.data();
  34. in.Skip(1);
  35. return result;
  36. }
  37. Y_FORCE_INLINE ui32 ReadUi32(TStringBuf& in) {
  38. ui32 result;
  39. auto count = Unpack32(in.data(), in.size(), result);
  40. MKQL_ENSURE(count, "Serialized state is corrupted");
  41. in.Skip(count);
  42. return result;
  43. }
  44. Y_FORCE_INLINE ui64 ReadUi64(TStringBuf& in) {
  45. ui64 result;
  46. auto count = Unpack64(in.data(), in.size(), result);
  47. MKQL_ENSURE(count, "Serialized state is corrupted");
  48. in.Skip(count);
  49. return result;
  50. }
  51. Y_FORCE_INLINE std::string_view ReadString(TStringBuf& in) {
  52. const ui32 size = ReadUi32(in);
  53. MKQL_ENSURE(in.size() >= size, "Serialized state is corrupted");
  54. TStringBuf head = in.Head(size);
  55. in = in.Tail(size);
  56. return head;
  57. }
  58. Y_FORCE_INLINE void WriteString(TString& out, std::string_view str) {
  59. WriteUi32(out, str.size());
  60. out.AppendNoAlias(str.data(), str.size());
  61. }
  // Always false, but dependent on a template parameter, so a static_assert on
  // it fires only when the enclosing template branch is actually instantiated.
  template <typename>
  inline constexpr bool always_false_v{false};
  // Kind of a serialized state blob. The serializers in this file write the
  // numeric value as a ui32 at the start of the state, so the values must
  // stay stable.
  enum class EMkqlStateType {
      SIMPLE_BLOB = 0,
      SNAPSHOT = 1,
      INCREMENT = 2,
  };
  69. struct TOutputSerializer {
  70. public:
  71. static NUdf::TUnboxedValue MakeSimpleBlobState(const TString& blob, ui32 stateVersion) {
  72. TString out;
  73. WriteUi32(out, static_cast<ui32>(EMkqlStateType::SIMPLE_BLOB));
  74. WriteUi32(out, stateVersion);
  75. out.AppendNoAlias(blob.data(), blob.size());
  76. auto strRef = NUdf::TStringRef(out);
  77. return NMiniKQL::MakeString(strRef);
  78. }
  79. template<typename TContainer>
  80. static NUdf::TUnboxedValue MakeSnapshotState(TContainer& items, ui32 stateVersion) {
  81. TString out;
  82. WriteUi32(out, static_cast<ui32>(EMkqlStateType::SNAPSHOT));
  83. WriteUi32(out, stateVersion);
  84. WriteUi32(out, static_cast<ui32>(items.size()));
  85. for (const auto& [key, value] : items) {
  86. WriteString(out, key);
  87. WriteString(out, value);
  88. }
  89. auto strRef = NUdf::TStringRef(out);
  90. return NMiniKQL::MakeString(strRef);
  91. }
  92. template<typename TContainer, typename TContainer2>
  93. static NUdf::TUnboxedValue MakeIncrementState(TContainer& createdOrChanged, TContainer2& deleted, ui32 stateVersion) {
  94. TString out;
  95. WriteUi32(out, static_cast<ui32>(EMkqlStateType::INCREMENT));
  96. WriteUi32(out, stateVersion);
  97. WriteUi32(out, static_cast<ui32>(createdOrChanged.size()));
  98. WriteUi32(out, static_cast<ui32>(deleted.size()));
  99. for(const auto& [key, value] : createdOrChanged) {
  100. WriteString(out, key);
  101. WriteString(out, value);
  102. }
  103. for(const auto& key : deleted) {
  104. WriteString(out, key);
  105. }
  106. auto strRef = NUdf::TStringRef(out);
  107. return NMiniKQL::MakeString(strRef);
  108. }
  109. public:
  110. TOutputSerializer(EMkqlStateType stateType, ui32 stateVersion, TComputationContext& ctx)
  111. : Ctx(ctx) {
  112. Write(static_cast<ui32>(stateType));
  113. Write(stateVersion);
  114. }
  115. template <typename... Ts>
  116. void operator()(Ts&&... args) {
  117. (Write(std::forward<Ts>(args)), ...);
  118. }
  119. template<typename Type>
  120. void Write(const Type& value ) {
  121. if constexpr (std::is_same_v<std::remove_cv_t<Type>, TString>) {
  122. WriteString(Buf, value);
  123. } else if constexpr (std::is_same_v<std::remove_cv_t<Type>, ui64>) {
  124. WriteUi64(Buf, value);
  125. } else if constexpr (std::is_same_v<std::remove_cv_t<Type>, i64>) {
  126. WriteUi64(Buf, value);
  127. } else if constexpr (std::is_same_v<std::remove_cv_t<Type>, bool>) {
  128. WriteBool(Buf, value);
  129. } else if constexpr (std::is_same_v<std::remove_cv_t<Type>, ui8>) {
  130. WriteByte(Buf, value);
  131. } else if constexpr (std::is_same_v<std::remove_cv_t<Type>, ui32>) {
  132. WriteUi32(Buf, value);
  133. } else if constexpr (std::is_empty_v<Type>){
  134. // Empty struct is not saved/loaded.
  135. } else {
  136. static_assert(always_false_v<Type>, "Not supported type / not implemented");
  137. }
  138. }
  139. template<class Type1, class Type2>
  140. void Write(const std::pair<Type1, Type2>& value) {
  141. Write(value.first);
  142. Write(value.second);
  143. }
  144. template<class Type, class Allocator>
  145. void Write(const std::vector<Type, Allocator>& value) {
  146. Write(value.size());
  147. for (size_t i = 0; i < value.size(); ++i) {
  148. Write(value[i]);
  149. }
  150. }
  151. Y_FORCE_INLINE void WriteUnboxedValue(const TValuePacker& packer, const NUdf::TUnboxedValue& value) {
  152. auto state = packer.Pack(value);
  153. Write<ui32>(state.size());
  154. Buf.AppendNoAlias(state.data(), state.size());
  155. }
  156. static NUdf::TUnboxedValue MakeArray(TComputationContext& ctx, const TStringBuf& buf) {
  157. const size_t MaxItemLen = 1048576;
  158. size_t count = buf.size() / MaxItemLen + (buf.size() % MaxItemLen ? 1 : 0);
  159. NUdf::TUnboxedValue *items = nullptr;
  160. auto array = ctx.HolderFactory.CreateDirectArrayHolder(count, items);
  161. size_t pos = 0;
  162. for (size_t index = 0; index < count; ++index) {
  163. size_t itemSize = std::min(buf.size() - pos, MaxItemLen);
  164. NUdf::TStringValue str(itemSize);
  165. std::memcpy(str.Data(), buf.data() + pos, itemSize);
  166. items[index] = NUdf::TUnboxedValuePod(std::move(str));
  167. pos += itemSize;
  168. }
  169. return array;
  170. }
  171. NUdf::TUnboxedValue MakeState() {
  172. return MakeArray(Ctx, Buf);
  173. }
  174. protected:
  175. TString Buf;
  176. TComputationContext& Ctx;
  177. };
  178. struct TInputSerializer {
  179. public:
  180. TInputSerializer(const TStringBuf& state, TMaybe<EMkqlStateType> expectedType = Nothing())
  181. : Buf(state) {
  182. Type = static_cast<EMkqlStateType>(Read<ui32>());
  183. Read(StateVersion);
  184. if (expectedType) {
  185. MKQL_ENSURE(Type == *expectedType, "state type is not expected");
  186. }
  187. }
  188. TInputSerializer(const NUdf::TUnboxedValue& state, TMaybe<EMkqlStateType> expectedType = Nothing())
  189. : State(StateToString(state))
  190. , Buf(State) {
  191. Type = static_cast<EMkqlStateType>(Read<ui32>());
  192. Read(StateVersion);
  193. if (expectedType) {
  194. MKQL_ENSURE(Type == *expectedType, "state type is not expected");
  195. }
  196. }
  197. ui32 GetStateVersion() {
  198. return StateVersion;
  199. }
  200. EMkqlStateType GetType() {
  201. return Type;
  202. }
  203. template <typename... Ts>
  204. void operator()(Ts&... args) {
  205. (Read(args), ...);
  206. }
  207. template<typename Type, typename ReturnType = Type>
  208. ReturnType Read() {
  209. if constexpr (std::is_same_v<std::remove_cv_t<Type>, TString>) {
  210. return ReturnType(ReadString(Buf));
  211. } else if constexpr (std::is_same_v<std::remove_cv_t<Type>, ui64>) {
  212. return ReadUi64(Buf);
  213. } else if constexpr (std::is_same_v<std::remove_cv_t<Type>, i64>) {
  214. return ReadUi64(Buf);
  215. } else if constexpr (std::is_same_v<std::remove_cv_t<Type>, bool>) {
  216. return ReadBool(Buf);
  217. } else if constexpr (std::is_same_v<std::remove_cv_t<Type>, ui8>) {
  218. return ReadByte(Buf);
  219. } else if constexpr (std::is_same_v<std::remove_cv_t<Type>, ui32>) {
  220. return ReadUi32(Buf);
  221. } else if constexpr (std::is_empty_v<Type>){
  222. // Empty struct is not saved/loaded.
  223. return ReturnType{};
  224. } else {
  225. static_assert(always_false_v<Type>, "Not supported type / not implemented");
  226. }
  227. }
  228. Y_FORCE_INLINE NUdf::TUnboxedValue ReadUnboxedValue(const TValuePacker& packer, TComputationContext& ctx) {
  229. auto size = Read<ui32>();
  230. MKQL_ENSURE_S(size <= Buf.size(), "Serialized state is corrupted, size " << size << ", Buf.size " << Buf.size());
  231. auto value = packer.Unpack(TStringBuf(Buf.data(), Buf.data() + size), ctx.HolderFactory);
  232. Buf.Skip(size);
  233. return value;
  234. }
  235. template<typename Type>
  236. void Read(Type& value) {
  237. value = Read<Type, Type>();
  238. }
  239. template<class Type1, class Type2>
  240. void Read(std::pair<Type1, Type2>& value) {
  241. Read(value.first);
  242. Read(value.second);
  243. }
  244. template<class Type, class Allocator>
  245. void Read(std::vector<Type, Allocator>& value) {
  246. using TVector = std::vector<Type, Allocator>;
  247. auto size = Read<typename TVector::size_type>();
  248. value.clear();
  249. value.resize(size);
  250. for (size_t i = 0; i < size; ++i) {
  251. Read(value[i]);
  252. }
  253. }
  254. template<class TCallbackUpdate, class TCallbackDelete>
  255. void ReadItems(TCallbackUpdate updateItem, TCallbackDelete deleteKey) {
  256. MKQL_ENSURE(Buf.size(), "Serialized state is corrupted");
  257. ui32 itemsCount = ReadUi32(Buf);
  258. ui32 deletedCount = 0;
  259. if (Type == EMkqlStateType::INCREMENT) {
  260. deletedCount = ReadUi32(Buf);
  261. }
  262. for (ui32 i = 0; i < itemsCount; ++i) {
  263. auto key = ReadString(Buf);
  264. auto value = ReadString(Buf);
  265. updateItem(key, value);
  266. }
  267. if (deletedCount) {
  268. auto key = ReadString(Buf);
  269. deleteKey(key);
  270. }
  271. }
  272. bool Empty() const {
  273. return Buf.empty();
  274. }
  275. private:
  276. TString StateToString(const NUdf::TUnboxedValue& state) {
  277. TString result;
  278. auto listIt = state.GetListIterator();
  279. NUdf::TUnboxedValue str;
  280. while (listIt.Next(str)) {
  281. const TStringBuf strRef = str.AsStringRef();
  282. result.AppendNoAlias(strRef.data(), strRef.size());
  283. }
  284. return result;
  285. }
  286. protected:
  287. TString State;
  288. TStringBuf Buf;
  289. EMkqlStateType Type{EMkqlStateType::SIMPLE_BLOB};
  290. ui32 StateVersion{0};
  291. };
  292. class TNodeStateHelper {
  293. public:
  294. static void AddNodeState(TString& result, const TStringBuf& state) {
  295. WriteUi64(result, state.size());
  296. result.AppendNoAlias(state.data(), state.size());
  297. }
  298. };
  299. } // namespace NMiniKQL
  300. } // namespace NKikimr