skiff_row_table_reader.cpp 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235
  1. #include "skiff_row_table_reader.h"
  2. #include <yt/cpp/mapreduce/interface/logging/yt_log.h>
  3. #include <yt/cpp/mapreduce/interface/skiff_row.h>
  4. #include <library/cpp/skiff/skiff.h>
  5. #include <library/cpp/yt/logging/logger.h>
  6. namespace NYT {
  7. ////////////////////////////////////////////////////////////////////////////////
  8. TSkiffRowTableReader::TSkiffRowTableReader(
  9. ::TIntrusivePtr<TRawTableReader> input,
  10. const NSkiff::TSkiffSchemaPtr& schema,
  11. TVector<ISkiffRowSkipperPtr>&& skippers,
  12. NDetail::TCreateSkiffSchemaOptions&& options)
  13. : Input_(std::move(input))
  14. , BufferedInput_(&Input_)
  15. , Parser_({schema, &BufferedInput_})
  16. , Skippers_(std::move(skippers))
  17. , Options_(std::move(options))
  18. {
  19. Next();
  20. }
  21. TSkiffRowTableReader::~TSkiffRowTableReader()
  22. { }
  23. bool TSkiffRowTableReader::Retry()
  24. {
  25. if (PrepareRetry()) {
  26. RowTaken_ = true;
  27. Next();
  28. return true;
  29. }
  30. return false;
  31. }
  32. bool TSkiffRowTableReader::PrepareRetry()
  33. {
  34. if (Input_.Retry(RangeIndex_, RowIndex_)) {
  35. if (RangeIndex_) {
  36. RangeIndexShift_ += *RangeIndex_;
  37. }
  38. RowIndex_.Clear();
  39. RangeIndex_.Clear();
  40. BufferedInput_ = TBufferedInput(&Input_);
  41. Parser_.emplace(&BufferedInput_);
  42. return true;
  43. }
  44. return false;
  45. }
  46. void TSkiffRowTableReader::ReadRow(const ISkiffRowParserPtr& parser)
  47. {
  48. while (true) {
  49. try {
  50. parser->Parse(&Parser_.value());
  51. RowTaken_ = true;
  52. // We successfully parsed one more row from the stream,
  53. // so reset retry count to their initial value.
  54. Input_.ResetRetries();
  55. break;
  56. } catch (const std::exception& ex) {
  57. YT_LOG_ERROR("Read error during parsing: %v", ex.what());
  58. if (!Retry()) {
  59. throw;
  60. }
  61. }
  62. }
  63. }
  64. bool TSkiffRowTableReader::IsValid() const
  65. {
  66. return Valid_;
  67. }
  68. void TSkiffRowTableReader::SkipRow()
  69. {
  70. CheckValidity();
  71. while (true) {
  72. try {
  73. Skippers_[TableIndex_]->SkipRow(&Parser_.value());
  74. break;
  75. } catch (const std::exception& ex) {
  76. YT_LOG_ERROR("Read error during skipping row: %v", ex.what());
  77. if (!Retry()) {
  78. throw;
  79. }
  80. }
  81. }
  82. }
  83. void TSkiffRowTableReader::CheckValidity() const {
  84. if (!IsValid()) {
  85. ythrow yexception() << "Iterator is not valid";
  86. }
  87. }
  88. void TSkiffRowTableReader::Next()
  89. {
  90. if (!RowTaken_) {
  91. SkipRow();
  92. }
  93. CheckValidity();
  94. if (Y_UNLIKELY(Finished_ || !Parser_->HasMoreData())) {
  95. Finished_ = true;
  96. Valid_ = false;
  97. return;
  98. }
  99. if (AfterKeySwitch_) {
  100. AfterKeySwitch_ = false;
  101. return;
  102. }
  103. if (RowIndex_) {
  104. ++*RowIndex_;
  105. }
  106. while (true) {
  107. try {
  108. auto tag = Parser_->ParseVariant16Tag();
  109. if (tag == NSkiff::EndOfSequenceTag<ui16>()) {
  110. IsEndOfStream_ = true;
  111. break;
  112. } else {
  113. TableIndex_ = tag;
  114. }
  115. if (TableIndex_ >= Skippers_.size()) {
  116. ythrow TIOException() <<
  117. "Table index " << TableIndex_ <<
  118. " is out of range [0, " << Skippers_.size() <<
  119. ") in read";
  120. }
  121. if (Options_.HasKeySwitch_) {
  122. auto keySwitch = Parser_->ParseBoolean();
  123. if (keySwitch) {
  124. AfterKeySwitch_ = true;
  125. Valid_ = false;
  126. }
  127. }
  128. auto tagRowIndex = Parser_->ParseVariant8Tag();
  129. if (tagRowIndex == 1) {
  130. RowIndex_ = Parser_->ParseInt64();
  131. } else {
  132. Y_ENSURE(tagRowIndex == 0, "Tag for row_index was expected to be 0 or 1, got " << tagRowIndex);
  133. }
  134. if (Options_.HasRangeIndex_) {
  135. auto tagRangeIndex = Parser_->ParseVariant8Tag();
  136. if (tagRangeIndex == 1) {
  137. RangeIndex_ = Parser_->ParseInt64();
  138. } else {
  139. Y_ENSURE(tagRangeIndex == 0, "Tag for range_index was expected to be 0 or 1, got " << tagRangeIndex);
  140. }
  141. }
  142. break;
  143. } catch (const std::exception& ex) {
  144. YT_LOG_ERROR("Read error: %v", ex.what());
  145. if (!PrepareRetry()) {
  146. throw;
  147. }
  148. }
  149. }
  150. RowTaken_ = false;
  151. }
  152. ui32 TSkiffRowTableReader::GetTableIndex() const
  153. {
  154. CheckValidity();
  155. return TableIndex_;
  156. }
  157. ui32 TSkiffRowTableReader::GetRangeIndex() const
  158. {
  159. CheckValidity();
  160. return RangeIndex_.GetOrElse(0) + RangeIndexShift_;
  161. }
  162. ui64 TSkiffRowTableReader::GetRowIndex() const
  163. {
  164. CheckValidity();
  165. return RowIndex_.GetOrElse(0ULL);
  166. }
  167. void TSkiffRowTableReader::NextKey() {
  168. while (Valid_) {
  169. Next();
  170. }
  171. if (Finished_) {
  172. return;
  173. }
  174. Valid_ = true;
  175. if (RowIndex_) {
  176. --*RowIndex_;
  177. }
  178. RowTaken_ = true;
  179. }
  180. TMaybe<size_t> TSkiffRowTableReader::GetReadByteCount() const {
  181. return Input_.GetReadByteCount();
  182. }
  183. bool TSkiffRowTableReader::IsEndOfStream() const {
  184. return IsEndOfStream_;
  185. }
  186. bool TSkiffRowTableReader::IsRawReaderExhausted() const {
  187. return Finished_;
  188. }
  189. ////////////////////////////////////////////////////////////////////////////////
  190. } // namespace NYT