node_table_reader.cpp 8.0 KB


  1. #include "node_table_reader.h"
  2. #include <yt/cpp/mapreduce/common/node_builder.h>
  3. #include <yt/cpp/mapreduce/common/wait_proxy.h>
  4. #include <yt/cpp/mapreduce/interface/logging/yt_log.h>
  5. #include <library/cpp/yson/parser.h>
  6. namespace NYT {
  7. ////////////////////////////////////////////////////////////////////////////////
  8. class TRowBuilder
  9. : public ::NYson::TYsonConsumerBase
  10. {
  11. public:
  12. explicit TRowBuilder(TMaybe<TRowElement>* resultRow);
  13. void OnStringScalar(TStringBuf value) override;
  14. void OnInt64Scalar(i64 value) override;
  15. void OnUint64Scalar(ui64 value) override;
  16. void OnDoubleScalar(double value) override;
  17. void OnBooleanScalar(bool value) override;
  18. void OnBeginList() override;
  19. void OnEntity() override;
  20. void OnListItem() override;
  21. void OnEndList() override;
  22. void OnBeginMap() override;
  23. void OnKeyedItem(TStringBuf key) override;
  24. void OnEndMap() override;
  25. void OnBeginAttributes() override;
  26. void OnEndAttributes() override;
  27. void Finalize();
  28. private:
  29. THolder<TNodeBuilder> Builder_;
  30. TRowElement Row_;
  31. int Depth_ = 0;
  32. bool Started_ = false;
  33. TMaybe<TRowElement>* ResultRow_;
  34. void SaveResultRow();
  35. };
  36. TRowBuilder::TRowBuilder(TMaybe<TRowElement>* resultRow)
  37. : ResultRow_(resultRow)
  38. { }
  39. void TRowBuilder::OnStringScalar(TStringBuf value)
  40. {
  41. Row_.Size += sizeof(TNode) + sizeof(TString) + value.size();
  42. Builder_->OnStringScalar(value);
  43. }
  44. void TRowBuilder::OnInt64Scalar(i64 value)
  45. {
  46. Row_.Size += sizeof(TNode);
  47. Builder_->OnInt64Scalar(value);
  48. }
  49. void TRowBuilder::OnUint64Scalar(ui64 value)
  50. {
  51. Row_.Size += sizeof(TNode);
  52. Builder_->OnUint64Scalar(value);
  53. }
  54. void TRowBuilder::OnDoubleScalar(double value)
  55. {
  56. Row_.Size += sizeof(TNode);
  57. Builder_->OnDoubleScalar(value);
  58. }
  59. void TRowBuilder::OnBooleanScalar(bool value)
  60. {
  61. Row_.Size += sizeof(TNode);
  62. Builder_->OnBooleanScalar(value);
  63. }
  64. void TRowBuilder::OnBeginList()
  65. {
  66. ++Depth_;
  67. Builder_->OnBeginList();
  68. }
  69. void TRowBuilder::OnEntity()
  70. {
  71. Row_.Size += sizeof(TNode);
  72. Builder_->OnEntity();
  73. }
  74. void TRowBuilder::OnListItem()
  75. {
  76. if (Depth_ == 0) {
  77. SaveResultRow();
  78. } else {
  79. Builder_->OnListItem();
  80. }
  81. }
  82. void TRowBuilder::OnEndList()
  83. {
  84. --Depth_;
  85. Builder_->OnEndList();
  86. }
  87. void TRowBuilder::OnBeginMap()
  88. {
  89. ++Depth_;
  90. Builder_->OnBeginMap();
  91. }
  92. void TRowBuilder::OnKeyedItem(TStringBuf key)
  93. {
  94. Row_.Size += sizeof(TString) + key.size();
  95. Builder_->OnKeyedItem(key);
  96. }
  97. void TRowBuilder::OnEndMap()
  98. {
  99. --Depth_;
  100. Builder_->OnEndMap();
  101. }
  102. void TRowBuilder::OnBeginAttributes()
  103. {
  104. ++Depth_;
  105. Builder_->OnBeginAttributes();
  106. }
  107. void TRowBuilder::OnEndAttributes()
  108. {
  109. --Depth_;
  110. Builder_->OnEndAttributes();
  111. }
  112. void TRowBuilder::SaveResultRow()
  113. {
  114. if (!Started_) {
  115. Started_ = true;
  116. } else {
  117. *ResultRow_ = std::move(Row_);
  118. }
  119. Row_.Reset();
  120. Builder_.Reset(new TNodeBuilder(&Row_.Node));
  121. }
  122. void TRowBuilder::Finalize()
  123. {
  124. if (Started_) {
  125. *ResultRow_ = std::move(Row_);
  126. }
  127. }
  128. ////////////////////////////////////////////////////////////////////////////////
  129. TNodeTableReader::TNodeTableReader(::TIntrusivePtr<TRawTableReader> input)
  130. : Input_(std::move(input))
  131. {
  132. PrepareParsing();
  133. Next();
  134. }
  135. TNodeTableReader::~TNodeTableReader()
  136. {
  137. }
  138. void TNodeTableReader::ParseListFragmentItem() {
  139. if (!Parser_->Parse()) {
  140. Builder_->Finalize();
  141. IsLast_ = true;
  142. }
  143. }
  144. const TNode& TNodeTableReader::GetRow() const
  145. {
  146. CheckValidity();
  147. if (!Row_) {
  148. ythrow yexception() << "Row is moved";
  149. }
  150. return Row_->Node;
  151. }
  152. void TNodeTableReader::MoveRow(TNode* result)
  153. {
  154. CheckValidity();
  155. if (!Row_) {
  156. ythrow yexception() << "Row is moved";
  157. }
  158. *result = std::move(Row_->Node);
  159. Row_.Clear();
  160. }
  161. bool TNodeTableReader::IsValid() const
  162. {
  163. return Valid_;
  164. }
  165. void TNodeTableReader::Next()
  166. {
  167. try {
  168. NextImpl();
  169. } catch (const std::exception& ex) {
  170. YT_LOG_ERROR("TNodeTableReader::Next failed: %v", ex.what());
  171. throw;
  172. }
  173. }
  174. void TNodeTableReader::NextImpl()
  175. {
  176. CheckValidity();
  177. if (RowIndex_) {
  178. ++*RowIndex_;
  179. }
  180. // At the begin of stream parser doesn't return a finished row.
  181. ParseFirstListFragmentItem();
  182. while (true) {
  183. if (IsLast_) {
  184. Finished_ = true;
  185. Valid_ = false;
  186. break;
  187. }
  188. try {
  189. ParseListFragmentItem();
  190. } catch (std::exception& ex) {
  191. NeedParseFirst_ = true;
  192. OnStreamError(std::current_exception(), ex.what());
  193. ParseFirstListFragmentItem();
  194. continue;
  195. }
  196. Row_ = std::move(*NextRow_);
  197. if (!Row_) {
  198. throw yexception() << "No row in NextRow_";
  199. }
  200. // We successfully parsed one more row from the stream,
  201. // so reset retry count to their initial value.
  202. Input_.ResetRetries();
  203. if (!Row_->Node.IsNull()) {
  204. AtStart_ = false;
  205. break;
  206. }
  207. for (auto& entry : Row_->Node.GetAttributes().AsMap()) {
  208. if (entry.first == "key_switch") {
  209. if (!AtStart_) {
  210. Valid_ = false;
  211. }
  212. } else if (entry.first == "table_index") {
  213. TableIndex_ = static_cast<ui32>(entry.second.AsInt64());
  214. } else if (entry.first == "row_index") {
  215. RowIndex_ = static_cast<ui64>(entry.second.AsInt64());
  216. } else if (entry.first == "range_index") {
  217. RangeIndex_ = static_cast<ui32>(entry.second.AsInt64());
  218. } else if (entry.first == "tablet_index") {
  219. TabletIndex_ = entry.second.AsInt64();
  220. } else if (entry.first == "end_of_stream") {
  221. IsEndOfStream_ = true;
  222. }
  223. }
  224. if (!Valid_) {
  225. break;
  226. }
  227. }
  228. }
  229. void TNodeTableReader::ParseFirstListFragmentItem()
  230. {
  231. while (NeedParseFirst_) {
  232. try {
  233. ParseListFragmentItem();
  234. NeedParseFirst_ = false;
  235. break;
  236. } catch (std::exception& ex) {
  237. OnStreamError(std::current_exception(), ex.what());
  238. }
  239. }
  240. }
  241. ui32 TNodeTableReader::GetTableIndex() const
  242. {
  243. CheckValidity();
  244. return TableIndex_;
  245. }
  246. ui32 TNodeTableReader::GetRangeIndex() const
  247. {
  248. CheckValidity();
  249. return RangeIndex_.GetOrElse(0) + RangeIndexShift_;
  250. }
  251. ui64 TNodeTableReader::GetRowIndex() const
  252. {
  253. CheckValidity();
  254. return RowIndex_.GetOrElse(0UL);
  255. }
  256. i64 TNodeTableReader::GetTabletIndex() const
  257. {
  258. CheckValidity();
  259. return TabletIndex_.GetOrElse(0L);
  260. }
  261. void TNodeTableReader::NextKey()
  262. {
  263. while (Valid_) {
  264. Next();
  265. }
  266. if (Finished_) {
  267. return;
  268. }
  269. Valid_ = true;
  270. if (RowIndex_) {
  271. --*RowIndex_;
  272. }
  273. }
  274. TMaybe<size_t> TNodeTableReader::GetReadByteCount() const
  275. {
  276. return Input_.GetReadByteCount();
  277. }
  278. bool TNodeTableReader::IsEndOfStream() const
  279. {
  280. return IsEndOfStream_;
  281. }
  282. bool TNodeTableReader::IsRawReaderExhausted() const
  283. {
  284. return Finished_;
  285. }
  286. ////////////////////////////////////////////////////////////////////////////////
  287. void TNodeTableReader::PrepareParsing()
  288. {
  289. NextRow_.Clear();
  290. Builder_.Reset(new TRowBuilder(&NextRow_));
  291. Parser_.Reset(new ::NYson::TYsonListParser(Builder_.Get(), &Input_));
  292. }
  293. void TNodeTableReader::OnStreamError(std::exception_ptr exception, TString error)
  294. {
  295. YT_LOG_ERROR("Read error (RangeIndex: %v, RowIndex: %v, Error: %v)", RangeIndex_, RowIndex_, error);
  296. Exception_ = exception;
  297. if (Input_.Retry(RangeIndex_, RowIndex_, exception)) {
  298. if (RangeIndex_) {
  299. RangeIndexShift_ += *RangeIndex_;
  300. }
  301. RowIndex_.Clear();
  302. RangeIndex_.Clear();
  303. PrepareParsing();
  304. } else {
  305. std::rethrow_exception(Exception_);
  306. }
  307. }
  308. void TNodeTableReader::CheckValidity() const
  309. {
  310. if (!Valid_) {
  311. ythrow yexception() << "Iterator is not valid";
  312. }
  313. }
  314. ////////////////////////////////////////////////////////////////////////////////
  315. } // namespace NYT