proto_helpers.cpp 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. #include "proto_helpers.h"
  2. #include <yt/yt/core/misc/protobuf_helpers.h>
  3. #include <yt/cpp/mapreduce/interface/io.h>
  4. #include <yt/cpp/mapreduce/interface/fluent.h>
  5. #include <yt/yt_proto/yt/formats/extension.pb.h>
  6. #include <google/protobuf/descriptor.h>
  7. #include <google/protobuf/descriptor.pb.h>
  8. #include <google/protobuf/io/coded_stream.h>
  9. #include <util/stream/str.h>
  10. #include <util/stream/file.h>
  11. #include <util/folder/path.h>
  12. namespace NYT {
  13. using ::google::protobuf::Message;
  14. using ::google::protobuf::Descriptor;
  15. using ::google::protobuf::DescriptorPool;
  16. using ::google::protobuf::io::CodedInputStream;
  17. ////////////////////////////////////////////////////////////////////////////////
  18. namespace {
  19. TVector<const Descriptor*> GetJobDescriptors(const TString& fileName)
  20. {
  21. TVector<const Descriptor*> descriptors;
  22. if (!TFsPath(fileName).Exists()) {
  23. ythrow TIOException() <<
  24. "Cannot load '" << fileName << "' file";
  25. }
  26. TIFStream input(fileName);
  27. TString line;
  28. while (input.ReadLine(line)) {
  29. const auto* pool = DescriptorPool::generated_pool();
  30. const auto* descriptor = pool->FindMessageTypeByName(line);
  31. descriptors.push_back(descriptor);
  32. }
  33. return descriptors;
  34. }
  35. } // namespace
  36. ////////////////////////////////////////////////////////////////////////////////
  37. TVector<const Descriptor*> GetJobInputDescriptors()
  38. {
  39. return GetJobDescriptors("proto_input");
  40. }
  41. TVector<const Descriptor*> GetJobOutputDescriptors()
  42. {
  43. return GetJobDescriptors("proto_output");
  44. }
  45. void ValidateProtoDescriptor(
  46. const Message& row,
  47. size_t tableIndex,
  48. const TVector<const Descriptor*>& descriptors,
  49. bool isRead)
  50. {
  51. const char* direction = isRead ? "input" : "output";
  52. if (tableIndex >= descriptors.size()) {
  53. ythrow TIOException() <<
  54. "Table index " << tableIndex <<
  55. " is out of range [0, " << descriptors.size() <<
  56. ") in " << direction;
  57. }
  58. if (row.GetDescriptor() != descriptors[tableIndex]) {
  59. ythrow TIOException() <<
  60. "Invalid row of type " << row.GetDescriptor()->full_name() <<
  61. " at index " << tableIndex <<
  62. ", row of type " << descriptors[tableIndex]->full_name() <<
  63. " expected in " << direction;
  64. }
  65. }
  66. void ParseFromArcadiaStream(IInputStream* stream, Message& row, ui32 length)
  67. {
  68. TLengthLimitedInput input(stream, length);
  69. TProtobufInputStreamAdaptor adaptor(&input);
  70. CodedInputStream codedStream(&adaptor);
  71. codedStream.SetTotalBytesLimit(length + 1);
  72. bool parsedOk = row.ParseFromCodedStream(&codedStream);
  73. Y_ENSURE(parsedOk, "Failed to parse protobuf message");
  74. Y_ENSURE(input.Left() == 0);
  75. }
  76. ////////////////////////////////////////////////////////////////////////////////
  77. } // namespace NYT