proto_helpers.cpp 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  #include "proto_helpers.h"

  #include <yt/cpp/mapreduce/interface/io.h>
  #include <yt/cpp/mapreduce/interface/fluent.h>

  #include <yt/yt_proto/yt/formats/extension.pb.h>

  #include <google/protobuf/descriptor.h>
  #include <google/protobuf/descriptor.pb.h>
  #include <google/protobuf/messagext.h>
  #include <google/protobuf/io/coded_stream.h>

  #include <util/stream/str.h>
  #include <util/stream/file.h>
  #include <util/folder/path.h>

  #include <limits>
  12. namespace NYT {
  13. using ::google::protobuf::Message;
  14. using ::google::protobuf::Descriptor;
  15. using ::google::protobuf::DescriptorPool;
  16. using ::google::protobuf::io::CodedInputStream;
  17. using ::google::protobuf::io::TCopyingInputStreamAdaptor;
  18. ////////////////////////////////////////////////////////////////////////////////
  19. namespace {
  20. TVector<const Descriptor*> GetJobDescriptors(const TString& fileName)
  21. {
  22. TVector<const Descriptor*> descriptors;
  23. if (!TFsPath(fileName).Exists()) {
  24. ythrow TIOException() <<
  25. "Cannot load '" << fileName << "' file";
  26. }
  27. TIFStream input(fileName);
  28. TString line;
  29. while (input.ReadLine(line)) {
  30. const auto* pool = DescriptorPool::generated_pool();
  31. const auto* descriptor = pool->FindMessageTypeByName(line);
  32. descriptors.push_back(descriptor);
  33. }
  34. return descriptors;
  35. }
  36. } // namespace
  37. ////////////////////////////////////////////////////////////////////////////////
  38. TVector<const Descriptor*> GetJobInputDescriptors()
  39. {
  40. return GetJobDescriptors("proto_input");
  41. }
  42. TVector<const Descriptor*> GetJobOutputDescriptors()
  43. {
  44. return GetJobDescriptors("proto_output");
  45. }
  46. void ValidateProtoDescriptor(
  47. const Message& row,
  48. size_t tableIndex,
  49. const TVector<const Descriptor*>& descriptors,
  50. bool isRead)
  51. {
  52. const char* direction = isRead ? "input" : "output";
  53. if (tableIndex >= descriptors.size()) {
  54. ythrow TIOException() <<
  55. "Table index " << tableIndex <<
  56. " is out of range [0, " << descriptors.size() <<
  57. ") in " << direction;
  58. }
  59. if (row.GetDescriptor() != descriptors[tableIndex]) {
  60. ythrow TIOException() <<
  61. "Invalid row of type " << row.GetDescriptor()->full_name() <<
  62. " at index " << tableIndex <<
  63. ", row of type " << descriptors[tableIndex]->full_name() <<
  64. " expected in " << direction;
  65. }
  66. }
  67. void ParseFromArcadiaStream(IInputStream* stream, Message& row, ui32 length)
  68. {
  69. TLengthLimitedInput input(stream, length);
  70. TCopyingInputStreamAdaptor adaptor(&input);
  71. CodedInputStream codedStream(&adaptor);
  72. codedStream.SetTotalBytesLimit(length + 1);
  73. bool parsedOk = row.ParseFromCodedStream(&codedStream);
  74. Y_ENSURE(parsedOk, "Failed to parse protobuf message");
  75. Y_ENSURE(input.Left() == 0);
  76. }
  77. ////////////////////////////////////////////////////////////////////////////////
  78. } // namespace NYT