JsonCodec.cc 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * https://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. #include <algorithm>
  19. #include <boost/math/special_functions/fpclassify.hpp>
  20. #include <map>
  21. #include <memory>
  22. #include <string>
  23. #include "Decoder.hh"
  24. #include "Encoder.hh"
  25. #include "Symbol.hh"
  26. #include "ValidSchema.hh"
  27. #include "ValidatingCodec.hh"
  28. #include "../json/JsonIO.hh"
  29. namespace avro {
  30. namespace parsing {
  31. using std::make_shared;
  32. using std::istringstream;
  33. using std::map;
  34. using std::ostringstream;
  35. using std::reverse;
  36. using std::string;
  37. using std::vector;
  38. using avro::json::JsonGenerator;
  39. using avro::json::JsonNullFormatter;
  40. using avro::json::JsonParser;
  41. class JsonGrammarGenerator : public ValidatingGrammarGenerator {
  42. ProductionPtr doGenerate(const NodePtr &n,
  43. std::map<NodePtr, ProductionPtr> &m) final;
  44. };
  45. static std::string nameOf(const NodePtr &n) {
  46. if (n->hasName()) {
  47. return std::string(n->name());
  48. }
  49. std::ostringstream oss;
  50. oss << n->type();
  51. return oss.str();
  52. }
  53. ProductionPtr JsonGrammarGenerator::doGenerate(const NodePtr &n,
  54. std::map<NodePtr, ProductionPtr> &m) {
  55. switch (n->type()) {
  56. case AVRO_NULL:
  57. case AVRO_BOOL:
  58. case AVRO_INT:
  59. case AVRO_LONG:
  60. case AVRO_FLOAT:
  61. case AVRO_DOUBLE:
  62. case AVRO_STRING:
  63. case AVRO_BYTES:
  64. case AVRO_FIXED:
  65. case AVRO_ARRAY:
  66. case AVRO_MAP:
  67. case AVRO_SYMBOLIC:
  68. return ValidatingGrammarGenerator::doGenerate(n, m);
  69. case AVRO_RECORD: {
  70. ProductionPtr result = make_shared<Production>();
  71. m.erase(n);
  72. size_t c = n->leaves();
  73. result->reserve(2 + 2 * c);
  74. result->push_back(Symbol::recordStartSymbol());
  75. for (size_t i = 0; i < c; ++i) {
  76. const NodePtr &leaf = n->leafAt(i);
  77. ProductionPtr v = doGenerate(leaf, m);
  78. result->push_back(Symbol::fieldSymbol(n->nameAt(i)));
  79. copy(v->rbegin(), v->rend(), back_inserter(*result));
  80. }
  81. result->push_back(Symbol::recordEndSymbol());
  82. reverse(result->begin(), result->end());
  83. m[n] = result;
  84. return make_shared<Production>(1, Symbol::indirect(result));
  85. }
  86. case AVRO_ENUM: {
  87. vector<string> nn;
  88. size_t c = n->names();
  89. nn.reserve(c);
  90. for (size_t i = 0; i < c; ++i) {
  91. nn.push_back(n->nameAt(i));
  92. }
  93. ProductionPtr result = make_shared<Production>();
  94. result->push_back(Symbol::nameListSymbol(nn));
  95. result->push_back(Symbol::enumSymbol());
  96. m[n] = result;
  97. return result;
  98. }
  99. case AVRO_UNION: {
  100. size_t c = n->leaves();
  101. vector<ProductionPtr> vv;
  102. vv.reserve(c);
  103. vector<string> names;
  104. names.reserve(c);
  105. for (size_t i = 0; i < c; ++i) {
  106. const NodePtr &nn = n->leafAt(i);
  107. ProductionPtr v = doGenerate(nn, m);
  108. if (nn->type() != AVRO_NULL) {
  109. ProductionPtr v2 = make_shared<Production>();
  110. v2->push_back(Symbol::recordEndSymbol());
  111. copy(v->begin(), v->end(), back_inserter(*v2));
  112. v.swap(v2);
  113. }
  114. vv.push_back(v);
  115. names.push_back(nameOf(nn));
  116. }
  117. ProductionPtr result = make_shared<Production>();
  118. result->push_back(Symbol::alternative(vv));
  119. result->push_back(Symbol::nameListSymbol(names));
  120. result->push_back(Symbol::unionSymbol());
  121. return result;
  122. }
  123. default:
  124. throw Exception("Unknown node type");
  125. }
  126. }
  127. static void expectToken(JsonParser &in, JsonParser::Token tk) {
  128. in.expectToken(tk);
  129. }
  130. class JsonDecoderHandler {
  131. JsonParser &in_;
  132. public:
  133. explicit JsonDecoderHandler(JsonParser &p) : in_(p) {}
  134. size_t handle(const Symbol &s) {
  135. switch (s.kind()) {
  136. case Symbol::Kind::RecordStart:
  137. expectToken(in_, JsonParser::Token::ObjectStart);
  138. break;
  139. case Symbol::Kind::RecordEnd:
  140. expectToken(in_, JsonParser::Token::ObjectEnd);
  141. break;
  142. case Symbol::Kind::Field:
  143. expectToken(in_, JsonParser::Token::String);
  144. if (s.extra<string>() != in_.stringValue()) {
  145. throw Exception(boost::format("Incorrect field: expected \"%1%\" but got \"%2%\".") %
  146. s.extra<string>() % in_.stringValue());
  147. }
  148. break;
  149. default:
  150. break;
  151. }
  152. return 0;
  153. }
  154. };
  155. template<typename P>
  156. class JsonDecoder : public Decoder {
  157. JsonParser in_;
  158. JsonDecoderHandler handler_;
  159. P parser_;
  160. void init(InputStream &is) final;
  161. void decodeNull() final;
  162. bool decodeBool() final;
  163. int32_t decodeInt() final;
  164. int64_t decodeLong() final;
  165. float decodeFloat() final;
  166. double decodeDouble() final;
  167. void decodeString(string &value) final;
  168. void skipString() final;
  169. void decodeBytes(vector<uint8_t> &value) final;
  170. void skipBytes() final;
  171. void decodeFixed(size_t n, vector<uint8_t> &value) final;
  172. void skipFixed(size_t n) final;
  173. size_t decodeEnum() final;
  174. size_t arrayStart() final;
  175. size_t arrayNext() final;
  176. size_t skipArray() final;
  177. size_t mapStart() final;
  178. size_t mapNext() final;
  179. size_t skipMap() final;
  180. size_t decodeUnionIndex() final;
  181. void expect(JsonParser::Token tk);
  182. void skipComposite();
  183. void drain() final;
  184. public:
  185. explicit JsonDecoder(const ValidSchema &s) : handler_(in_),
  186. parser_(JsonGrammarGenerator().generate(s), NULL, handler_) {}
  187. };
  188. template<typename P>
  189. void JsonDecoder<P>::init(InputStream &is) {
  190. in_.init(is);
  191. parser_.reset();
  192. }
  193. template<typename P>
  194. void JsonDecoder<P>::expect(JsonParser::Token tk) {
  195. expectToken(in_, tk);
  196. }
  197. template<typename P>
  198. void JsonDecoder<P>::decodeNull() {
  199. parser_.advance(Symbol::Kind::Null);
  200. expect(JsonParser::Token::Null);
  201. }
  202. template<typename P>
  203. bool JsonDecoder<P>::decodeBool() {
  204. parser_.advance(Symbol::Kind::Bool);
  205. expect(JsonParser::Token::Bool);
  206. bool result = in_.boolValue();
  207. return result;
  208. }
  209. template<typename P>
  210. int32_t JsonDecoder<P>::decodeInt() {
  211. parser_.advance(Symbol::Kind::Int);
  212. expect(JsonParser::Token::Long);
  213. int64_t result = in_.longValue();
  214. if (result < INT32_MIN || result > INT32_MAX) {
  215. throw Exception(boost::format("Value out of range for Avro int: %1%")
  216. % result);
  217. }
  218. return static_cast<int32_t>(result);
  219. }
  220. template<typename P>
  221. int64_t JsonDecoder<P>::decodeLong() {
  222. parser_.advance(Symbol::Kind::Long);
  223. expect(JsonParser::Token::Long);
  224. int64_t result = in_.longValue();
  225. return result;
  226. }
  227. template<typename P>
  228. float JsonDecoder<P>::decodeFloat() {
  229. parser_.advance(Symbol::Kind::Float);
  230. expect(JsonParser::Token::Double);
  231. double result = in_.doubleValue();
  232. return static_cast<float>(result);
  233. }
  234. template<typename P>
  235. double JsonDecoder<P>::decodeDouble() {
  236. parser_.advance(Symbol::Kind::Double);
  237. expect(JsonParser::Token::Double);
  238. double result = in_.doubleValue();
  239. return result;
  240. }
  241. template<typename P>
  242. void JsonDecoder<P>::decodeString(string &value) {
  243. parser_.advance(Symbol::Kind::String);
  244. expect(JsonParser::Token::String);
  245. value = in_.stringValue();
  246. }
  247. template<typename P>
  248. void JsonDecoder<P>::skipString() {
  249. parser_.advance(Symbol::Kind::String);
  250. expect(JsonParser::Token::String);
  251. }
  /**
   * Copies the characters of s into a byte vector, one uint8_t per char,
   * preserving order and length.
   */
  static vector<uint8_t> toBytes(const string &s) {
      vector<uint8_t> bytes;
      bytes.assign(s.begin(), s.end());
      return bytes;
  }
  255. template<typename P>
  256. void JsonDecoder<P>::decodeBytes(vector<uint8_t> &value) {
  257. parser_.advance(Symbol::Kind::Bytes);
  258. expect(JsonParser::Token::String);
  259. value = toBytes(in_.bytesValue());
  260. }
  261. template<typename P>
  262. void JsonDecoder<P>::skipBytes() {
  263. parser_.advance(Symbol::Kind::Bytes);
  264. expect(JsonParser::Token::String);
  265. }
  266. template<typename P>
  267. void JsonDecoder<P>::decodeFixed(size_t n, vector<uint8_t> &value) {
  268. parser_.advance(Symbol::Kind::Fixed);
  269. parser_.assertSize(n);
  270. expect(JsonParser::Token::String);
  271. value = toBytes(in_.bytesValue());
  272. if (value.size() != n) {
  273. throw Exception("Incorrect value for fixed");
  274. }
  275. }
  276. template<typename P>
  277. void JsonDecoder<P>::skipFixed(size_t n) {
  278. parser_.advance(Symbol::Kind::Fixed);
  279. parser_.assertSize(n);
  280. expect(JsonParser::Token::String);
  281. vector<uint8_t> result = toBytes(in_.bytesValue());
  282. if (result.size() != n) {
  283. throw Exception("Incorrect value for fixed");
  284. }
  285. }
  286. template<typename P>
  287. size_t JsonDecoder<P>::decodeEnum() {
  288. parser_.advance(Symbol::Kind::Enum);
  289. expect(JsonParser::Token::String);
  290. size_t result = parser_.indexForName(in_.stringValue());
  291. return result;
  292. }
  293. template<typename P>
  294. size_t JsonDecoder<P>::arrayStart() {
  295. parser_.advance(Symbol::Kind::ArrayStart);
  296. parser_.pushRepeatCount(0);
  297. expect(JsonParser::Token::ArrayStart);
  298. return arrayNext();
  299. }
  300. template<typename P>
  301. size_t JsonDecoder<P>::arrayNext() {
  302. parser_.processImplicitActions();
  303. if (in_.peek() == JsonParser::Token::ArrayEnd) {
  304. in_.advance();
  305. parser_.popRepeater();
  306. parser_.advance(Symbol::Kind::ArrayEnd);
  307. return 0;
  308. }
  309. parser_.nextRepeatCount(1);
  310. return 1;
  311. }
  312. template<typename P>
  313. void JsonDecoder<P>::skipComposite() {
  314. size_t level = 0;
  315. for (;;) {
  316. switch (in_.advance()) {
  317. case JsonParser::Token::ArrayStart:
  318. case JsonParser::Token::ObjectStart:
  319. ++level;
  320. continue;
  321. case JsonParser::Token::ArrayEnd:
  322. case JsonParser::Token::ObjectEnd:
  323. if (level == 0) {
  324. return;
  325. }
  326. --level;
  327. continue;
  328. default:
  329. continue;
  330. }
  331. }
  332. }
  333. template<typename P>
  334. void JsonDecoder<P>::drain() {
  335. parser_.processImplicitActions();
  336. in_.drain();
  337. }
  338. template<typename P>
  339. size_t JsonDecoder<P>::skipArray() {
  340. parser_.advance(Symbol::Kind::ArrayStart);
  341. parser_.pop();
  342. parser_.advance(Symbol::Kind::ArrayEnd);
  343. expect(JsonParser::Token::ArrayStart);
  344. skipComposite();
  345. return 0;
  346. }
  347. template<typename P>
  348. size_t JsonDecoder<P>::mapStart() {
  349. parser_.advance(Symbol::Kind::MapStart);
  350. parser_.pushRepeatCount(0);
  351. expect(JsonParser::Token::ObjectStart);
  352. return mapNext();
  353. }
  354. template<typename P>
  355. size_t JsonDecoder<P>::mapNext() {
  356. parser_.processImplicitActions();
  357. if (in_.peek() == JsonParser::Token::ObjectEnd) {
  358. in_.advance();
  359. parser_.popRepeater();
  360. parser_.advance(Symbol::Kind::MapEnd);
  361. return 0;
  362. }
  363. parser_.nextRepeatCount(1);
  364. return 1;
  365. }
  366. template<typename P>
  367. size_t JsonDecoder<P>::skipMap() {
  368. parser_.advance(Symbol::Kind::MapStart);
  369. parser_.pop();
  370. parser_.advance(Symbol::Kind::MapEnd);
  371. expect(JsonParser::Token::ObjectStart);
  372. skipComposite();
  373. return 0;
  374. }
  375. template<typename P>
  376. size_t JsonDecoder<P>::decodeUnionIndex() {
  377. parser_.advance(Symbol::Kind::Union);
  378. size_t result;
  379. if (in_.peek() == JsonParser::Token::Null) {
  380. result = parser_.indexForName("null");
  381. } else {
  382. expect(JsonParser::Token::ObjectStart);
  383. expect(JsonParser::Token::String);
  384. result = parser_.indexForName(in_.stringValue());
  385. }
  386. parser_.selectBranch(result);
  387. return result;
  388. }
  389. template<typename F = JsonNullFormatter>
  390. class JsonHandler {
  391. JsonGenerator<F> &generator_;
  392. public:
  393. explicit JsonHandler(JsonGenerator<F> &g) : generator_(g) {}
  394. size_t handle(const Symbol &s) {
  395. switch (s.kind()) {
  396. case Symbol::Kind::RecordStart:
  397. generator_.objectStart();
  398. break;
  399. case Symbol::Kind::RecordEnd:
  400. generator_.objectEnd();
  401. break;
  402. case Symbol::Kind::Field:
  403. generator_.encodeString(s.extra<string>());
  404. break;
  405. default:
  406. break;
  407. }
  408. return 0;
  409. }
  410. };
  411. template<typename P, typename F = JsonNullFormatter>
  412. class JsonEncoder : public Encoder {
  413. JsonGenerator<F> out_;
  414. JsonHandler<F> handler_;
  415. P parser_;
  416. void init(OutputStream &os) final;
  417. void flush() final;
  418. int64_t byteCount() const final;
  419. void encodeNull() final;
  420. void encodeBool(bool b) final;
  421. void encodeInt(int32_t i) final;
  422. void encodeLong(int64_t l) final;
  423. void encodeFloat(float f) final;
  424. void encodeDouble(double d) final;
  425. void encodeString(const std::string &s) final;
  426. void encodeBytes(const uint8_t *bytes, size_t len) final;
  427. void encodeFixed(const uint8_t *bytes, size_t len) final;
  428. void encodeEnum(size_t e) final;
  429. void arrayStart() final;
  430. void arrayEnd() final;
  431. void mapStart() final;
  432. void mapEnd() final;
  433. void setItemCount(size_t count) final;
  434. void startItem() final;
  435. void encodeUnionIndex(size_t e) final;
  436. public:
  437. explicit JsonEncoder(const ValidSchema &schema) : handler_(out_),
  438. parser_(JsonGrammarGenerator().generate(schema), NULL, handler_) {}
  439. };
  440. template<typename P, typename F>
  441. void JsonEncoder<P, F>::init(OutputStream &os) {
  442. out_.init(os);
  443. }
  444. template<typename P, typename F>
  445. void JsonEncoder<P, F>::flush() {
  446. parser_.processImplicitActions();
  447. out_.flush();
  448. }
  449. template<typename P, typename F>
  450. int64_t JsonEncoder<P, F>::byteCount() const {
  451. return out_.byteCount();
  452. }
  453. template<typename P, typename F>
  454. void JsonEncoder<P, F>::encodeNull() {
  455. parser_.advance(Symbol::Kind::Null);
  456. out_.encodeNull();
  457. }
  458. template<typename P, typename F>
  459. void JsonEncoder<P, F>::encodeBool(bool b) {
  460. parser_.advance(Symbol::Kind::Bool);
  461. out_.encodeBool(b);
  462. }
  463. template<typename P, typename F>
  464. void JsonEncoder<P, F>::encodeInt(int32_t i) {
  465. parser_.advance(Symbol::Kind::Int);
  466. out_.encodeNumber(i);
  467. }
  468. template<typename P, typename F>
  469. void JsonEncoder<P, F>::encodeLong(int64_t l) {
  470. parser_.advance(Symbol::Kind::Long);
  471. out_.encodeNumber(l);
  472. }
  473. template<typename P, typename F>
  474. void JsonEncoder<P, F>::encodeFloat(float f) {
  475. parser_.advance(Symbol::Kind::Float);
  476. if (f == std::numeric_limits<float>::infinity()) {
  477. out_.encodeString("Infinity");
  478. } else if (-f == std::numeric_limits<float>::infinity()) {
  479. out_.encodeString("-Infinity");
  480. } else if (boost::math::isnan(f)) {
  481. out_.encodeString("NaN");
  482. } else {
  483. out_.encodeNumber(f);
  484. }
  485. }
  486. template<typename P, typename F>
  487. void JsonEncoder<P, F>::encodeDouble(double d) {
  488. parser_.advance(Symbol::Kind::Double);
  489. if (d == std::numeric_limits<double>::infinity()) {
  490. out_.encodeString("Infinity");
  491. } else if (-d == std::numeric_limits<double>::infinity()) {
  492. out_.encodeString("-Infinity");
  493. } else if (boost::math::isnan(d)) {
  494. out_.encodeString("NaN");
  495. } else {
  496. out_.encodeNumber(d);
  497. }
  498. }
  499. template<typename P, typename F>
  500. void JsonEncoder<P, F>::encodeString(const std::string &s) {
  501. parser_.advance(Symbol::Kind::String);
  502. out_.encodeString(s);
  503. }
  504. template<typename P, typename F>
  505. void JsonEncoder<P, F>::encodeBytes(const uint8_t *bytes, size_t len) {
  506. parser_.advance(Symbol::Kind::Bytes);
  507. out_.encodeBinary(bytes, len);
  508. }
  509. template<typename P, typename F>
  510. void JsonEncoder<P, F>::encodeFixed(const uint8_t *bytes, size_t len) {
  511. parser_.advance(Symbol::Kind::Fixed);
  512. parser_.assertSize(len);
  513. out_.encodeBinary(bytes, len);
  514. }
  515. template<typename P, typename F>
  516. void JsonEncoder<P, F>::encodeEnum(size_t e) {
  517. parser_.advance(Symbol::Kind::Enum);
  518. const string &s = parser_.nameForIndex(e);
  519. out_.encodeString(s);
  520. }
  521. template<typename P, typename F>
  522. void JsonEncoder<P, F>::arrayStart() {
  523. parser_.advance(Symbol::Kind::ArrayStart);
  524. parser_.pushRepeatCount(0);
  525. out_.arrayStart();
  526. }
  527. template<typename P, typename F>
  528. void JsonEncoder<P, F>::arrayEnd() {
  529. parser_.popRepeater();
  530. parser_.advance(Symbol::Kind::ArrayEnd);
  531. out_.arrayEnd();
  532. }
  533. template<typename P, typename F>
  534. void JsonEncoder<P, F>::mapStart() {
  535. parser_.advance(Symbol::Kind::MapStart);
  536. parser_.pushRepeatCount(0);
  537. out_.objectStart();
  538. }
  539. template<typename P, typename F>
  540. void JsonEncoder<P, F>::mapEnd() {
  541. parser_.popRepeater();
  542. parser_.advance(Symbol::Kind::MapEnd);
  543. out_.objectEnd();
  544. }
  545. template<typename P, typename F>
  546. void JsonEncoder<P, F>::setItemCount(size_t count) {
  547. parser_.nextRepeatCount(count);
  548. }
  549. template<typename P, typename F>
  550. void JsonEncoder<P, F>::startItem() {
  551. parser_.processImplicitActions();
  552. if (parser_.top() != Symbol::Kind::Repeater) {
  553. throw Exception("startItem at not an item boundary");
  554. }
  555. }
  556. template<typename P, typename F>
  557. void JsonEncoder<P, F>::encodeUnionIndex(size_t e) {
  558. parser_.advance(Symbol::Kind::Union);
  559. const std::string name = parser_.nameForIndex(e);
  560. if (name != "null") {
  561. out_.objectStart();
  562. out_.encodeString(name);
  563. }
  564. parser_.selectBranch(e);
  565. }
  566. } // namespace parsing
  567. DecoderPtr jsonDecoder(const ValidSchema &s) {
  568. return std::make_shared<parsing::JsonDecoder<
  569. parsing::SimpleParser<parsing::JsonDecoderHandler>>>(s);
  570. }
  571. EncoderPtr jsonEncoder(const ValidSchema &schema) {
  572. return std::make_shared<parsing::JsonEncoder<
  573. parsing::SimpleParser<parsing::JsonHandler<avro::json::JsonNullFormatter>>, avro::json::JsonNullFormatter>>(schema);
  574. }
  575. EncoderPtr jsonPrettyEncoder(const ValidSchema &schema) {
  576. return std::make_shared<parsing::JsonEncoder<
  577. parsing::SimpleParser<parsing::JsonHandler<avro::json::JsonPrettyFormatter>>, avro::json::JsonPrettyFormatter>>(schema);
  578. }
  579. } // namespace avro