Resolver.cc 26 KB


  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * https://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. #include "Resolver.hh"
  19. #include "AvroTraits.hh"
  20. #include "Layout.hh"
  21. #include "NodeImpl.hh"
  22. #include "Reader.hh"
  23. #include "ValidSchema.hh"
  24. #include <memory>
  25. namespace avro {
  26. using std::unique_ptr;
  27. class ResolverFactory;
  28. typedef std::shared_ptr<Resolver> ResolverPtr;
  29. typedef std::vector<std::unique_ptr<Resolver>> ResolverPtrVector;
  30. // #define DEBUG_VERBOSE
  31. #ifdef DEBUG_VERBOSE
  32. #define DEBUG_OUT(str) std::cout << str << '\n'
  33. #else
  34. class NoOp {};
  35. template<typename T>
  36. NoOp &operator<<(NoOp &noOp, const T &) {
  37. return noOp;
  38. }
  39. NoOp noop;
  40. #define DEBUG_OUT(str) noop << str
  41. #endif
  42. template<typename T>
  43. class PrimitiveSkipper : public Resolver {
  44. public:
  45. PrimitiveSkipper() : Resolver() {}
  46. void parse(Reader &reader, uint8_t *address) const final {
  47. T val;
  48. reader.readValue(val);
  49. DEBUG_OUT("Skipping " << val);
  50. }
  51. };
  52. template<typename T>
  53. class PrimitiveParser : public Resolver {
  54. public:
  55. explicit PrimitiveParser(const PrimitiveLayout &offset) : Resolver(),
  56. offset_(offset.offset()) {}
  57. void parse(Reader &reader, uint8_t *address) const final {
  58. T *location = reinterpret_cast<T *>(address + offset_);
  59. reader.readValue(*location);
  60. DEBUG_OUT("Reading " << *location);
  61. }
  62. private:
  63. size_t offset_;
  64. };
  65. template<typename WT, typename RT>
  66. class PrimitivePromoter : public Resolver {
  67. public:
  68. explicit PrimitivePromoter(const PrimitiveLayout &offset) : Resolver(),
  69. offset_(offset.offset()) {}
  70. void parse(Reader &reader, uint8_t *address) const final {
  71. parseIt<WT>(reader, address);
  72. }
  73. private:
  74. void parseIt(Reader &reader, uint8_t *address, const std::true_type &) const {
  75. WT val;
  76. reader.readValue(val);
  77. RT *location = reinterpret_cast<RT *>(address + offset_);
  78. *location = static_cast<RT>(val);
  79. DEBUG_OUT("Promoting " << val);
  80. }
  81. void parseIt(Reader &reader, uint8_t *, const std::false_type &) const {}
  82. template<typename T>
  83. void parseIt(Reader &reader, uint8_t *address) const {
  84. parseIt(reader, address, is_promotable<T>());
  85. }
  86. size_t offset_;
  87. };
  88. template<>
  89. class PrimitiveSkipper<std::vector<uint8_t>> : public Resolver {
  90. public:
  91. PrimitiveSkipper() : Resolver() {}
  92. void parse(Reader &reader, uint8_t *address) const final {
  93. std::vector<uint8_t> val;
  94. reader.readBytes(val);
  95. DEBUG_OUT("Skipping bytes");
  96. }
  97. };
  98. template<>
  99. class PrimitiveParser<std::vector<uint8_t>> : public Resolver {
  100. public:
  101. explicit PrimitiveParser(const PrimitiveLayout &offset) : Resolver(),
  102. offset_(offset.offset()) {}
  103. void parse(Reader &reader, uint8_t *address) const final {
  104. auto *location = reinterpret_cast<std::vector<uint8_t> *>(address + offset_);
  105. reader.readBytes(*location);
  106. DEBUG_OUT("Reading bytes");
  107. }
  108. private:
  109. size_t offset_;
  110. };
  111. class RecordSkipper : public Resolver {
  112. public:
  113. RecordSkipper(ResolverFactory &factory, const NodePtr &writer);
  114. void parse(Reader &reader, uint8_t *address) const final {
  115. DEBUG_OUT("Skipping record");
  116. reader.readRecord();
  117. size_t steps = resolvers_.size();
  118. for (size_t i = 0; i < steps; ++i) {
  119. resolvers_[i]->parse(reader, address);
  120. }
  121. }
  122. protected:
  123. ResolverPtrVector resolvers_;
  124. };
  125. class RecordParser : public Resolver {
  126. public:
  127. void parse(Reader &reader, uint8_t *address) const final {
  128. DEBUG_OUT("Reading record");
  129. reader.readRecord();
  130. size_t steps = resolvers_.size();
  131. for (size_t i = 0; i < steps; ++i) {
  132. resolvers_[i]->parse(reader, address);
  133. }
  134. }
  135. RecordParser(ResolverFactory &factory, const NodePtr &writer, const NodePtr &reader, const CompoundLayout &offsets);
  136. protected:
  137. ResolverPtrVector resolvers_;
  138. };
  139. class MapSkipper : public Resolver {
  140. public:
  141. MapSkipper(ResolverFactory &factory, const NodePtr &writer);
  142. void parse(Reader &reader, uint8_t *address) const final {
  143. DEBUG_OUT("Skipping map");
  144. std::string key;
  145. int64_t size;
  146. do {
  147. size = reader.readMapBlockSize();
  148. for (auto i = 0; i < size; ++i) {
  149. reader.readValue(key);
  150. resolver_->parse(reader, address);
  151. }
  152. } while (size != 0);
  153. }
  154. protected:
  155. ResolverPtr resolver_;
  156. };
  157. class MapParser : public Resolver {
  158. public:
  159. typedef uint8_t *(*GenericMapSetter)(uint8_t *map, const std::string &key);
  160. MapParser(ResolverFactory &factory, const NodePtr &writer, const NodePtr &reader, const CompoundLayout &offsets);
  161. void parse(Reader &reader, uint8_t *address) const final {
  162. DEBUG_OUT("Reading map");
  163. uint8_t *mapAddress = address + offset_;
  164. std::string key;
  165. auto *setter = reinterpret_cast<GenericMapSetter *>(address + setFuncOffset_);
  166. int64_t size;
  167. do {
  168. size = reader.readMapBlockSize();
  169. for (auto i = 0; i < size; ++i) {
  170. reader.readValue(key);
  171. // create a new map entry and get the address
  172. uint8_t *location = (*setter)(mapAddress, key);
  173. resolver_->parse(reader, location);
  174. }
  175. } while (size != 0);
  176. }
  177. protected:
  178. ResolverPtr resolver_;
  179. size_t offset_;
  180. size_t setFuncOffset_;
  181. };
  182. class ArraySkipper : public Resolver {
  183. public:
  184. ArraySkipper(ResolverFactory &factory, const NodePtr &writer);
  185. void parse(Reader &reader, uint8_t *address) const final {
  186. DEBUG_OUT("Skipping array");
  187. int64_t size;
  188. do {
  189. size = reader.readArrayBlockSize();
  190. for (auto i = 0; i < size; ++i) {
  191. resolver_->parse(reader, address);
  192. }
  193. } while (size != 0);
  194. }
  195. protected:
  196. ResolverPtr resolver_;
  197. };
  198. typedef uint8_t *(*GenericArraySetter)(uint8_t *array);
  199. class ArrayParser : public Resolver {
  200. public:
  201. ArrayParser(ResolverFactory &factory, const NodePtr &writer, const NodePtr &reader, const CompoundLayout &offsets);
  202. void parse(Reader &reader, uint8_t *address) const final {
  203. DEBUG_OUT("Reading array");
  204. uint8_t *arrayAddress = address + offset_;
  205. auto *setter = reinterpret_cast<GenericArraySetter *>(address + setFuncOffset_);
  206. int64_t size;
  207. do {
  208. size = reader.readArrayBlockSize();
  209. for (auto i = 0; i < size; ++i) {
  210. // create a new map entry and get the address
  211. uint8_t *location = (*setter)(arrayAddress);
  212. resolver_->parse(reader, location);
  213. }
  214. } while (size != 0);
  215. }
  216. protected:
  217. ArrayParser() : Resolver(), offset_(0), setFuncOffset_(0) {}
  218. ResolverPtr resolver_;
  219. size_t offset_;
  220. size_t setFuncOffset_;
  221. };
  222. class EnumSkipper : public Resolver {
  223. public:
  224. EnumSkipper(ResolverFactory &factory, const NodePtr &writer) : Resolver() {}
  225. void parse(Reader &reader, uint8_t *address) const final {
  226. int64_t val = reader.readEnum();
  227. DEBUG_OUT("Skipping enum" << val);
  228. }
  229. };
  230. class EnumParser : public Resolver {
  231. public:
  232. enum EnumRepresentation {
  233. VAL
  234. };
  235. EnumParser(ResolverFactory &factory, const NodePtr &writer, const NodePtr &reader, const CompoundLayout &offsets) : Resolver(),
  236. offset_(offsets.at(0).offset()),
  237. readerSize_(reader->names()) {
  238. const size_t writerSize = writer->names();
  239. mapping_.reserve(writerSize);
  240. for (size_t i = 0; i < writerSize; ++i) {
  241. const std::string &name = writer->nameAt(i);
  242. size_t readerIndex = readerSize_;
  243. reader->nameIndex(name, readerIndex);
  244. mapping_.push_back(readerIndex);
  245. }
  246. }
  247. void parse(Reader &reader, uint8_t *address) const final {
  248. auto val = static_cast<size_t>(reader.readEnum());
  249. assert(static_cast<size_t>(val) < mapping_.size());
  250. if (mapping_[val] < readerSize_) {
  251. auto *location = reinterpret_cast<EnumRepresentation *>(address + offset_);
  252. *location = static_cast<EnumRepresentation>(mapping_[val]);
  253. DEBUG_OUT("Setting enum" << *location);
  254. }
  255. }
  256. protected:
  257. size_t offset_;
  258. size_t readerSize_;
  259. std::vector<size_t> mapping_;
  260. };
  261. class UnionSkipper : public Resolver {
  262. public:
  263. UnionSkipper(ResolverFactory &factory, const NodePtr &writer);
  264. void parse(Reader &reader, uint8_t *address) const final {
  265. DEBUG_OUT("Skipping union");
  266. auto choice = static_cast<size_t>(reader.readUnion());
  267. resolvers_[choice]->parse(reader, address);
  268. }
  269. protected:
  270. ResolverPtrVector resolvers_;
  271. };
  272. class UnionParser : public Resolver {
  273. public:
  274. typedef uint8_t *(*GenericUnionSetter)(uint8_t *, int64_t);
  275. UnionParser(ResolverFactory &factory, const NodePtr &writer, const NodePtr &reader, const CompoundLayout &offsets);
  276. void parse(Reader &reader, uint8_t *address) const final {
  277. DEBUG_OUT("Reading union");
  278. auto writerChoice = static_cast<size_t>(reader.readUnion());
  279. auto *readerChoice = reinterpret_cast<int64_t *>(address + choiceOffset_);
  280. *readerChoice = choiceMapping_[writerChoice];
  281. auto *setter = reinterpret_cast<GenericUnionSetter *>(address + setFuncOffset_);
  282. auto *value = reinterpret_cast<uint8_t *>(address + offset_);
  283. uint8_t *location = (*setter)(value, *readerChoice);
  284. resolvers_[writerChoice]->parse(reader, location);
  285. }
  286. protected:
  287. ResolverPtrVector resolvers_;
  288. std::vector<int64_t> choiceMapping_;
  289. size_t offset_;
  290. size_t choiceOffset_;
  291. size_t setFuncOffset_;
  292. };
  293. class UnionToNonUnionParser : public Resolver {
  294. public:
  295. typedef uint8_t *(*GenericUnionSetter)(uint8_t *, int64_t);
  296. UnionToNonUnionParser(ResolverFactory &factory,
  297. const NodePtr &writer,
  298. const NodePtr &reader,
  299. const Layout &offsets);
  300. void parse(Reader &reader, uint8_t *address) const final {
  301. DEBUG_OUT("Reading union to non-union");
  302. auto choice = static_cast<size_t>(reader.readUnion());
  303. resolvers_[choice]->parse(reader, address);
  304. }
  305. protected:
  306. ResolverPtrVector resolvers_;
  307. };
  308. class NonUnionToUnionParser : public Resolver {
  309. public:
  310. typedef uint8_t *(*GenericUnionSetter)(uint8_t *, int64_t);
  311. NonUnionToUnionParser(ResolverFactory &factory,
  312. const NodePtr &writer,
  313. const NodePtr &reader,
  314. const CompoundLayout &offsets);
  315. void parse(Reader &reader, uint8_t *address) const final {
  316. DEBUG_OUT("Reading non-union to union");
  317. auto *choice = reinterpret_cast<int64_t *>(address + choiceOffset_);
  318. *choice = choice_;
  319. auto *setter = reinterpret_cast<GenericUnionSetter *>(address + setFuncOffset_);
  320. auto *value = reinterpret_cast<uint8_t *>(address + offset_);
  321. uint8_t *location = (*setter)(value, choice_);
  322. resolver_->parse(reader, location);
  323. }
  324. protected:
  325. ResolverPtr resolver_;
  326. size_t choice_;
  327. size_t offset_;
  328. size_t choiceOffset_;
  329. size_t setFuncOffset_;
  330. };
  331. class FixedSkipper : public Resolver {
  332. public:
  333. FixedSkipper(ResolverFactory &factory, const NodePtr &writer) : Resolver() {
  334. size_ = writer->fixedSize();
  335. }
  336. void parse(Reader &reader, uint8_t *address) const final {
  337. DEBUG_OUT("Skipping fixed");
  338. std::unique_ptr<uint8_t[]> val(new uint8_t[size_]);
  339. reader.readFixed(&val[0], size_);
  340. }
  341. protected:
  342. int size_;
  343. };
  344. class FixedParser : public Resolver {
  345. public:
  346. FixedParser(ResolverFactory &factory, const NodePtr &writer, const NodePtr &reader, const CompoundLayout &offsets) : Resolver() {
  347. size_ = writer->fixedSize();
  348. offset_ = offsets.at(0).offset();
  349. }
  350. void parse(Reader &reader, uint8_t *address) const final {
  351. DEBUG_OUT("Reading fixed");
  352. auto *location = reinterpret_cast<uint8_t *>(address + offset_);
  353. reader.readFixed(location, size_);
  354. }
  355. protected:
  356. int size_;
  357. size_t offset_;
  358. };
  359. class ResolverFactory : private boost::noncopyable {
  360. template<typename T>
  361. unique_ptr<Resolver>
  362. constructPrimitiveSkipper(const NodePtr &writer) {
  363. return unique_ptr<Resolver>(new PrimitiveSkipper<T>());
  364. }
  365. template<typename T>
  366. unique_ptr<Resolver>
  367. constructPrimitive(const NodePtr &writer, const NodePtr &reader, const Layout &offset) {
  368. unique_ptr<Resolver> instruction;
  369. SchemaResolution match = writer->resolve(*reader);
  370. if (match == RESOLVE_NO_MATCH) {
  371. instruction = unique_ptr<Resolver>(new PrimitiveSkipper<T>());
  372. } else if (reader->type() == AVRO_UNION) {
  373. const auto &compoundLayout = static_cast<const CompoundLayout &>(offset);
  374. instruction = unique_ptr<Resolver>(new NonUnionToUnionParser(*this, writer, reader, compoundLayout));
  375. } else if (match == RESOLVE_MATCH) {
  376. const auto &primitiveLayout = static_cast<const PrimitiveLayout &>(offset);
  377. instruction = unique_ptr<Resolver>(new PrimitiveParser<T>(primitiveLayout));
  378. } else if (match == RESOLVE_PROMOTABLE_TO_LONG) {
  379. const auto &primitiveLayout = static_cast<const PrimitiveLayout &>(offset);
  380. instruction = unique_ptr<Resolver>(new PrimitivePromoter<T, int64_t>(primitiveLayout));
  381. } else if (match == RESOLVE_PROMOTABLE_TO_FLOAT) {
  382. const auto &primitiveLayout = static_cast<const PrimitiveLayout &>(offset);
  383. instruction = unique_ptr<Resolver>(new PrimitivePromoter<T, float>(primitiveLayout));
  384. } else if (match == RESOLVE_PROMOTABLE_TO_DOUBLE) {
  385. const auto &primitiveLayout = static_cast<const PrimitiveLayout &>(offset);
  386. instruction = unique_ptr<Resolver>(new PrimitivePromoter<T, double>(primitiveLayout));
  387. } else {
  388. assert(0);
  389. }
  390. return instruction;
  391. }
  392. template<typename Skipper>
  393. unique_ptr<Resolver>
  394. constructCompoundSkipper(const NodePtr &writer) {
  395. return unique_ptr<Resolver>(new Skipper(*this, writer));
  396. }
  397. template<typename Parser, typename Skipper>
  398. unique_ptr<Resolver>
  399. constructCompound(const NodePtr &writer, const NodePtr &reader, const Layout &offset) {
  400. unique_ptr<Resolver> instruction;
  401. avro::SchemaResolution match = writer->resolve(*reader);
  402. if (match == RESOLVE_NO_MATCH) {
  403. instruction = unique_ptr<Resolver>(new Skipper(*this, writer));
  404. } else if (writer->type() != AVRO_UNION && reader->type() == AVRO_UNION) {
  405. const auto &compoundLayout = dynamic_cast<const CompoundLayout &>(offset);
  406. instruction = unique_ptr<Resolver>(new NonUnionToUnionParser(*this, writer, reader, compoundLayout));
  407. } else if (writer->type() == AVRO_UNION && reader->type() != AVRO_UNION) {
  408. instruction = unique_ptr<Resolver>(new UnionToNonUnionParser(*this, writer, reader, offset));
  409. } else {
  410. const auto &compoundLayout = dynamic_cast<const CompoundLayout &>(offset);
  411. instruction = unique_ptr<Resolver>(new Parser(*this, writer, reader, compoundLayout));
  412. }
  413. return instruction;
  414. }
  415. public:
  416. unique_ptr<Resolver>
  417. construct(const NodePtr &writer, const NodePtr &reader, const Layout &offset) {
  418. typedef unique_ptr<Resolver> (ResolverFactory::*BuilderFunc)(const NodePtr &writer, const NodePtr &reader, const Layout &offset);
  419. NodePtr currentWriter = (writer->type() == AVRO_SYMBOLIC) ? resolveSymbol(writer) : writer;
  420. NodePtr currentReader = (reader->type() == AVRO_SYMBOLIC) ? resolveSymbol(reader) : reader;
  421. static const BuilderFunc funcs[] = {
  422. &ResolverFactory::constructPrimitive<std::string>,
  423. &ResolverFactory::constructPrimitive<std::vector<uint8_t>>,
  424. &ResolverFactory::constructPrimitive<int32_t>,
  425. &ResolverFactory::constructPrimitive<int64_t>,
  426. &ResolverFactory::constructPrimitive<float>,
  427. &ResolverFactory::constructPrimitive<double>,
  428. &ResolverFactory::constructPrimitive<bool>,
  429. &ResolverFactory::constructPrimitive<Null>,
  430. &ResolverFactory::constructCompound<RecordParser, RecordSkipper>,
  431. &ResolverFactory::constructCompound<EnumParser, EnumSkipper>,
  432. &ResolverFactory::constructCompound<ArrayParser, ArraySkipper>,
  433. &ResolverFactory::constructCompound<MapParser, MapSkipper>,
  434. &ResolverFactory::constructCompound<UnionParser, UnionSkipper>,
  435. &ResolverFactory::constructCompound<FixedParser, FixedSkipper>};
  436. static_assert((sizeof(funcs) / sizeof(BuilderFunc)) == (AVRO_NUM_TYPES),
  437. "Invalid number of builder functions");
  438. BuilderFunc func = funcs[currentWriter->type()];
  439. assert(func);
  440. return ((this)->*(func))(currentWriter, currentReader, offset);
  441. }
  442. unique_ptr<Resolver>
  443. skipper(const NodePtr &writer) {
  444. typedef unique_ptr<Resolver> (ResolverFactory::*BuilderFunc)(const NodePtr &writer);
  445. NodePtr currentWriter = (writer->type() == AVRO_SYMBOLIC) ? writer->leafAt(0) : writer;
  446. static const BuilderFunc funcs[] = {
  447. &ResolverFactory::constructPrimitiveSkipper<std::string>,
  448. &ResolverFactory::constructPrimitiveSkipper<std::vector<uint8_t>>,
  449. &ResolverFactory::constructPrimitiveSkipper<int32_t>,
  450. &ResolverFactory::constructPrimitiveSkipper<int64_t>,
  451. &ResolverFactory::constructPrimitiveSkipper<float>,
  452. &ResolverFactory::constructPrimitiveSkipper<double>,
  453. &ResolverFactory::constructPrimitiveSkipper<bool>,
  454. &ResolverFactory::constructPrimitiveSkipper<Null>,
  455. &ResolverFactory::constructCompoundSkipper<RecordSkipper>,
  456. &ResolverFactory::constructCompoundSkipper<EnumSkipper>,
  457. &ResolverFactory::constructCompoundSkipper<ArraySkipper>,
  458. &ResolverFactory::constructCompoundSkipper<MapSkipper>,
  459. &ResolverFactory::constructCompoundSkipper<UnionSkipper>,
  460. &ResolverFactory::constructCompoundSkipper<FixedSkipper>};
  461. static_assert((sizeof(funcs) / sizeof(BuilderFunc)) == (AVRO_NUM_TYPES),
  462. "Invalid number of builder functions");
  463. BuilderFunc func = funcs[currentWriter->type()];
  464. assert(func);
  465. return ((this)->*(func))(currentWriter);
  466. }
  467. };
  468. RecordSkipper::RecordSkipper(ResolverFactory &factory, const NodePtr &writer) : Resolver() {
  469. size_t leaves = writer->leaves();
  470. resolvers_.reserve(leaves);
  471. for (size_t i = 0; i < leaves; ++i) {
  472. const NodePtr &w = writer->leafAt(i);
  473. resolvers_.push_back(factory.skipper(w));
  474. }
  475. }
  476. RecordParser::RecordParser(ResolverFactory &factory,
  477. const NodePtr &writer,
  478. const NodePtr &reader,
  479. const CompoundLayout &offsets) : Resolver() {
  480. size_t leaves = writer->leaves();
  481. resolvers_.reserve(leaves);
  482. for (size_t i = 0; i < leaves; ++i) {
  483. const NodePtr &w = writer->leafAt(i);
  484. const std::string &name = writer->nameAt(i);
  485. size_t readerIndex = 0;
  486. bool found = reader->nameIndex(name, readerIndex);
  487. if (found) {
  488. const NodePtr &r = reader->leafAt(readerIndex);
  489. resolvers_.push_back(factory.construct(w, r, offsets.at(readerIndex)));
  490. } else {
  491. resolvers_.push_back(factory.skipper(w));
  492. }
  493. }
  494. }
  495. MapSkipper::MapSkipper(ResolverFactory &factory, const NodePtr &writer) : Resolver(),
  496. resolver_(factory.skipper(writer->leafAt(1))) {}
  497. MapParser::MapParser(ResolverFactory &factory,
  498. const NodePtr &writer,
  499. const NodePtr &reader,
  500. const CompoundLayout &offsets) : Resolver(),
  501. resolver_(factory.construct(writer->leafAt(1), reader->leafAt(1), offsets.at(1))),
  502. offset_(offsets.offset()),
  503. setFuncOffset_(offsets.at(0).offset()) {}
  504. ArraySkipper::ArraySkipper(ResolverFactory &factory, const NodePtr &writer) : Resolver(),
  505. resolver_(factory.skipper(writer->leafAt(0))) {}
  506. ArrayParser::ArrayParser(ResolverFactory &factory,
  507. const NodePtr &writer,
  508. const NodePtr &reader,
  509. const CompoundLayout &offsets) : Resolver(),
  510. resolver_(factory.construct(writer->leafAt(0), reader->leafAt(0), offsets.at(1))),
  511. offset_(offsets.offset()),
  512. setFuncOffset_(offsets.at(0).offset()) {}
  513. UnionSkipper::UnionSkipper(ResolverFactory &factory, const NodePtr &writer) : Resolver() {
  514. size_t leaves = writer->leaves();
  515. resolvers_.reserve(leaves);
  516. for (size_t i = 0; i < leaves; ++i) {
  517. const NodePtr &w = writer->leafAt(i);
  518. resolvers_.push_back(factory.skipper(w));
  519. }
  520. }
  521. namespace {
  522. // assumes the writer is NOT a union, and the reader IS a union
  523. SchemaResolution
  524. checkUnionMatch(const NodePtr &writer, const NodePtr &reader, size_t &index) {
  525. SchemaResolution bestMatch = RESOLVE_NO_MATCH;
  526. index = 0;
  527. size_t leaves = reader->leaves();
  528. for (size_t i = 0; i < leaves; ++i) {
  529. const NodePtr &leaf = reader->leafAt(i);
  530. SchemaResolution newMatch = writer->resolve(*leaf);
  531. if (newMatch == RESOLVE_MATCH) {
  532. bestMatch = newMatch;
  533. index = i;
  534. break;
  535. }
  536. if (bestMatch == RESOLVE_NO_MATCH) {
  537. bestMatch = newMatch;
  538. index = i;
  539. }
  540. }
  541. return bestMatch;
  542. }
  543. } // namespace
  544. UnionParser::UnionParser(ResolverFactory &factory,
  545. const NodePtr &writer,
  546. const NodePtr &reader,
  547. const CompoundLayout &offsets) : Resolver(),
  548. offset_(offsets.offset()),
  549. choiceOffset_(offsets.at(0).offset()),
  550. setFuncOffset_(offsets.at(1).offset()) {
  551. size_t leaves = writer->leaves();
  552. resolvers_.reserve(leaves);
  553. choiceMapping_.reserve(leaves);
  554. for (size_t i = 0; i < leaves; ++i) {
  555. // for each writer, we need a schema match for the reader
  556. const NodePtr &w = writer->leafAt(i);
  557. size_t index = 0;
  558. SchemaResolution match = checkUnionMatch(w, reader, index);
  559. if (match == RESOLVE_NO_MATCH) {
  560. resolvers_.push_back(factory.skipper(w));
  561. // push back a non-sense number
  562. choiceMapping_.push_back(reader->leaves());
  563. } else {
  564. const NodePtr &r = reader->leafAt(index);
  565. resolvers_.push_back(factory.construct(w, r, offsets.at(index + 2)));
  566. choiceMapping_.push_back(index);
  567. }
  568. }
  569. }
  570. NonUnionToUnionParser::NonUnionToUnionParser(ResolverFactory &factory,
  571. const NodePtr &writer,
  572. const NodePtr &reader,
  573. const CompoundLayout &offsets) : Resolver(),
  574. choice_(0),
  575. offset_(offsets.offset()),
  576. choiceOffset_(offsets.at(0).offset()),
  577. setFuncOffset_(offsets.at(1).offset()) {
  578. #ifndef NDEBUG
  579. SchemaResolution bestMatch =
  580. #endif
  581. checkUnionMatch(writer, reader, choice_);
  582. assert(bestMatch != RESOLVE_NO_MATCH);
  583. resolver_ = factory.construct(writer, reader->leafAt(choice_), offsets.at(choice_ + 2));
  584. }
  585. UnionToNonUnionParser::UnionToNonUnionParser(ResolverFactory &factory,
  586. const NodePtr &writer,
  587. const NodePtr &reader,
  588. const Layout &offsets) : Resolver() {
  589. size_t leaves = writer->leaves();
  590. resolvers_.reserve(leaves);
  591. for (size_t i = 0; i < leaves; ++i) {
  592. const NodePtr &w = writer->leafAt(i);
  593. resolvers_.push_back(factory.construct(w, reader, offsets));
  594. }
  595. }
  596. unique_ptr<Resolver> constructResolver(const ValidSchema &writerSchema,
  597. const ValidSchema &readerSchema,
  598. const Layout &readerLayout) {
  599. ResolverFactory factory;
  600. return factory.construct(writerSchema.root(), readerSchema.root(), readerLayout);
  601. }
  602. } // namespace avro