/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * https://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include #include #include #include #include #include "Decoder.hh" #include "Encoder.hh" #include "Symbol.hh" #include "ValidSchema.hh" #include "ValidatingCodec.hh" #include "../json/JsonIO.hh" namespace avro { namespace parsing { using std::make_shared; using std::istringstream; using std::map; using std::ostringstream; using std::reverse; using std::string; using std::vector; using avro::json::JsonGenerator; using avro::json::JsonNullFormatter; using avro::json::JsonParser; class JsonGrammarGenerator : public ValidatingGrammarGenerator { ProductionPtr doGenerate(const NodePtr &n, std::map &m) final; }; static std::string nameOf(const NodePtr &n) { if (n->hasName()) { return std::string(n->name()); } std::ostringstream oss; oss << n->type(); return oss.str(); } ProductionPtr JsonGrammarGenerator::doGenerate(const NodePtr &n, std::map &m) { switch (n->type()) { case AVRO_NULL: case AVRO_BOOL: case AVRO_INT: case AVRO_LONG: case AVRO_FLOAT: case AVRO_DOUBLE: case AVRO_STRING: case AVRO_BYTES: case AVRO_FIXED: case AVRO_ARRAY: case AVRO_MAP: case AVRO_SYMBOLIC: return ValidatingGrammarGenerator::doGenerate(n, m); case AVRO_RECORD: { ProductionPtr result = make_shared(); m.erase(n); size_t c = n->leaves(); result->reserve(2 + 2 * c); result->push_back(Symbol::recordStartSymbol()); for (size_t i = 0; i < c; ++i) { const NodePtr &leaf = n->leafAt(i); ProductionPtr v = doGenerate(leaf, m); result->push_back(Symbol::fieldSymbol(n->nameAt(i))); copy(v->rbegin(), v->rend(), back_inserter(*result)); } result->push_back(Symbol::recordEndSymbol()); reverse(result->begin(), result->end()); m[n] = result; return make_shared(1, Symbol::indirect(result)); } case AVRO_ENUM: { vector nn; size_t c = n->names(); nn.reserve(c); for (size_t i = 0; i < c; ++i) { nn.push_back(n->nameAt(i)); } ProductionPtr result = make_shared(); result->push_back(Symbol::nameListSymbol(nn)); result->push_back(Symbol::enumSymbol()); m[n] = result; return result; } case AVRO_UNION: { size_t c = n->leaves(); vector vv; vv.reserve(c); vector names; names.reserve(c); for (size_t i = 0; i < c; ++i) { const NodePtr &nn = n->leafAt(i); ProductionPtr v = doGenerate(nn, m); if (nn->type() != AVRO_NULL) { ProductionPtr v2 = make_shared(); v2->push_back(Symbol::recordEndSymbol()); copy(v->begin(), v->end(), back_inserter(*v2)); v.swap(v2); } vv.push_back(v); names.push_back(nameOf(nn)); } ProductionPtr result = make_shared(); result->push_back(Symbol::alternative(vv)); result->push_back(Symbol::nameListSymbol(names)); result->push_back(Symbol::unionSymbol()); return result; } default: throw Exception("Unknown node type"); } } static void expectToken(JsonParser &in, JsonParser::Token tk) { in.expectToken(tk); } class JsonDecoderHandler { JsonParser &in_; public: explicit JsonDecoderHandler(JsonParser &p) : in_(p) {} size_t handle(const Symbol &s) { switch (s.kind()) { case Symbol::Kind::RecordStart: expectToken(in_, JsonParser::Token::ObjectStart); break; case Symbol::Kind::RecordEnd: expectToken(in_, JsonParser::Token::ObjectEnd); break; case Symbol::Kind::Field: expectToken(in_, JsonParser::Token::String); if (s.extra() != in_.stringValue()) { throw Exception(boost::format("Incorrect field: expected \"%1%\" but got \"%2%\".") % s.extra() % in_.stringValue()); } break; default: break; } return 0; } }; template class JsonDecoder : public Decoder { JsonParser in_; JsonDecoderHandler handler_; P parser_; void init(InputStream &is) final; void decodeNull() final; bool decodeBool() final; int32_t decodeInt() final; int64_t decodeLong() final; float decodeFloat() final; double decodeDouble() final; void decodeString(string &value) final; void skipString() final; void decodeBytes(vector &value) final; void skipBytes() final; void decodeFixed(size_t n, vector &value) final; void skipFixed(size_t n) final; size_t decodeEnum() final; size_t arrayStart() final; size_t arrayNext() final; size_t skipArray() final; size_t mapStart() final; size_t mapNext() final; size_t skipMap() final; size_t decodeUnionIndex() final; void expect(JsonParser::Token tk); void skipComposite(); void drain() final; public: explicit JsonDecoder(const ValidSchema &s) : handler_(in_), parser_(JsonGrammarGenerator().generate(s), NULL, handler_) {} }; template void JsonDecoder

::init(InputStream &is) { in_.init(is); parser_.reset(); } template void JsonDecoder

::expect(JsonParser::Token tk) { expectToken(in_, tk); } template void JsonDecoder

::decodeNull() { parser_.advance(Symbol::Kind::Null); expect(JsonParser::Token::Null); } template bool JsonDecoder

::decodeBool() { parser_.advance(Symbol::Kind::Bool); expect(JsonParser::Token::Bool); bool result = in_.boolValue(); return result; } template int32_t JsonDecoder

::decodeInt() { parser_.advance(Symbol::Kind::Int); expect(JsonParser::Token::Long); int64_t result = in_.longValue(); if (result < INT32_MIN || result > INT32_MAX) { throw Exception(boost::format("Value out of range for Avro int: %1%") % result); } return static_cast(result); } template int64_t JsonDecoder

::decodeLong() { parser_.advance(Symbol::Kind::Long); expect(JsonParser::Token::Long); int64_t result = in_.longValue(); return result; } template float JsonDecoder

::decodeFloat() { parser_.advance(Symbol::Kind::Float); expect(JsonParser::Token::Double); double result = in_.doubleValue(); return static_cast(result); } template double JsonDecoder

::decodeDouble() { parser_.advance(Symbol::Kind::Double); expect(JsonParser::Token::Double); double result = in_.doubleValue(); return result; } template void JsonDecoder

::decodeString(string &value) { parser_.advance(Symbol::Kind::String); expect(JsonParser::Token::String); value = in_.stringValue(); } template void JsonDecoder

::skipString() { parser_.advance(Symbol::Kind::String); expect(JsonParser::Token::String); } static vector toBytes(const string &s) { return vector(s.begin(), s.end()); } template void JsonDecoder

::decodeBytes(vector &value) { parser_.advance(Symbol::Kind::Bytes); expect(JsonParser::Token::String); value = toBytes(in_.bytesValue()); } template void JsonDecoder

::skipBytes() { parser_.advance(Symbol::Kind::Bytes); expect(JsonParser::Token::String); } template void JsonDecoder

::decodeFixed(size_t n, vector &value) { parser_.advance(Symbol::Kind::Fixed); parser_.assertSize(n); expect(JsonParser::Token::String); value = toBytes(in_.bytesValue()); if (value.size() != n) { throw Exception("Incorrect value for fixed"); } } template void JsonDecoder

::skipFixed(size_t n) { parser_.advance(Symbol::Kind::Fixed); parser_.assertSize(n); expect(JsonParser::Token::String); vector result = toBytes(in_.bytesValue()); if (result.size() != n) { throw Exception("Incorrect value for fixed"); } } template size_t JsonDecoder

::decodeEnum() { parser_.advance(Symbol::Kind::Enum); expect(JsonParser::Token::String); size_t result = parser_.indexForName(in_.stringValue()); return result; } template size_t JsonDecoder

::arrayStart() { parser_.advance(Symbol::Kind::ArrayStart); parser_.pushRepeatCount(0); expect(JsonParser::Token::ArrayStart); return arrayNext(); } template size_t JsonDecoder

::arrayNext() { parser_.processImplicitActions(); if (in_.peek() == JsonParser::Token::ArrayEnd) { in_.advance(); parser_.popRepeater(); parser_.advance(Symbol::Kind::ArrayEnd); return 0; } parser_.nextRepeatCount(1); return 1; } template void JsonDecoder

::skipComposite() { size_t level = 0; for (;;) { switch (in_.advance()) { case JsonParser::Token::ArrayStart: case JsonParser::Token::ObjectStart: ++level; continue; case JsonParser::Token::ArrayEnd: case JsonParser::Token::ObjectEnd: if (level == 0) { return; } --level; continue; default: continue; } } } template void JsonDecoder

::drain() { parser_.processImplicitActions(); in_.drain(); } template size_t JsonDecoder

::skipArray() { parser_.advance(Symbol::Kind::ArrayStart); parser_.pop(); parser_.advance(Symbol::Kind::ArrayEnd); expect(JsonParser::Token::ArrayStart); skipComposite(); return 0; } template size_t JsonDecoder

::mapStart() { parser_.advance(Symbol::Kind::MapStart); parser_.pushRepeatCount(0); expect(JsonParser::Token::ObjectStart); return mapNext(); } template size_t JsonDecoder

::mapNext() { parser_.processImplicitActions(); if (in_.peek() == JsonParser::Token::ObjectEnd) { in_.advance(); parser_.popRepeater(); parser_.advance(Symbol::Kind::MapEnd); return 0; } parser_.nextRepeatCount(1); return 1; } template size_t JsonDecoder

::skipMap() { parser_.advance(Symbol::Kind::MapStart); parser_.pop(); parser_.advance(Symbol::Kind::MapEnd); expect(JsonParser::Token::ObjectStart); skipComposite(); return 0; } template size_t JsonDecoder

::decodeUnionIndex() { parser_.advance(Symbol::Kind::Union); size_t result; if (in_.peek() == JsonParser::Token::Null) { result = parser_.indexForName("null"); } else { expect(JsonParser::Token::ObjectStart); expect(JsonParser::Token::String); result = parser_.indexForName(in_.stringValue()); } parser_.selectBranch(result); return result; } template class JsonHandler { JsonGenerator &generator_; public: explicit JsonHandler(JsonGenerator &g) : generator_(g) {} size_t handle(const Symbol &s) { switch (s.kind()) { case Symbol::Kind::RecordStart: generator_.objectStart(); break; case Symbol::Kind::RecordEnd: generator_.objectEnd(); break; case Symbol::Kind::Field: generator_.encodeString(s.extra()); break; default: break; } return 0; } }; template class JsonEncoder : public Encoder { JsonGenerator out_; JsonHandler handler_; P parser_; void init(OutputStream &os) final; void flush() final; int64_t byteCount() const final; void encodeNull() final; void encodeBool(bool b) final; void encodeInt(int32_t i) final; void encodeLong(int64_t l) final; void encodeFloat(float f) final; void encodeDouble(double d) final; void encodeString(const std::string &s) final; void encodeBytes(const uint8_t *bytes, size_t len) final; void encodeFixed(const uint8_t *bytes, size_t len) final; void encodeEnum(size_t e) final; void arrayStart() final; void arrayEnd() final; void mapStart() final; void mapEnd() final; void setItemCount(size_t count) final; void startItem() final; void encodeUnionIndex(size_t e) final; public: explicit JsonEncoder(const ValidSchema &schema) : handler_(out_), parser_(JsonGrammarGenerator().generate(schema), NULL, handler_) {} }; template void JsonEncoder::init(OutputStream &os) { out_.init(os); } template void JsonEncoder::flush() { parser_.processImplicitActions(); out_.flush(); } template int64_t JsonEncoder::byteCount() const { return out_.byteCount(); } template void JsonEncoder::encodeNull() { parser_.advance(Symbol::Kind::Null); out_.encodeNull(); } template void JsonEncoder::encodeBool(bool b) { parser_.advance(Symbol::Kind::Bool); out_.encodeBool(b); } template void JsonEncoder::encodeInt(int32_t i) { parser_.advance(Symbol::Kind::Int); out_.encodeNumber(i); } template void JsonEncoder::encodeLong(int64_t l) { parser_.advance(Symbol::Kind::Long); out_.encodeNumber(l); } template void JsonEncoder::encodeFloat(float f) { parser_.advance(Symbol::Kind::Float); if (f == std::numeric_limits::infinity()) { out_.encodeString("Infinity"); } else if (-f == std::numeric_limits::infinity()) { out_.encodeString("-Infinity"); } else if (boost::math::isnan(f)) { out_.encodeString("NaN"); } else { out_.encodeNumber(f); } } template void JsonEncoder::encodeDouble(double d) { parser_.advance(Symbol::Kind::Double); if (d == std::numeric_limits::infinity()) { out_.encodeString("Infinity"); } else if (-d == std::numeric_limits::infinity()) { out_.encodeString("-Infinity"); } else if (boost::math::isnan(d)) { out_.encodeString("NaN"); } else { out_.encodeNumber(d); } } template void JsonEncoder::encodeString(const std::string &s) { parser_.advance(Symbol::Kind::String); out_.encodeString(s); } template void JsonEncoder::encodeBytes(const uint8_t *bytes, size_t len) { parser_.advance(Symbol::Kind::Bytes); out_.encodeBinary(bytes, len); } template void JsonEncoder::encodeFixed(const uint8_t *bytes, size_t len) { parser_.advance(Symbol::Kind::Fixed); parser_.assertSize(len); out_.encodeBinary(bytes, len); } template void JsonEncoder::encodeEnum(size_t e) { parser_.advance(Symbol::Kind::Enum); const string &s = parser_.nameForIndex(e); out_.encodeString(s); } template void JsonEncoder::arrayStart() { parser_.advance(Symbol::Kind::ArrayStart); parser_.pushRepeatCount(0); out_.arrayStart(); } template void JsonEncoder::arrayEnd() { parser_.popRepeater(); parser_.advance(Symbol::Kind::ArrayEnd); out_.arrayEnd(); } template void JsonEncoder::mapStart() { parser_.advance(Symbol::Kind::MapStart); parser_.pushRepeatCount(0); out_.objectStart(); } template void JsonEncoder::mapEnd() { parser_.popRepeater(); parser_.advance(Symbol::Kind::MapEnd); out_.objectEnd(); } template void JsonEncoder::setItemCount(size_t count) { parser_.nextRepeatCount(count); } template void JsonEncoder::startItem() { parser_.processImplicitActions(); if (parser_.top() != Symbol::Kind::Repeater) { throw Exception("startItem at not an item boundary"); } } template void JsonEncoder::encodeUnionIndex(size_t e) { parser_.advance(Symbol::Kind::Union); const std::string name = parser_.nameForIndex(e); if (name != "null") { out_.objectStart(); out_.encodeString(name); } parser_.selectBranch(e); } } // namespace parsing DecoderPtr jsonDecoder(const ValidSchema &s) { return std::make_shared>>(s); } EncoderPtr jsonEncoder(const ValidSchema &schema) { return std::make_shared>, avro::json::JsonNullFormatter>>(schema); } EncoderPtr jsonPrettyEncoder(const ValidSchema &schema) { return std::make_shared>, avro::json::JsonPrettyFormatter>>(schema); } } // namespace avro