skiff_table_reader.cpp 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296
  1. #include "skiff_table_reader.h"
  2. #include <yt/cpp/mapreduce/interface/logging/yt_log.h>
  3. #include <library/cpp/yson/node/node_io.h>
  4. #include <yt/cpp/mapreduce/skiff/wire_type.h>
  5. #include <yt/cpp/mapreduce/skiff/skiff_schema.h>
  6. #include <util/string/cast.h>
  7. namespace NYT {
  8. namespace NDetail {
  9. namespace {
  10. ////////////////////////////////////////////////////////////////////////////////
  11. enum EColumnType : i8
  12. {
  13. Dense,
  14. KeySwitch,
  15. RangeIndex,
  16. RowIndex
  17. };
  18. struct TSkiffColumnSchema
  19. {
  20. EColumnType Type;
  21. bool Required;
  22. NSkiff::EWireType WireType;
  23. TString Name;
  24. TSkiffColumnSchema(EColumnType type, bool required, NSkiff::EWireType wireType, const TString& name)
  25. : Type(type)
  26. , Required(required)
  27. , WireType(wireType)
  28. , Name(name)
  29. { }
  30. };
  31. ////////////////////////////////////////////////////////////////////////////////
  32. } // namespace
  33. struct TSkiffTableReader::TSkiffTableSchema
  34. {
  35. TVector<TSkiffColumnSchema> Columns;
  36. };
  37. TSkiffTableReader::TSkiffTableReader(
  38. ::TIntrusivePtr<TRawTableReader> input,
  39. const NSkiff::TSkiffSchemaPtr& schema)
  40. : Input_(std::move(input))
  41. , BufferedInput_(&Input_)
  42. , Parser_(&BufferedInput_)
  43. , Schemas_(CreateSkiffTableSchemas(schema))
  44. {
  45. Next();
  46. }
  47. TSkiffTableReader::~TSkiffTableReader() = default;
  48. const TNode& TSkiffTableReader::GetRow() const
  49. {
  50. EnsureValidity();
  51. Y_ENSURE(!Row_.IsUndefined(), "Row is moved");
  52. return Row_;
  53. }
  54. void TSkiffTableReader::MoveRow(TNode* result)
  55. {
  56. EnsureValidity();
  57. Y_ENSURE(!Row_.IsUndefined(), "Row is moved");
  58. *result = std::move(Row_);
  59. Row_ = TNode();
  60. }
  61. bool TSkiffTableReader::IsValid() const
  62. {
  63. return Valid_;
  64. }
  65. void TSkiffTableReader::Next()
  66. {
  67. EnsureValidity();
  68. if (Y_UNLIKELY(Finished_ || !Parser_->HasMoreData())) {
  69. Finished_ = true;
  70. Valid_ = false;
  71. return;
  72. }
  73. if (AfterKeySwitch_) {
  74. AfterKeySwitch_ = false;
  75. return;
  76. }
  77. while (true) {
  78. try {
  79. ReadRow();
  80. break;
  81. } catch (const std::exception& exception) {
  82. YT_LOG_ERROR("Read error: %v", exception.what());
  83. if (!Input_.Retry(RangeIndex_, RowIndex_)) {
  84. throw;
  85. }
  86. BufferedInput_ = TBufferedInput(&Input_);
  87. Parser_.emplace(NSkiff::TUncheckedSkiffParser(&BufferedInput_));
  88. if (RangeIndex_) {
  89. RangeIndexShift_ += *RangeIndex_;
  90. }
  91. RangeIndex_.Clear();
  92. RowIndex_.Clear();
  93. }
  94. }
  95. }
  96. ui32 TSkiffTableReader::GetTableIndex() const
  97. {
  98. EnsureValidity();
  99. return TableIndex_;
  100. }
  101. ui32 TSkiffTableReader::GetRangeIndex() const
  102. {
  103. EnsureValidity();
  104. return RangeIndex_.GetOrElse(0) + RangeIndexShift_;
  105. }
  106. ui64 TSkiffTableReader::GetRowIndex() const
  107. {
  108. EnsureValidity();
  109. return RowIndex_.GetOrElse(0ULL);
  110. }
  111. void TSkiffTableReader::NextKey()
  112. {
  113. while (Valid_) {
  114. Next();
  115. }
  116. if (Finished_) {
  117. return;
  118. }
  119. Valid_ = true;
  120. }
  121. TMaybe<size_t> TSkiffTableReader::GetReadByteCount() const
  122. {
  123. return Input_.GetReadByteCount();
  124. }
  125. bool TSkiffTableReader::IsRawReaderExhausted() const
  126. {
  127. return Finished_;
  128. }
  129. ////////////////////////////////////////////////////////////////////////////////
  130. TVector<TSkiffTableReader::TSkiffTableSchema> TSkiffTableReader::CreateSkiffTableSchemas(
  131. const NSkiff::TSkiffSchemaPtr& schema)
  132. {
  133. using NSkiff::EWireType;
  134. constexpr auto keySwitchColumnName = "$key_switch";
  135. constexpr auto rangeIndexColumnName = "$range_index";
  136. constexpr auto rowIndexColumnName = "$row_index";
  137. static const THashMap<TString, TSkiffColumnSchema> specialColumns = {
  138. {keySwitchColumnName, {EColumnType::KeySwitch, true, EWireType::Boolean, keySwitchColumnName}},
  139. {rangeIndexColumnName, {EColumnType::RangeIndex, false, EWireType::Int64, rangeIndexColumnName}},
  140. {rowIndexColumnName, {EColumnType::RowIndex, false, EWireType::Int64, rowIndexColumnName}},
  141. };
  142. Y_ENSURE(schema->GetWireType() == EWireType::Variant16,
  143. "Expected 'variant16' wire type for schema, got '" << schema->GetWireType() << "'");
  144. TVector<TSkiffTableSchema> result;
  145. for (const auto& tableSchema : schema->GetChildren()) {
  146. Y_ENSURE(tableSchema->GetWireType() == EWireType::Tuple,
  147. "Expected 'tuple' wire type for table schema, got '" << tableSchema->GetWireType() << "'");
  148. TVector<TSkiffColumnSchema> columns;
  149. for (const auto& columnSchema : tableSchema->GetChildren()) {
  150. if (columnSchema->GetName().StartsWith("$")) {
  151. auto iter = specialColumns.find(columnSchema->GetName());
  152. Y_ENSURE(iter != specialColumns.end(), "Unknown special column: " << columnSchema->GetName());
  153. columns.push_back(iter->second);
  154. } else {
  155. auto wireType = columnSchema->GetWireType();
  156. bool required = true;
  157. if (wireType == EWireType::Variant8) {
  158. const auto& children = columnSchema->GetChildren();
  159. Y_ENSURE(
  160. children.size() == 2 && children[0]->GetWireType() == EWireType::Nothing &&
  161. NSkiff::IsSimpleType(children[1]->GetWireType()),
  162. "Expected schema of form 'variant8<nothing, simple-type>', got "
  163. << NSkiff::GetShortDebugString(columnSchema));
  164. wireType = children[1]->GetWireType();
  165. required = false;
  166. }
  167. Y_ENSURE(NSkiff::IsSimpleType(wireType),
  168. "Expected column schema to be of simple type, got " << NSkiff::GetShortDebugString(columnSchema));
  169. columns.emplace_back(
  170. EColumnType::Dense,
  171. required,
  172. wireType,
  173. columnSchema->GetName());
  174. }
  175. }
  176. result.push_back({std::move(columns)});
  177. }
  178. return result;
  179. }
  180. void TSkiffTableReader::ReadRow()
  181. {
  182. if (Row_.IsUndefined()) {
  183. Row_ = TNode::CreateMap();
  184. } else {
  185. Row_.AsMap().clear();
  186. }
  187. if (RowIndex_) {
  188. ++*RowIndex_;
  189. }
  190. TableIndex_ = Parser_->ParseVariant16Tag();
  191. Y_ENSURE(TableIndex_ < Schemas_.size(), "Table index out of range: " << TableIndex_ << " >= " << Schemas_.size());
  192. const auto& tableSchema = Schemas_[TableIndex_];
  193. auto parse = [&](NSkiff::EWireType wireType) -> TNode {
  194. switch (wireType) {
  195. case NSkiff::EWireType::Int64:
  196. return Parser_->ParseInt64();
  197. case NSkiff::EWireType::Uint64:
  198. return Parser_->ParseUint64();
  199. case NSkiff::EWireType::Boolean:
  200. return Parser_->ParseBoolean();
  201. case NSkiff::EWireType::Double:
  202. return Parser_->ParseDouble();
  203. case NSkiff::EWireType::String32:
  204. return Parser_->ParseString32();
  205. case NSkiff::EWireType::Yson32:
  206. return NodeFromYsonString(Parser_->ParseYson32());
  207. case NSkiff::EWireType::Nothing:
  208. return TNode::CreateEntity();
  209. default:
  210. Y_ABORT("Bad column wire type: '%s'", ::ToString(wireType).data());
  211. }
  212. };
  213. for (const auto& columnSchema : tableSchema.Columns) {
  214. if (!columnSchema.Required) {
  215. auto tag = Parser_->ParseVariant8Tag();
  216. if (tag == 0) {
  217. if (columnSchema.Type == EColumnType::Dense) {
  218. Row_[columnSchema.Name] = TNode::CreateEntity();
  219. }
  220. continue;
  221. }
  222. Y_ENSURE(tag == 1, "Tag for 'variant8<nothing," << columnSchema.WireType
  223. << ">' expected to be 0 or 1, got " << tag);
  224. }
  225. auto value = parse(columnSchema.WireType);
  226. switch (columnSchema.Type) {
  227. case EColumnType::Dense:
  228. Row_[columnSchema.Name] = std::move(value);
  229. break;
  230. case EColumnType::KeySwitch:
  231. if (value.AsBool()) {
  232. AfterKeySwitch_ = true;
  233. Valid_ = false;
  234. }
  235. break;
  236. case EColumnType::RangeIndex:
  237. RangeIndex_ = value.AsInt64();
  238. break;
  239. case EColumnType::RowIndex:
  240. RowIndex_ = value.AsInt64();
  241. break;
  242. default:
  243. Y_ABORT("Bad column type: %d", static_cast<int>(columnSchema.Type));
  244. }
  245. }
  246. // We successfully parsed one more row from the stream,
  247. // so reset retry count to their initial value.
  248. Input_.ResetRetries();
  249. }
  250. void TSkiffTableReader::EnsureValidity() const
  251. {
  252. Y_ENSURE(Valid_, "Iterator is not valid");
  253. }
  254. } // namespace NDetail
  255. } // namespace NYT