// skiff_table_reader.cpp
  1. #include "skiff_table_reader.h"
  2. #include <yt/cpp/mapreduce/interface/logging/yt_log.h>
  3. #include <library/cpp/yson/node/node_io.h>
  4. #include <yt/cpp/mapreduce/skiff/wire_type.h>
  5. #include <yt/cpp/mapreduce/skiff/skiff_schema.h>
  6. #include <util/string/cast.h>
  7. namespace NYT {
  8. namespace NDetail {
  9. namespace {
  10. ////////////////////////////////////////////////////////////////////////////////
  11. enum EColumnType : i8
  12. {
  13. Dense,
  14. KeySwitch,
  15. RangeIndex,
  16. RowIndex
  17. };
  18. struct TSkiffColumnSchema
  19. {
  20. EColumnType Type;
  21. bool Required;
  22. NSkiff::EWireType WireType;
  23. TString Name;
  24. TSkiffColumnSchema(EColumnType type, bool required, NSkiff::EWireType wireType, const TString& name)
  25. : Type(type)
  26. , Required(required)
  27. , WireType(wireType)
  28. , Name(name)
  29. { }
  30. };
  31. ////////////////////////////////////////////////////////////////////////////////
  32. } // namespace
  33. struct TSkiffTableReader::TSkiffTableSchema
  34. {
  35. TVector<TSkiffColumnSchema> Columns;
  36. };
  37. TSkiffTableReader::TSkiffTableReader(
  38. ::TIntrusivePtr<TRawTableReader> input,
  39. const NSkiff::TSkiffSchemaPtr& schema)
  40. : Input_(std::move(input))
  41. , BufferedInput_(&Input_)
  42. , Parser_(&BufferedInput_)
  43. , Schemas_(CreateSkiffTableSchemas(schema))
  44. {
  45. Next();
  46. }
  47. TSkiffTableReader::~TSkiffTableReader() = default;
  48. const TNode& TSkiffTableReader::GetRow() const
  49. {
  50. EnsureValidity();
  51. Y_ENSURE(!Row_.IsUndefined(), "Row is moved");
  52. return Row_;
  53. }
  54. void TSkiffTableReader::MoveRow(TNode* result)
  55. {
  56. EnsureValidity();
  57. Y_ENSURE(!Row_.IsUndefined(), "Row is moved");
  58. *result = std::move(Row_);
  59. Row_ = TNode();
  60. }
  61. bool TSkiffTableReader::IsValid() const
  62. {
  63. return Valid_;
  64. }
  65. void TSkiffTableReader::Next()
  66. {
  67. EnsureValidity();
  68. if (Y_UNLIKELY(Finished_ || !Parser_->HasMoreData())) {
  69. Finished_ = true;
  70. Valid_ = false;
  71. return;
  72. }
  73. if (AfterKeySwitch_) {
  74. AfterKeySwitch_ = false;
  75. return;
  76. }
  77. while (true) {
  78. try {
  79. ReadRow();
  80. break;
  81. } catch (const std::exception& ex) {
  82. YT_LOG_ERROR("Read error: %v", ex.what());
  83. if (!Input_.Retry(RangeIndex_, RowIndex_, std::make_exception_ptr(ex))) {
  84. throw;
  85. }
  86. BufferedInput_ = TBufferedInput(&Input_);
  87. Parser_.emplace(NSkiff::TUncheckedSkiffParser(&BufferedInput_));
  88. if (RangeIndex_) {
  89. RangeIndexShift_ += *RangeIndex_;
  90. }
  91. RangeIndex_.Clear();
  92. RowIndex_.Clear();
  93. }
  94. }
  95. }
  96. ui32 TSkiffTableReader::GetTableIndex() const
  97. {
  98. EnsureValidity();
  99. return TableIndex_;
  100. }
  101. ui32 TSkiffTableReader::GetRangeIndex() const
  102. {
  103. EnsureValidity();
  104. return RangeIndex_.GetOrElse(0) + RangeIndexShift_;
  105. }
  106. ui64 TSkiffTableReader::GetRowIndex() const
  107. {
  108. EnsureValidity();
  109. return RowIndex_.GetOrElse(0ULL);
  110. }
  111. void TSkiffTableReader::NextKey()
  112. {
  113. while (Valid_) {
  114. Next();
  115. }
  116. if (Finished_) {
  117. return;
  118. }
  119. Valid_ = true;
  120. }
  121. TMaybe<size_t> TSkiffTableReader::GetReadByteCount() const
  122. {
  123. return Input_.GetReadByteCount();
  124. }
  125. bool TSkiffTableReader::IsRawReaderExhausted() const
  126. {
  127. return Finished_;
  128. }
  129. ////////////////////////////////////////////////////////////////////////////////
  130. TVector<TSkiffTableReader::TSkiffTableSchema> TSkiffTableReader::CreateSkiffTableSchemas(
  131. const NSkiff::TSkiffSchemaPtr& schema)
  132. {
  133. using NSkiff::EWireType;
  134. constexpr auto keySwitchColumnName = "$key_switch";
  135. constexpr auto rangeIndexColumnName = "$range_index";
  136. constexpr auto rowIndexColumnName = "$row_index";
  137. static const THashMap<TString, TSkiffColumnSchema> specialColumns = {
  138. {keySwitchColumnName, {EColumnType::KeySwitch, true, EWireType::Boolean, keySwitchColumnName}},
  139. {rangeIndexColumnName, {EColumnType::RangeIndex, false, EWireType::Int64, rangeIndexColumnName}},
  140. {rowIndexColumnName, {EColumnType::RowIndex, false, EWireType::Int64, rowIndexColumnName}},
  141. };
  142. Y_ENSURE(schema->GetWireType() == EWireType::Variant16,
  143. "Expected 'variant16' wire type for schema, got '" << schema->GetWireType() << "'");
  144. TVector<TSkiffTableSchema> result;
  145. for (const auto& tableSchema : schema->GetChildren()) {
  146. Y_ENSURE(tableSchema->GetWireType() == EWireType::Tuple,
  147. "Expected 'tuple' wire type for table schema, got '" << tableSchema->GetWireType() << "'");
  148. TVector<TSkiffColumnSchema> columns;
  149. for (const auto& columnSchema : tableSchema->GetChildren()) {
  150. auto specialColumnIter = specialColumns.find(columnSchema->GetName());
  151. if (specialColumnIter != specialColumns.end()) {
  152. columns.push_back(specialColumnIter->second);
  153. } else {
  154. auto wireType = columnSchema->GetWireType();
  155. bool required = true;
  156. if (wireType == EWireType::Variant8) {
  157. const auto& children = columnSchema->GetChildren();
  158. Y_ENSURE(
  159. children.size() == 2 && children[0]->GetWireType() == EWireType::Nothing &&
  160. NSkiff::IsSimpleType(children[1]->GetWireType()),
  161. "Expected schema of form 'variant8<nothing, simple-type>', got "
  162. << NSkiff::GetShortDebugString(columnSchema));
  163. wireType = children[1]->GetWireType();
  164. required = false;
  165. }
  166. Y_ENSURE(NSkiff::IsSimpleType(wireType),
  167. "Expected column schema to be of simple type, got " << NSkiff::GetShortDebugString(columnSchema));
  168. columns.emplace_back(
  169. EColumnType::Dense,
  170. required,
  171. wireType,
  172. columnSchema->GetName());
  173. }
  174. }
  175. result.push_back({std::move(columns)});
  176. }
  177. return result;
  178. }
  179. void TSkiffTableReader::ReadRow()
  180. {
  181. if (Row_.IsUndefined()) {
  182. Row_ = TNode::CreateMap();
  183. } else {
  184. Row_.AsMap().clear();
  185. }
  186. if (RowIndex_) {
  187. ++*RowIndex_;
  188. }
  189. TableIndex_ = Parser_->ParseVariant16Tag();
  190. Y_ENSURE(TableIndex_ < Schemas_.size(), "Table index out of range: " << TableIndex_ << " >= " << Schemas_.size());
  191. const auto& tableSchema = Schemas_[TableIndex_];
  192. auto parse = [&](NSkiff::EWireType wireType) -> TNode {
  193. switch (wireType) {
  194. case NSkiff::EWireType::Int64:
  195. return Parser_->ParseInt64();
  196. case NSkiff::EWireType::Uint64:
  197. return Parser_->ParseUint64();
  198. case NSkiff::EWireType::Boolean:
  199. return Parser_->ParseBoolean();
  200. case NSkiff::EWireType::Double:
  201. return Parser_->ParseDouble();
  202. case NSkiff::EWireType::String32:
  203. return Parser_->ParseString32();
  204. case NSkiff::EWireType::Yson32:
  205. return NodeFromYsonString(Parser_->ParseYson32());
  206. case NSkiff::EWireType::Nothing:
  207. return TNode::CreateEntity();
  208. default:
  209. Y_ABORT("Bad column wire type: '%s'", ::ToString(wireType).data());
  210. }
  211. };
  212. for (const auto& columnSchema : tableSchema.Columns) {
  213. if (!columnSchema.Required) {
  214. auto tag = Parser_->ParseVariant8Tag();
  215. if (tag == 0) {
  216. if (columnSchema.Type == EColumnType::Dense) {
  217. Row_[columnSchema.Name] = TNode::CreateEntity();
  218. }
  219. continue;
  220. }
  221. Y_ENSURE(tag == 1, "Tag for 'variant8<nothing," << columnSchema.WireType
  222. << ">' expected to be 0 or 1, got " << tag);
  223. }
  224. auto value = parse(columnSchema.WireType);
  225. switch (columnSchema.Type) {
  226. case EColumnType::Dense:
  227. Row_[columnSchema.Name] = std::move(value);
  228. break;
  229. case EColumnType::KeySwitch:
  230. if (value.AsBool()) {
  231. AfterKeySwitch_ = true;
  232. Valid_ = false;
  233. }
  234. break;
  235. case EColumnType::RangeIndex:
  236. RangeIndex_ = value.AsInt64();
  237. break;
  238. case EColumnType::RowIndex:
  239. RowIndex_ = value.AsInt64();
  240. break;
  241. default:
  242. Y_ABORT("Bad column type: %d", static_cast<int>(columnSchema.Type));
  243. }
  244. }
  245. // We successfully parsed one more row from the stream,
  246. // so reset retry count to their initial value.
  247. Input_.ResetRetries();
  248. }
  249. void TSkiffTableReader::EnsureValidity() const
  250. {
  251. Y_ENSURE(Valid_, "Iterator is not valid");
  252. }
  253. } // namespace NDetail
  254. } // namespace NYT