// yamr_table_reader.cpp
  1. #include "yamr_table_reader.h"
  2. #include <yt/cpp/mapreduce/common/helpers.h>
  3. #include <yt/cpp/mapreduce/common/retry_lib.h>
  4. ////////////////////////////////////////////////////////////////////////////////
  5. static void CheckedSkip(IInputStream* input, size_t byteCount)
  6. {
  7. size_t skipped = input->Skip(byteCount);
  8. Y_ENSURE(skipped == byteCount, "Premature end of YaMR stream");
  9. }
  10. ////////////////////////////////////////////////////////////////////////////////
  11. namespace NYT {
  12. ////////////////////////////////////////////////////////////////////////////////
  13. TYaMRTableReader::TYaMRTableReader(::TIntrusivePtr<TRawTableReader> input)
  14. : TLenvalTableReader(std::move(input))
  15. { }
  16. TYaMRTableReader::~TYaMRTableReader()
  17. { }
  18. const TYaMRRow& TYaMRTableReader::GetRow() const
  19. {
  20. CheckValidity();
  21. if (!RowTaken_) {
  22. const_cast<TYaMRTableReader*>(this)->ReadRow();
  23. }
  24. return Row_;
  25. }
  26. bool TYaMRTableReader::IsValid() const
  27. {
  28. return Valid_;
  29. }
  30. void TYaMRTableReader::Next()
  31. {
  32. TLenvalTableReader::Next();
  33. }
  34. void TYaMRTableReader::NextKey()
  35. {
  36. TLenvalTableReader::NextKey();
  37. }
  38. ui32 TYaMRTableReader::GetTableIndex() const
  39. {
  40. return TLenvalTableReader::GetTableIndex();
  41. }
  42. ui32 TYaMRTableReader::GetRangeIndex() const
  43. {
  44. return TLenvalTableReader::GetRangeIndex();
  45. }
  46. ui64 TYaMRTableReader::GetRowIndex() const
  47. {
  48. return TLenvalTableReader::GetRowIndex();
  49. }
  50. TMaybe<size_t> TYaMRTableReader::GetReadByteCount() const
  51. {
  52. return TLenvalTableReader::GetReadByteCount();
  53. }
  54. bool TYaMRTableReader::IsEndOfStream() const
  55. {
  56. return TLenvalTableReader::IsEndOfStream();
  57. }
  58. bool TYaMRTableReader::IsRawReaderExhausted() const
  59. {
  60. return TLenvalTableReader::IsRawReaderExhausted();
  61. }
  62. void TYaMRTableReader::ReadField(TString* result, i32 length)
  63. {
  64. result->resize(length);
  65. size_t count = Input_.Load(result->begin(), length);
  66. Y_ENSURE(count == static_cast<size_t>(length), "Premature end of YaMR stream");
  67. }
  68. void TYaMRTableReader::ReadRow()
  69. {
  70. while (true) {
  71. try {
  72. i32 value = static_cast<i32>(Length_);
  73. ReadField(&Key_, value);
  74. Row_.Key = Key_;
  75. ReadInteger(&value);
  76. ReadField(&SubKey_, value);
  77. Row_.SubKey = SubKey_;
  78. ReadInteger(&value);
  79. ReadField(&Value_, value);
  80. Row_.Value = Value_;
  81. RowTaken_ = true;
  82. // We successfully parsed one more row from the stream,
  83. // so reset retry count to their initial value.
  84. Input_.ResetRetries();
  85. break;
  86. } catch (const std::exception& ex) {
  87. if (!TLenvalTableReader::Retry(std::make_exception_ptr(ex))) {
  88. throw;
  89. }
  90. }
  91. }
  92. }
  93. void TYaMRTableReader::SkipRow()
  94. {
  95. while (true) {
  96. try {
  97. i32 value = static_cast<i32>(Length_);
  98. CheckedSkip(&Input_, value);
  99. ReadInteger(&value);
  100. CheckedSkip(&Input_, value);
  101. ReadInteger(&value);
  102. CheckedSkip(&Input_, value);
  103. break;
  104. } catch (const std::exception& ex) {
  105. if (!TLenvalTableReader::Retry(std::make_exception_ptr(ex))) {
  106. throw;
  107. }
  108. }
  109. }
  110. }
  111. ////////////////////////////////////////////////////////////////////////////////
  112. } // namespace NYT