JsonCodec.cc 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * https://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. #include <algorithm>
  19. #include <boost/math/special_functions/fpclassify.hpp>
  20. #include <map>
  21. #include <memory>
  22. #include <string>
  23. #include "Decoder.hh"
  24. #include "Encoder.hh"
  25. #include "Symbol.hh"
  26. #include "ValidSchema.hh"
  27. #include "ValidatingCodec.hh"
  28. #include "../json/JsonIO.hh"
  29. namespace avro {
  30. namespace parsing {
  31. using std::make_shared;
  32. using std::istringstream;
  33. using std::map;
  34. using std::ostringstream;
  35. using std::reverse;
  36. using std::string;
  37. using std::vector;
  38. using avro::json::JsonGenerator;
  39. using avro::json::JsonNullFormatter;
  40. using avro::json::JsonParser;
  41. class JsonGrammarGenerator : public ValidatingGrammarGenerator {
  42. ProductionPtr doGenerate(const NodePtr &n,
  43. std::map<NodePtr, ProductionPtr> &m) final;
  44. };
  45. static std::string nameOf(const NodePtr &n) {
  46. if (n->hasName()) {
  47. return std::string(n->name());
  48. }
  49. std::ostringstream oss;
  50. oss << n->type();
  51. return oss.str();
  52. }
  53. ProductionPtr JsonGrammarGenerator::doGenerate(const NodePtr &n,
  54. std::map<NodePtr, ProductionPtr> &m) {
  55. switch (n->type()) {
  56. case AVRO_NULL:
  57. case AVRO_BOOL:
  58. case AVRO_INT:
  59. case AVRO_LONG:
  60. case AVRO_FLOAT:
  61. case AVRO_DOUBLE:
  62. case AVRO_STRING:
  63. case AVRO_BYTES:
  64. case AVRO_FIXED:
  65. case AVRO_ARRAY:
  66. case AVRO_MAP:
  67. case AVRO_SYMBOLIC:
  68. return ValidatingGrammarGenerator::doGenerate(n, m);
  69. case AVRO_RECORD: {
  70. ProductionPtr result = make_shared<Production>();
  71. m.erase(n);
  72. size_t c = n->leaves();
  73. result->reserve(2 + 2 * c);
  74. result->push_back(Symbol::recordStartSymbol());
  75. for (size_t i = 0; i < c; ++i) {
  76. const NodePtr &leaf = n->leafAt(i);
  77. ProductionPtr v = doGenerate(leaf, m);
  78. result->push_back(Symbol::fieldSymbol(n->nameAt(i)));
  79. copy(v->rbegin(), v->rend(), back_inserter(*result));
  80. }
  81. result->push_back(Symbol::recordEndSymbol());
  82. reverse(result->begin(), result->end());
  83. m[n] = result;
  84. return make_shared<Production>(1, Symbol::indirect(result));
  85. }
  86. case AVRO_ENUM: {
  87. vector<string> nn;
  88. size_t c = n->names();
  89. nn.reserve(c);
  90. for (size_t i = 0; i < c; ++i) {
  91. nn.push_back(n->nameAt(i));
  92. }
  93. ProductionPtr result = make_shared<Production>();
  94. result->push_back(Symbol::nameListSymbol(nn));
  95. result->push_back(Symbol::enumSymbol());
  96. m[n] = result;
  97. return result;
  98. }
  99. case AVRO_UNION: {
  100. size_t c = n->leaves();
  101. vector<ProductionPtr> vv;
  102. vv.reserve(c);
  103. vector<string> names;
  104. names.reserve(c);
  105. for (size_t i = 0; i < c; ++i) {
  106. const NodePtr &nn = n->leafAt(i);
  107. ProductionPtr v = doGenerate(nn, m);
  108. if (nn->type() != AVRO_NULL) {
  109. ProductionPtr v2 = make_shared<Production>();
  110. v2->push_back(Symbol::recordEndSymbol());
  111. copy(v->begin(), v->end(), back_inserter(*v2));
  112. v.swap(v2);
  113. }
  114. vv.push_back(v);
  115. names.push_back(nameOf(nn));
  116. }
  117. ProductionPtr result = make_shared<Production>();
  118. result->push_back(Symbol::alternative(vv));
  119. result->push_back(Symbol::nameListSymbol(names));
  120. result->push_back(Symbol::unionSymbol());
  121. return result;
  122. }
  123. default:
  124. throw Exception("Unknown node type");
  125. }
  126. }
  127. static void expectToken(JsonParser &in, JsonParser::Token tk) {
  128. in.expectToken(tk);
  129. }
  130. class JsonDecoderHandler {
  131. JsonParser &in_;
  132. public:
  133. explicit JsonDecoderHandler(JsonParser &p) : in_(p) {}
  134. size_t handle(const Symbol &s) {
  135. switch (s.kind()) {
  136. case Symbol::Kind::RecordStart:
  137. expectToken(in_, JsonParser::Token::ObjectStart);
  138. break;
  139. case Symbol::Kind::RecordEnd:
  140. expectToken(in_, JsonParser::Token::ObjectEnd);
  141. break;
  142. case Symbol::Kind::Field:
  143. expectToken(in_, JsonParser::Token::String);
  144. if (s.extra<string>() != in_.stringValue()) {
  145. throw Exception(R"(Incorrect field: expected "{}" but got "{}".)", s.extra<string>(), in_.stringValue());
  146. }
  147. break;
  148. default:
  149. break;
  150. }
  151. return 0;
  152. }
  153. };
  154. template<typename P>
  155. class JsonDecoder : public Decoder {
  156. JsonParser in_;
  157. JsonDecoderHandler handler_;
  158. P parser_;
  159. void init(InputStream &is) final;
  160. void decodeNull() final;
  161. bool decodeBool() final;
  162. int32_t decodeInt() final;
  163. int64_t decodeLong() final;
  164. float decodeFloat() final;
  165. double decodeDouble() final;
  166. void decodeString(string &value) final;
  167. void skipString() final;
  168. void decodeBytes(vector<uint8_t> &value) final;
  169. void skipBytes() final;
  170. void decodeFixed(size_t n, vector<uint8_t> &value) final;
  171. void skipFixed(size_t n) final;
  172. size_t decodeEnum() final;
  173. size_t arrayStart() final;
  174. size_t arrayNext() final;
  175. size_t skipArray() final;
  176. size_t mapStart() final;
  177. size_t mapNext() final;
  178. size_t skipMap() final;
  179. size_t decodeUnionIndex() final;
  180. void expect(JsonParser::Token tk);
  181. void skipComposite();
  182. void drain() final;
  183. public:
  184. explicit JsonDecoder(const ValidSchema &s) : handler_(in_),
  185. parser_(JsonGrammarGenerator().generate(s), NULL, handler_) {}
  186. };
  187. template<typename P>
  188. void JsonDecoder<P>::init(InputStream &is) {
  189. in_.init(is);
  190. parser_.reset();
  191. }
  192. template<typename P>
  193. void JsonDecoder<P>::expect(JsonParser::Token tk) {
  194. expectToken(in_, tk);
  195. }
  196. template<typename P>
  197. void JsonDecoder<P>::decodeNull() {
  198. parser_.advance(Symbol::Kind::Null);
  199. expect(JsonParser::Token::Null);
  200. }
  201. template<typename P>
  202. bool JsonDecoder<P>::decodeBool() {
  203. parser_.advance(Symbol::Kind::Bool);
  204. expect(JsonParser::Token::Bool);
  205. bool result = in_.boolValue();
  206. return result;
  207. }
  208. template<typename P>
  209. int32_t JsonDecoder<P>::decodeInt() {
  210. parser_.advance(Symbol::Kind::Int);
  211. expect(JsonParser::Token::Long);
  212. int64_t result = in_.longValue();
  213. if (result < INT32_MIN || result > INT32_MAX) {
  214. throw Exception("Value out of range for Avro int: {}", result);
  215. }
  216. return static_cast<int32_t>(result);
  217. }
  218. template<typename P>
  219. int64_t JsonDecoder<P>::decodeLong() {
  220. parser_.advance(Symbol::Kind::Long);
  221. expect(JsonParser::Token::Long);
  222. int64_t result = in_.longValue();
  223. return result;
  224. }
  225. template<typename P>
  226. float JsonDecoder<P>::decodeFloat() {
  227. parser_.advance(Symbol::Kind::Float);
  228. expect(JsonParser::Token::Double);
  229. double result = in_.doubleValue();
  230. return static_cast<float>(result);
  231. }
  232. template<typename P>
  233. double JsonDecoder<P>::decodeDouble() {
  234. parser_.advance(Symbol::Kind::Double);
  235. expect(JsonParser::Token::Double);
  236. double result = in_.doubleValue();
  237. return result;
  238. }
  239. template<typename P>
  240. void JsonDecoder<P>::decodeString(string &value) {
  241. parser_.advance(Symbol::Kind::String);
  242. expect(JsonParser::Token::String);
  243. value = in_.stringValue();
  244. }
  245. template<typename P>
  246. void JsonDecoder<P>::skipString() {
  247. parser_.advance(Symbol::Kind::String);
  248. expect(JsonParser::Token::String);
  249. }
  /**
   * Copies the characters of s into a byte vector, one byte per character.
   */
  static vector<uint8_t> toBytes(const string &s) {
      vector<uint8_t> bytes;
      bytes.reserve(s.size());
      for (const char ch : s) {
          bytes.push_back(static_cast<uint8_t>(ch));
      }
      return bytes;
  }
  253. template<typename P>
  254. void JsonDecoder<P>::decodeBytes(vector<uint8_t> &value) {
  255. parser_.advance(Symbol::Kind::Bytes);
  256. expect(JsonParser::Token::String);
  257. value = toBytes(in_.bytesValue());
  258. }
  259. template<typename P>
  260. void JsonDecoder<P>::skipBytes() {
  261. parser_.advance(Symbol::Kind::Bytes);
  262. expect(JsonParser::Token::String);
  263. }
  264. template<typename P>
  265. void JsonDecoder<P>::decodeFixed(size_t n, vector<uint8_t> &value) {
  266. parser_.advance(Symbol::Kind::Fixed);
  267. parser_.assertSize(n);
  268. expect(JsonParser::Token::String);
  269. value = toBytes(in_.bytesValue());
  270. if (value.size() != n) {
  271. throw Exception("Incorrect value for fixed");
  272. }
  273. }
  274. template<typename P>
  275. void JsonDecoder<P>::skipFixed(size_t n) {
  276. parser_.advance(Symbol::Kind::Fixed);
  277. parser_.assertSize(n);
  278. expect(JsonParser::Token::String);
  279. vector<uint8_t> result = toBytes(in_.bytesValue());
  280. if (result.size() != n) {
  281. throw Exception("Incorrect value for fixed");
  282. }
  283. }
  284. template<typename P>
  285. size_t JsonDecoder<P>::decodeEnum() {
  286. parser_.advance(Symbol::Kind::Enum);
  287. expect(JsonParser::Token::String);
  288. size_t result = parser_.indexForName(in_.stringValue());
  289. return result;
  290. }
  291. template<typename P>
  292. size_t JsonDecoder<P>::arrayStart() {
  293. parser_.advance(Symbol::Kind::ArrayStart);
  294. parser_.pushRepeatCount(0);
  295. expect(JsonParser::Token::ArrayStart);
  296. return arrayNext();
  297. }
  298. template<typename P>
  299. size_t JsonDecoder<P>::arrayNext() {
  300. parser_.processImplicitActions();
  301. if (in_.peek() == JsonParser::Token::ArrayEnd) {
  302. in_.advance();
  303. parser_.popRepeater();
  304. parser_.advance(Symbol::Kind::ArrayEnd);
  305. return 0;
  306. }
  307. parser_.nextRepeatCount(1);
  308. return 1;
  309. }
  310. template<typename P>
  311. void JsonDecoder<P>::skipComposite() {
  312. size_t level = 0;
  313. for (;;) {
  314. switch (in_.advance()) {
  315. case JsonParser::Token::ArrayStart:
  316. case JsonParser::Token::ObjectStart:
  317. ++level;
  318. continue;
  319. case JsonParser::Token::ArrayEnd:
  320. case JsonParser::Token::ObjectEnd:
  321. if (level == 0) {
  322. return;
  323. }
  324. --level;
  325. continue;
  326. default:
  327. continue;
  328. }
  329. }
  330. }
  331. template<typename P>
  332. void JsonDecoder<P>::drain() {
  333. parser_.processImplicitActions();
  334. in_.drain();
  335. }
  336. template<typename P>
  337. size_t JsonDecoder<P>::skipArray() {
  338. parser_.advance(Symbol::Kind::ArrayStart);
  339. parser_.pop();
  340. parser_.advance(Symbol::Kind::ArrayEnd);
  341. expect(JsonParser::Token::ArrayStart);
  342. skipComposite();
  343. return 0;
  344. }
  345. template<typename P>
  346. size_t JsonDecoder<P>::mapStart() {
  347. parser_.advance(Symbol::Kind::MapStart);
  348. parser_.pushRepeatCount(0);
  349. expect(JsonParser::Token::ObjectStart);
  350. return mapNext();
  351. }
  352. template<typename P>
  353. size_t JsonDecoder<P>::mapNext() {
  354. parser_.processImplicitActions();
  355. if (in_.peek() == JsonParser::Token::ObjectEnd) {
  356. in_.advance();
  357. parser_.popRepeater();
  358. parser_.advance(Symbol::Kind::MapEnd);
  359. return 0;
  360. }
  361. parser_.nextRepeatCount(1);
  362. return 1;
  363. }
  364. template<typename P>
  365. size_t JsonDecoder<P>::skipMap() {
  366. parser_.advance(Symbol::Kind::MapStart);
  367. parser_.pop();
  368. parser_.advance(Symbol::Kind::MapEnd);
  369. expect(JsonParser::Token::ObjectStart);
  370. skipComposite();
  371. return 0;
  372. }
  373. template<typename P>
  374. size_t JsonDecoder<P>::decodeUnionIndex() {
  375. parser_.advance(Symbol::Kind::Union);
  376. size_t result;
  377. if (in_.peek() == JsonParser::Token::Null) {
  378. result = parser_.indexForName("null");
  379. } else {
  380. expect(JsonParser::Token::ObjectStart);
  381. expect(JsonParser::Token::String);
  382. result = parser_.indexForName(in_.stringValue());
  383. }
  384. parser_.selectBranch(result);
  385. return result;
  386. }
  387. template<typename F = JsonNullFormatter>
  388. class JsonHandler {
  389. JsonGenerator<F> &generator_;
  390. public:
  391. explicit JsonHandler(JsonGenerator<F> &g) : generator_(g) {}
  392. size_t handle(const Symbol &s) {
  393. switch (s.kind()) {
  394. case Symbol::Kind::RecordStart:
  395. generator_.objectStart();
  396. break;
  397. case Symbol::Kind::RecordEnd:
  398. generator_.objectEnd();
  399. break;
  400. case Symbol::Kind::Field:
  401. generator_.encodeString(s.extra<string>());
  402. break;
  403. default:
  404. break;
  405. }
  406. return 0;
  407. }
  408. };
  409. template<typename P, typename F = JsonNullFormatter>
  410. class JsonEncoder : public Encoder {
  411. JsonGenerator<F> out_;
  412. JsonHandler<F> handler_;
  413. P parser_;
  414. void init(OutputStream &os) final;
  415. void flush() final;
  416. int64_t byteCount() const final;
  417. void encodeNull() final;
  418. void encodeBool(bool b) final;
  419. void encodeInt(int32_t i) final;
  420. void encodeLong(int64_t l) final;
  421. void encodeFloat(float f) final;
  422. void encodeDouble(double d) final;
  423. void encodeString(const std::string &s) final;
  424. void encodeBytes(const uint8_t *bytes, size_t len) final;
  425. void encodeFixed(const uint8_t *bytes, size_t len) final;
  426. void encodeEnum(size_t e) final;
  427. void arrayStart() final;
  428. void arrayEnd() final;
  429. void mapStart() final;
  430. void mapEnd() final;
  431. void setItemCount(size_t count) final;
  432. void startItem() final;
  433. void encodeUnionIndex(size_t e) final;
  434. public:
  435. explicit JsonEncoder(const ValidSchema &schema) : handler_(out_),
  436. parser_(JsonGrammarGenerator().generate(schema), NULL, handler_) {}
  437. };
  438. template<typename P, typename F>
  439. void JsonEncoder<P, F>::init(OutputStream &os) {
  440. out_.init(os);
  441. parser_.reset();
  442. }
  443. template<typename P, typename F>
  444. void JsonEncoder<P, F>::flush() {
  445. parser_.processImplicitActions();
  446. out_.flush();
  447. }
  448. template<typename P, typename F>
  449. int64_t JsonEncoder<P, F>::byteCount() const {
  450. return out_.byteCount();
  451. }
  452. template<typename P, typename F>
  453. void JsonEncoder<P, F>::encodeNull() {
  454. parser_.advance(Symbol::Kind::Null);
  455. out_.encodeNull();
  456. }
  457. template<typename P, typename F>
  458. void JsonEncoder<P, F>::encodeBool(bool b) {
  459. parser_.advance(Symbol::Kind::Bool);
  460. out_.encodeBool(b);
  461. }
  462. template<typename P, typename F>
  463. void JsonEncoder<P, F>::encodeInt(int32_t i) {
  464. parser_.advance(Symbol::Kind::Int);
  465. out_.encodeNumber(i);
  466. }
  467. template<typename P, typename F>
  468. void JsonEncoder<P, F>::encodeLong(int64_t l) {
  469. parser_.advance(Symbol::Kind::Long);
  470. out_.encodeNumber(l);
  471. }
  472. template<typename P, typename F>
  473. void JsonEncoder<P, F>::encodeFloat(float f) {
  474. parser_.advance(Symbol::Kind::Float);
  475. if (f == std::numeric_limits<float>::infinity()) {
  476. out_.encodeString("Infinity");
  477. } else if (-f == std::numeric_limits<float>::infinity()) {
  478. out_.encodeString("-Infinity");
  479. } else if (boost::math::isnan(f)) {
  480. out_.encodeString("NaN");
  481. } else {
  482. out_.encodeNumber(f);
  483. }
  484. }
  485. template<typename P, typename F>
  486. void JsonEncoder<P, F>::encodeDouble(double d) {
  487. parser_.advance(Symbol::Kind::Double);
  488. if (d == std::numeric_limits<double>::infinity()) {
  489. out_.encodeString("Infinity");
  490. } else if (-d == std::numeric_limits<double>::infinity()) {
  491. out_.encodeString("-Infinity");
  492. } else if (boost::math::isnan(d)) {
  493. out_.encodeString("NaN");
  494. } else {
  495. out_.encodeNumber(d);
  496. }
  497. }
  498. template<typename P, typename F>
  499. void JsonEncoder<P, F>::encodeString(const std::string &s) {
  500. parser_.advance(Symbol::Kind::String);
  501. out_.encodeString(s);
  502. }
  503. template<typename P, typename F>
  504. void JsonEncoder<P, F>::encodeBytes(const uint8_t *bytes, size_t len) {
  505. parser_.advance(Symbol::Kind::Bytes);
  506. out_.encodeBinary(bytes, len);
  507. }
  508. template<typename P, typename F>
  509. void JsonEncoder<P, F>::encodeFixed(const uint8_t *bytes, size_t len) {
  510. parser_.advance(Symbol::Kind::Fixed);
  511. parser_.assertSize(len);
  512. out_.encodeBinary(bytes, len);
  513. }
  514. template<typename P, typename F>
  515. void JsonEncoder<P, F>::encodeEnum(size_t e) {
  516. parser_.advance(Symbol::Kind::Enum);
  517. const string &s = parser_.nameForIndex(e);
  518. out_.encodeString(s);
  519. }
  520. template<typename P, typename F>
  521. void JsonEncoder<P, F>::arrayStart() {
  522. parser_.advance(Symbol::Kind::ArrayStart);
  523. parser_.pushRepeatCount(0);
  524. out_.arrayStart();
  525. }
  526. template<typename P, typename F>
  527. void JsonEncoder<P, F>::arrayEnd() {
  528. parser_.popRepeater();
  529. parser_.advance(Symbol::Kind::ArrayEnd);
  530. out_.arrayEnd();
  531. }
  532. template<typename P, typename F>
  533. void JsonEncoder<P, F>::mapStart() {
  534. parser_.advance(Symbol::Kind::MapStart);
  535. parser_.pushRepeatCount(0);
  536. out_.objectStart();
  537. }
  538. template<typename P, typename F>
  539. void JsonEncoder<P, F>::mapEnd() {
  540. parser_.popRepeater();
  541. parser_.advance(Symbol::Kind::MapEnd);
  542. out_.objectEnd();
  543. }
  544. template<typename P, typename F>
  545. void JsonEncoder<P, F>::setItemCount(size_t count) {
  546. parser_.nextRepeatCount(count);
  547. }
  548. template<typename P, typename F>
  549. void JsonEncoder<P, F>::startItem() {
  550. parser_.processImplicitActions();
  551. if (parser_.top() != Symbol::Kind::Repeater) {
  552. throw Exception("startItem at not an item boundary");
  553. }
  554. }
  555. template<typename P, typename F>
  556. void JsonEncoder<P, F>::encodeUnionIndex(size_t e) {
  557. parser_.advance(Symbol::Kind::Union);
  558. const std::string name = parser_.nameForIndex(e);
  559. if (name != "null") {
  560. out_.objectStart();
  561. out_.encodeString(name);
  562. }
  563. parser_.selectBranch(e);
  564. }
  565. } // namespace parsing
  566. DecoderPtr jsonDecoder(const ValidSchema &s) {
  567. return std::make_shared<parsing::JsonDecoder<
  568. parsing::SimpleParser<parsing::JsonDecoderHandler>>>(s);
  569. }
  570. EncoderPtr jsonEncoder(const ValidSchema &schema) {
  571. return std::make_shared<parsing::JsonEncoder<
  572. parsing::SimpleParser<parsing::JsonHandler<avro::json::JsonNullFormatter>>, avro::json::JsonNullFormatter>>(schema);
  573. }
  574. EncoderPtr jsonPrettyEncoder(const ValidSchema &schema) {
  575. return std::make_shared<parsing::JsonEncoder<
  576. parsing::SimpleParser<parsing::JsonHandler<avro::json::JsonPrettyFormatter>>, avro::json::JsonPrettyFormatter>>(schema);
  577. }
  578. } // namespace avro