yql_expr_serialize.cpp 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499
  1. #include "yql_expr_serialize.h"
  2. #include <yql/essentials/minikql/pack_num.h>
  3. #include <util/generic/algorithm.h>
  4. #include <util/generic/deque.h>
  5. namespace NYql {
  6. namespace {
  7. enum ESerializeCommands {
  8. NODE_REF = 0x00,
  9. NODE_VALUE = 0x10,
  10. INLINE_STR = 0x08, // string is unique, don't write it to the pool
  11. SAME_POSITION = 0x40,
  12. ATOM_FLAG = 0x20,
  13. WIDE = 0x80, // mark wide lambdas
  14. ATOM = ATOM_FLAG | NODE_VALUE, // for atoms we will use TNodeFlags bits (1/2/4)
  15. LIST = TExprNode::List | NODE_VALUE,
  16. CALLABLE = TExprNode::Callable | NODE_VALUE,
  17. LAMBDA = TExprNode::Lambda | NODE_VALUE,
  18. ARGUMENT = TExprNode::Argument | NODE_VALUE,
  19. ARGUMENTS = TExprNode::Arguments | NODE_VALUE,
  20. WORLD = TExprNode::World | NODE_VALUE,
  21. };
  22. using namespace NKikimr;
  23. class TWriter {
  24. public:
  25. TWriter(TExprContext& ctx, ui16 components)
  26. : Ctx(ctx)
  27. , Components_(components)
  28. {
  29. }
  30. const TString& Out() const {
  31. //Cerr << "Nodes:" << WrittenNodes_.size() << ", pos: " << Positions_.size() << ", bytes: " << Out_.size() << "\n";
  32. return Out_;
  33. }
  34. void Prepare(const TExprNode& node) {
  35. TNodeSet visited;
  36. PrepareImpl(node, visited);
  37. }
  38. void Init() {
  39. WriteVar32(Components_);
  40. ui32 reusedStringCount = 0;
  41. for (auto& x : StringCounters_) {
  42. if (x.second.first > 1) {
  43. x.second.second = reusedStringCount;
  44. ++reusedStringCount;
  45. }
  46. }
  47. WriteVar32(reusedStringCount);
  48. TVector<std::pair<TStringBuf, ui32>> sortedStrings;
  49. sortedStrings.reserve(reusedStringCount);
  50. for (const auto& x : StringCounters_) {
  51. if (x.second.first > 1) {
  52. sortedStrings.push_back({ x.first, x.second.second });
  53. }
  54. }
  55. Sort(sortedStrings.begin(), sortedStrings.end(), [](const auto& x, const auto& y) { return x.second < y.second; });
  56. for (const auto& x : sortedStrings) {
  57. WriteVar32(x.first.length());
  58. WriteMany(x.first.data(), x.first.length());
  59. }
  60. if (Components_ & TSerializedExprGraphComponents::Positions) {
  61. WriteVar32(Files_.size());
  62. TVector<std::pair<TStringBuf, ui32>> sortedFiles;
  63. sortedFiles.reserve(Files_.size());
  64. for (const auto& x : Files_) {
  65. sortedFiles.push_back({ x.first, x.second });
  66. }
  67. Sort(sortedFiles.begin(), sortedFiles.end(), [](const auto& x, const auto& y) { return x.second < y.second; });
  68. for (const auto& x : sortedFiles) {
  69. WriteVar32(x.first.length());
  70. WriteMany(x.first.data(), x.first.length());
  71. }
  72. WriteVar32(Positions_.size());
  73. TVector<std::tuple<ui32, ui32, ui32, ui32>> sortedPositions;
  74. sortedPositions.reserve(Positions_.size());
  75. for (const auto& x : Positions_) {
  76. sortedPositions.push_back({ std::get<0>(x.first), std::get<1>(x.first), std::get<2>(x.first), x.second });
  77. }
  78. Sort(sortedPositions.begin(), sortedPositions.end(), [](const auto& x, const auto& y)
  79. { return std::get<3>(x) < std::get<3>(y); });
  80. for (const auto& x : sortedPositions) {
  81. WriteVar32(std::get<0>(x));
  82. WriteVar32(std::get<1>(x));
  83. WriteVar32(std::get<2>(x));
  84. }
  85. }
  86. }
  87. void Save(const TExprNode& node) {
  88. auto writtenIt = WrittenNodes_.find(&node);
  89. if (writtenIt != WrittenNodes_.end()) {
  90. Write(NODE_REF);
  91. WriteVar32(writtenIt->second);
  92. return;
  93. }
  94. char command = (node.Type() == TExprNode::Atom) ? ATOM : ((node.Type() & TExprNode::TypeMask) | NODE_VALUE);
  95. if (node.Type() == TExprNode::Lambda && node.ChildrenSize() > 2U) {
  96. command |= WIDE;
  97. }
  98. if (Components_ & TSerializedExprGraphComponents::Positions) {
  99. // will write position
  100. if (Ctx.GetPosition(node.Pos()) == LastPosition_) {
  101. command |= SAME_POSITION;
  102. }
  103. }
  104. if (node.Type() == TExprNode::Atom) {
  105. command |= (TNodeFlags::FlagsMask & node.Flags());
  106. }
  107. ui32 strNum = 0;
  108. if (node.Type() == TExprNode::Atom || node.Type() == TExprNode::Callable || node.Type() == TExprNode::Argument) {
  109. auto strIt = StringCounters_.find(node.Content());
  110. YQL_ENSURE(strIt != StringCounters_.end());
  111. if (strIt->second.first == 1) {
  112. command |= INLINE_STR;
  113. } else {
  114. strNum = strIt->second.second;
  115. }
  116. }
  117. Write(command);
  118. if ((Components_ & TSerializedExprGraphComponents::Positions) && !(command & SAME_POSITION)) {
  119. const auto& pos = Ctx.GetPosition(node.Pos());
  120. ui32 fileNum = 0;
  121. if (pos.File) {
  122. auto fileIt = Files_.find(pos.File);
  123. YQL_ENSURE(fileIt != Files_.end());
  124. fileNum = fileIt->second;
  125. }
  126. auto posIt = Positions_.find(std::make_tuple(std::move(pos.Row), std::move(pos.Column),
  127. std::move(fileNum)));
  128. YQL_ENSURE(posIt != Positions_.end());
  129. WriteVar32(posIt->second);
  130. LastPosition_ = pos;
  131. }
  132. if (node.Type() == TExprNode::Atom || node.Type() == TExprNode::Callable || node.Type() == TExprNode::Argument) {
  133. if (command & INLINE_STR) {
  134. WriteVar32(node.Content().length());
  135. WriteMany(node.Content().data(), node.Content().length());
  136. } else {
  137. WriteVar32(strNum);
  138. }
  139. }
  140. if (node.Type() == TExprNode::Callable || node.Type() == TExprNode::Arguments || node.Type() == TExprNode::List || (node.Type() == TExprNode::Lambda && node.ChildrenSize() > 2U)) {
  141. WriteVar32(node.ChildrenSize());
  142. }
  143. for (const auto& x : node.Children()) {
  144. Save(*x);
  145. }
  146. WrittenNodes_.emplace(&node, 1 + WrittenNodes_.size());
  147. }
  148. private:
  149. void PrepareImpl(const TExprNode& node, TNodeSet& visited) {
  150. if (!visited.emplace(&node).second) {
  151. return;
  152. }
  153. if (Components_ & TSerializedExprGraphComponents::Positions) {
  154. const auto& pos = Ctx.GetPosition(node.Pos());
  155. const auto& file = pos.File;
  156. ui32 fileNum = 0;
  157. if (file) {
  158. fileNum = Files_.emplace(file, 1 + (ui32)Files_.size()).first->second;
  159. }
  160. Positions_.emplace(std::make_tuple(std::move(pos.Row), std::move(pos.Column),
  161. std::move(fileNum)), (ui32)Positions_.size());
  162. }
  163. if (node.IsAtom() || node.IsCallable() || node.Type() == TExprNode::Argument) {
  164. auto& x = StringCounters_[node.Content()];
  165. x.first++;
  166. }
  167. for (const auto& x : node.Children()) {
  168. PrepareImpl(*x, visited);
  169. }
  170. }
  171. Y_FORCE_INLINE void Write(char c) {
  172. Out_.append(c);
  173. }
  174. Y_FORCE_INLINE void WriteMany(const void* buf, size_t len) {
  175. Out_.AppendNoAlias((const char*)buf, len);
  176. }
  177. Y_FORCE_INLINE void WriteVar32(ui32 value) {
  178. char buf[MAX_PACKED32_SIZE];
  179. Out_.AppendNoAlias(buf, Pack32(value, buf));
  180. }
  181. private:
  182. TExprContext& Ctx;
  183. const ui16 Components_;
  184. THashMap<TStringBuf, ui32> Files_;
  185. THashMap<std::tuple<ui32, ui32, ui32>, ui32> Positions_;
  186. THashMap<TStringBuf, std::pair<ui32, ui32>> StringCounters_; // str -> id + serialized id
  187. TNodeMap<ui32> WrittenNodes_;
  188. TPosition LastPosition_;
  189. TString Out_;
  190. };
  191. class TReader {
  192. public:
  193. TReader(TPosition pos, TStringBuf buffer, TExprContext& ctx)
  194. : Pos_(pos)
  195. , Current_(buffer.data())
  196. , End_(buffer.data() + buffer.size())
  197. , Ctx_(ctx)
  198. , Components_(0)
  199. {
  200. }
  201. TExprNode::TPtr Load() {
  202. try {
  203. Components_ = ReadVar32();
  204. auto reusedStringCount = ReadVar32();
  205. Strings_.reserve(reusedStringCount);
  206. for (ui32 i = 0; i < reusedStringCount; ++i) {
  207. ui32 length = ReadVar32();
  208. auto internedBuf = Ctx_.AppendString(TStringBuf(ReadMany(length), length));
  209. Strings_.push_back(internedBuf);
  210. }
  211. if (Components_ & TSerializedExprGraphComponents::Positions) {
  212. auto filesCount = ReadVar32();
  213. Files_.reserve(filesCount);
  214. for (ui32 i = 0; i < filesCount; ++i) {
  215. ui32 length = ReadVar32();
  216. TStringBuf file(ReadMany(length), length);
  217. Files_.push_back(TString(file));
  218. }
  219. auto positionsCount = ReadVar32();
  220. Positions_.reserve(positionsCount);
  221. for (ui32 i = 0; i < positionsCount; ++i) {
  222. ui32 row = ReadVar32();
  223. ui32 column = ReadVar32();
  224. ui32 fileNum = ReadVar32();
  225. if (fileNum > Files_.size()) {
  226. ThrowCorrupted();
  227. }
  228. Positions_.push_back({ row, column, fileNum });
  229. }
  230. }
  231. TExprNode::TPtr result = Fetch();
  232. if (Current_ != End_) {
  233. ThrowCorrupted();
  234. }
  235. return result;
  236. } catch (const yexception& e) {
  237. TIssue issue(Pos_, TStringBuilder() << "Failed to deserialize expression graph, reason:\n" << e.what());
  238. issue.SetCode(UNEXPECTED_ERROR, ESeverity::TSeverityIds_ESeverityId_S_FATAL);
  239. Ctx_.AddError(issue);
  240. return nullptr;
  241. }
  242. }
  243. private:
  244. TExprNode::TPtr Fetch() {
  245. char command = Read();
  246. if (!(command & NODE_VALUE)) {
  247. ui32 nodeId = ReadVar32();
  248. if (nodeId == 0 || nodeId > Nodes_.size()) {
  249. ThrowCorrupted();
  250. }
  251. return Nodes_[nodeId - 1];
  252. }
  253. command &= ~NODE_VALUE;
  254. TPosition pos = Pos_;
  255. if (Components_ & TSerializedExprGraphComponents::Positions) {
  256. if (command & SAME_POSITION) {
  257. pos = LastPosition_;
  258. command &= ~SAME_POSITION;
  259. } else {
  260. ui32 posNum = ReadVar32();
  261. if (posNum >= Positions_.size()) {
  262. ThrowCorrupted();
  263. }
  264. const auto& posItem = Positions_[posNum];
  265. pos = TPosition();
  266. pos.Row = std::get<0>(posItem);
  267. pos.Column = std::get<1>(posItem);
  268. auto fileNum = std::get<2>(posItem);
  269. if (fileNum > 0) {
  270. pos.File = Files_[fileNum - 1];
  271. }
  272. LastPosition_ = pos;
  273. }
  274. }
  275. ui32 atomFlags = 0;
  276. bool hasInlineStr = command & INLINE_STR;
  277. command &= ~INLINE_STR;
  278. if (command & ATOM_FLAG) {
  279. atomFlags = command & TNodeFlags::FlagsMask;
  280. command &= ~(ATOM_FLAG | TNodeFlags::FlagsMask);
  281. command |= TExprNode::Atom;
  282. }
  283. const bool wide = command & WIDE;
  284. command &= ~WIDE;
  285. TStringBuf content;
  286. if (command == TExprNode::Atom || command == TExprNode::Callable || command == TExprNode::Argument) {
  287. if (hasInlineStr) {
  288. ui32 length = ReadVar32();
  289. content = TStringBuf(ReadMany(length), length);
  290. } else {
  291. ui32 strNum = ReadVar32();
  292. if (strNum >= Strings_.size()) {
  293. ThrowCorrupted();
  294. }
  295. content = Strings_[strNum];
  296. }
  297. }
  298. ui32 childrenSize = 0;
  299. if (command == TExprNode::Callable || command == TExprNode::Arguments || command == TExprNode::List || (command == TExprNode::Lambda && wide)) {
  300. childrenSize = ReadVar32();
  301. }
  302. TExprNode::TPtr ret;
  303. switch (command) {
  304. case TExprNode::Atom:
  305. ret = Ctx_.NewAtom(pos, content, atomFlags);
  306. break;
  307. case TExprNode::List: {
  308. TExprNode::TListType children;
  309. children.reserve(childrenSize);
  310. for (ui32 i = 0U; i < childrenSize; ++i) {
  311. children.emplace_back(Fetch());
  312. }
  313. ret = Ctx_.NewList(pos, std::move(children));
  314. break;
  315. }
  316. case TExprNode::Callable: {
  317. TExprNode::TListType children;
  318. children.reserve(childrenSize);
  319. for (ui32 i = 0U; i < childrenSize; ++i) {
  320. children.emplace_back(Fetch());
  321. }
  322. ret = Ctx_.NewCallable(pos, content, std::move(children));
  323. break;
  324. }
  325. case TExprNode::Argument:
  326. ret = Ctx_.NewArgument(pos, content);
  327. break;
  328. case TExprNode::Arguments: {
  329. TExprNode::TListType children;
  330. children.reserve(childrenSize);
  331. for (ui32 i = 0U; i < childrenSize; ++i) {
  332. children.emplace_back(Fetch());
  333. }
  334. ret = Ctx_.NewArguments(pos, std::move(children));
  335. break;
  336. }
  337. case TExprNode::Lambda:
  338. if (wide) {
  339. TExprNode::TListType children;
  340. children.reserve(childrenSize);
  341. for (ui32 i = 0U; i < childrenSize; ++i) {
  342. children.emplace_back(Fetch());
  343. }
  344. ret = Ctx_.NewLambda(pos, std::move(children));
  345. } else {
  346. auto args = Fetch();
  347. auto body = Fetch();
  348. ret = Ctx_.NewLambda(pos, {std::move(args), std::move(body)});
  349. }
  350. break;
  351. case TExprNode::World:
  352. ret = Ctx_.NewWorld(pos);
  353. break;
  354. default:
  355. ThrowCorrupted();
  356. }
  357. Nodes_.push_back(ret);
  358. return ret;
  359. }
  360. Y_FORCE_INLINE char Read() {
  361. if (Current_ == End_)
  362. ThrowNoData();
  363. return *Current_++;
  364. }
  365. Y_FORCE_INLINE const char* ReadMany(ui32 count) {
  366. if (Current_ + count > End_)
  367. ThrowNoData();
  368. const char* result = Current_;
  369. Current_ += count;
  370. return result;
  371. }
  372. Y_FORCE_INLINE ui32 ReadVar32() {
  373. ui32 result = 0;
  374. size_t count = Unpack32(Current_, End_ - Current_, result);
  375. if (!count) {
  376. ThrowCorrupted();
  377. }
  378. Current_ += count;
  379. return result;
  380. }
  381. [[noreturn]] static void ThrowNoData() {
  382. ythrow yexception() << "No more data in buffer";
  383. }
  384. [[noreturn]] static void ThrowCorrupted() {
  385. ythrow yexception() << "Serialized data is corrupted";
  386. }
  387. private:
  388. const TPosition Pos_;
  389. const char* Current_;
  390. const char* const End_;
  391. TExprContext& Ctx_;
  392. ui16 Components_;
  393. TVector<TStringBuf> Strings_;
  394. TVector<TString> Files_;
  395. TVector<std::tuple<ui32, ui32, ui32>> Positions_;
  396. TPosition LastPosition_;
  397. TDeque<TExprNode::TPtr> Nodes_;
  398. };
  399. }
  400. TString SerializeGraph(const TExprNode& node, TExprContext& ctx, ui16 components) {
  401. TWriter writer(ctx, components);
  402. writer.Prepare(node);
  403. writer.Init();
  404. writer.Save(node);
  405. return writer.Out();
  406. }
  407. TExprNode::TPtr DeserializeGraph(TPositionHandle pos, TStringBuf buffer, TExprContext& ctx) {
  408. return DeserializeGraph(ctx.GetPosition(pos), buffer, ctx);
  409. }
  410. TExprNode::TPtr DeserializeGraph(TPosition pos, TStringBuf buffer, TExprContext& ctx) {
  411. TReader reader(pos, buffer, ctx);
  412. return reader.Load();
  413. }
  414. } // namespace NYql