lenval_table_reader.cpp 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. #include "lenval_table_reader.h"
  2. #include <yt/cpp/mapreduce/common/helpers.h>
  3. #include <yt/cpp/mapreduce/interface/logging/yt_log.h>
  4. #include <util/string/printf.h>
  5. namespace NYT {
  6. ////////////////////////////////////////////////////////////////////////////////
  7. const i32 CONTROL_ATTR_TABLE_INDEX = -1;
  8. const i32 CONTROL_ATTR_KEY_SWITCH = -2;
  9. const i32 CONTROL_ATTR_RANGE_INDEX = -3;
  10. const i32 CONTROL_ATTR_ROW_INDEX = -4;
  11. const i32 CONTROL_ATTR_END_OF_STREAM = -5;
  12. const i32 CONTROL_ATTR_TABLET_INDEX = -6;
  13. ////////////////////////////////////////////////////////////////////////////////
  14. TLenvalTableReader::TLenvalTableReader(::TIntrusivePtr<TRawTableReader> input)
  15. : Input_(std::move(input))
  16. {
  17. TLenvalTableReader::Next();
  18. }
  19. TLenvalTableReader::~TLenvalTableReader()
  20. { }
  21. void TLenvalTableReader::CheckValidity() const
  22. {
  23. if (!IsValid()) {
  24. ythrow yexception() << "Iterator is not valid";
  25. }
  26. }
  27. bool TLenvalTableReader::IsValid() const
  28. {
  29. return Valid_;
  30. }
  31. void TLenvalTableReader::Next()
  32. {
  33. if (!RowTaken_) {
  34. SkipRow();
  35. }
  36. CheckValidity();
  37. if (RowIndex_) {
  38. ++*RowIndex_;
  39. }
  40. while (true) {
  41. try {
  42. i32 value = 0;
  43. if (!ReadInteger(&value, true)) {
  44. return;
  45. }
  46. while (value < 0 && !IsEndOfStream_) {
  47. switch (value) {
  48. case CONTROL_ATTR_KEY_SWITCH:
  49. if (!AtStart_) {
  50. Valid_ = false;
  51. return;
  52. } else {
  53. ReadInteger(&value);
  54. }
  55. break;
  56. case CONTROL_ATTR_TABLE_INDEX: {
  57. ui32 tmp = 0;
  58. ReadInteger(&tmp);
  59. TableIndex_ = tmp;
  60. ReadInteger(&value);
  61. break;
  62. }
  63. case CONTROL_ATTR_ROW_INDEX: {
  64. ui64 tmp = 0;
  65. ReadInteger(&tmp);
  66. RowIndex_ = tmp;
  67. ReadInteger(&value);
  68. break;
  69. }
  70. case CONTROL_ATTR_RANGE_INDEX: {
  71. ui32 tmp = 0;
  72. ReadInteger(&tmp);
  73. RangeIndex_ = tmp;
  74. ReadInteger(&value);
  75. break;
  76. }
  77. case CONTROL_ATTR_TABLET_INDEX: {
  78. ui64 tmp = 0;
  79. ReadInteger(&tmp);
  80. TabletIndex_ = tmp;
  81. ReadInteger(&value);
  82. break;
  83. }
  84. case CONTROL_ATTR_END_OF_STREAM: {
  85. IsEndOfStream_ = true;
  86. break;
  87. }
  88. default:
  89. ythrow yexception() <<
  90. Sprintf("Invalid control integer %d in lenval stream", value);
  91. }
  92. }
  93. Length_ = static_cast<ui32>(value);
  94. RowTaken_ = false;
  95. AtStart_ = false;
  96. } catch (const std::exception& ex) {
  97. if (!PrepareRetry(std::make_exception_ptr(ex))) {
  98. throw;
  99. }
  100. continue;
  101. }
  102. break;
  103. }
  104. }
  105. bool TLenvalTableReader::Retry(const std::exception_ptr& error)
  106. {
  107. if (PrepareRetry(error)) {
  108. RowTaken_ = true;
  109. Next();
  110. return true;
  111. }
  112. return false;
  113. }
  114. void TLenvalTableReader::NextKey()
  115. {
  116. while (Valid_) {
  117. Next();
  118. }
  119. if (Finished_) {
  120. return;
  121. }
  122. Valid_ = true;
  123. if (RowIndex_) {
  124. --*RowIndex_;
  125. }
  126. RowTaken_ = true;
  127. }
  128. ui32 TLenvalTableReader::GetTableIndex() const
  129. {
  130. CheckValidity();
  131. return TableIndex_;
  132. }
  133. ui32 TLenvalTableReader::GetRangeIndex() const
  134. {
  135. CheckValidity();
  136. return RangeIndex_.GetOrElse(0) + RangeIndexShift_;
  137. }
  138. ui64 TLenvalTableReader::GetRowIndex() const
  139. {
  140. CheckValidity();
  141. return RowIndex_.GetOrElse(0UL);
  142. }
  143. TMaybe<size_t> TLenvalTableReader::GetReadByteCount() const
  144. {
  145. return Input_.GetReadByteCount();
  146. }
  147. bool TLenvalTableReader::IsEndOfStream() const
  148. {
  149. return IsEndOfStream_;
  150. }
  151. bool TLenvalTableReader::IsRawReaderExhausted() const
  152. {
  153. return Finished_;
  154. }
  155. bool TLenvalTableReader::PrepareRetry(const std::exception_ptr& error)
  156. {
  157. if (Input_.Retry(RangeIndex_, RowIndex_, error)) {
  158. if (RangeIndex_) {
  159. RangeIndexShift_ += *RangeIndex_;
  160. }
  161. RowIndex_.Clear();
  162. RangeIndex_.Clear();
  163. return true;
  164. }
  165. return false;
  166. }
  167. ////////////////////////////////////////////////////////////////////////////////
  168. } // namespace NYT