yamr_table_reader.cpp 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. #include "yamr_table_reader.h"
  2. #include <yt/cpp/mapreduce/common/helpers.h>
  3. #include <yt/cpp/mapreduce/common/retry_lib.h>
  4. #include <yt/cpp/mapreduce/raw_client/raw_requests.h>
  5. ////////////////////////////////////////////////////////////////////
  6. static void CheckedSkip(IInputStream* input, size_t byteCount)
  7. {
  8. size_t skipped = input->Skip(byteCount);
  9. Y_ENSURE(skipped == byteCount, "Premature end of YaMR stream");
  10. }
  11. ////////////////////////////////////////////////////////////////////
  12. namespace NYT {
  13. using namespace NYT::NDetail::NRawClient;
  14. ////////////////////////////////////////////////////////////////////////////////
  15. TYaMRTableReader::TYaMRTableReader(::TIntrusivePtr<TRawTableReader> input)
  16. : TLenvalTableReader(std::move(input))
  17. { }
  18. TYaMRTableReader::~TYaMRTableReader()
  19. { }
  20. const TYaMRRow& TYaMRTableReader::GetRow() const
  21. {
  22. CheckValidity();
  23. if (!RowTaken_) {
  24. const_cast<TYaMRTableReader*>(this)->ReadRow();
  25. }
  26. return Row_;
  27. }
  28. bool TYaMRTableReader::IsValid() const
  29. {
  30. return Valid_;
  31. }
  32. void TYaMRTableReader::Next()
  33. {
  34. TLenvalTableReader::Next();
  35. }
  36. void TYaMRTableReader::NextKey()
  37. {
  38. TLenvalTableReader::NextKey();
  39. }
  40. ui32 TYaMRTableReader::GetTableIndex() const
  41. {
  42. return TLenvalTableReader::GetTableIndex();
  43. }
  44. ui32 TYaMRTableReader::GetRangeIndex() const
  45. {
  46. return TLenvalTableReader::GetRangeIndex();
  47. }
  48. ui64 TYaMRTableReader::GetRowIndex() const
  49. {
  50. return TLenvalTableReader::GetRowIndex();
  51. }
  52. TMaybe<size_t> TYaMRTableReader::GetReadByteCount() const
  53. {
  54. return TLenvalTableReader::GetReadByteCount();
  55. }
  56. bool TYaMRTableReader::IsEndOfStream() const
  57. {
  58. return TLenvalTableReader::IsEndOfStream();
  59. }
  60. bool TYaMRTableReader::IsRawReaderExhausted() const
  61. {
  62. return TLenvalTableReader::IsRawReaderExhausted();
  63. }
  64. void TYaMRTableReader::ReadField(TString* result, i32 length)
  65. {
  66. result->resize(length);
  67. size_t count = Input_.Load(result->begin(), length);
  68. Y_ENSURE(count == static_cast<size_t>(length), "Premature end of YaMR stream");
  69. }
  70. void TYaMRTableReader::ReadRow()
  71. {
  72. while (true) {
  73. try {
  74. i32 value = static_cast<i32>(Length_);
  75. ReadField(&Key_, value);
  76. Row_.Key = Key_;
  77. ReadInteger(&value);
  78. ReadField(&SubKey_, value);
  79. Row_.SubKey = SubKey_;
  80. ReadInteger(&value);
  81. ReadField(&Value_, value);
  82. Row_.Value = Value_;
  83. RowTaken_ = true;
  84. // We successfully parsed one more row from the stream,
  85. // so reset retry count to their initial value.
  86. Input_.ResetRetries();
  87. break;
  88. } catch (const std::exception& ) {
  89. if (!TLenvalTableReader::Retry()) {
  90. throw;
  91. }
  92. }
  93. }
  94. }
  95. void TYaMRTableReader::SkipRow()
  96. {
  97. while (true) {
  98. try {
  99. i32 value = static_cast<i32>(Length_);
  100. CheckedSkip(&Input_, value);
  101. ReadInteger(&value);
  102. CheckedSkip(&Input_, value);
  103. ReadInteger(&value);
  104. CheckedSkip(&Input_, value);
  105. break;
  106. } catch (const std::exception& ) {
  107. if (!TLenvalTableReader::Retry()) {
  108. throw;
  109. }
  110. }
  111. }
  112. }
  113. ////////////////////////////////////////////////////////////////////////////////
  114. } // namespace NYT