yql_codec_buf.h 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301
  1. #pragma once
  2. #include <yql/essentials/minikql/mkql_stats_registry.h>
  3. #include <yql/essentials/utils/yql_panic.h>
  4. #include <library/cpp/yson/zigzag.h>
  5. #include <util/generic/yexception.h>
  6. #include <util/generic/maybe.h>
  7. #ifndef LLVM_BC
  8. #include <util/datetime/base.h>
  9. #else
  10. using TInstant = ui64;
  11. #endif
  12. #include <util/generic/vector.h>
  13. #include <utility>
  14. #include <functional>
  15. #include <optional>
  16. namespace NYql {
  17. namespace NCommon {
  18. class TTimeoutException : public yexception {
  19. };
  20. struct IBlockReader {
  21. virtual ~IBlockReader() = default;
  22. virtual void SetDeadline(TInstant deadline) = 0;
  23. virtual std::pair<const char*, const char*> NextFilledBlock() = 0;
  24. virtual void ReturnBlock() = 0;
  25. virtual bool Retry(const TMaybe<ui32>& rangeIndex, const TMaybe<ui64>& rowIndex, const std::exception_ptr& error) = 0;
  26. };
  /// Sink for output bytes: hands out empty blocks to fill and takes them back
  /// (TOutputBuf writes through it). Keep the virtual functions in this order (vtable layout).
  struct IBlockWriter {
      virtual ~IBlockWriter() = default;

      /// Registers `callback`. NOTE(review): presumably invoked when the writer needs the
      /// producer to mark a record boundary (cf. TOutputBuf::OnRecordBoundary) — confirm.
      virtual void SetRecordBoundaryCallback(std::function<void()> callback) = 0;

      /// Returns writable storage, presumably as a [begin, end) range.
      virtual std::pair<char*, char*> NextEmptyBlock() = 0;

      /// Gives the current block back. NOTE(review): `avail` and `lastRecordBoundary`
      /// presumably describe how much was filled and where the last complete record ends.
      virtual void ReturnBlock(
          size_t avail,
          std::optional<size_t> lastRecordBoundary) = 0;

      /// Final call after all data has been handed over (TOutputBuf::Finish calls it after Flush).
      virtual void Finish() = 0;
  };
  34. //////////////////////////////////////////////////////////////////////////////////////////////////////////
  35. extern NKikimr::NMiniKQL::TStatKey InputBytes;
  36. class TInputBuf;
  37. extern "C" char InputBufReadSlowThunk(TInputBuf& in);
  38. extern "C" void InputBufReadManySlowThunk(TInputBuf& in, char* buffer, size_t count);
  39. extern "C" void InputBufSkipManySlowThunk(TInputBuf& in, size_t count);
  40. class TInputBuf {
  41. friend char InputBufReadSlowThunk(TInputBuf& in);
  42. friend void InputBufReadManySlowThunk(TInputBuf& in, char* buffer, size_t count);
  43. friend void InputBufSkipManySlowThunk(TInputBuf& in, size_t count);
  44. public:
  45. TInputBuf(NKikimr::NMiniKQL::TSamplingStatTimer* readTimer)
  46. : ReadTimer_(readTimer)
  47. {}
  48. TInputBuf(IBlockReader& source, NKikimr::NMiniKQL::TSamplingStatTimer* readTimer)
  49. : TInputBuf(readTimer)
  50. {
  51. SetSource(source);
  52. }
  53. void SetSource(IBlockReader& source) {
  54. Source_ = &source;
  55. Current_ = nullptr;
  56. End_ = nullptr;
  57. String_.clear();
  58. }
  59. void SetStats(NKikimr::NMiniKQL::IStatsRegistry* jobStats) {
  60. JobStats_ = jobStats;
  61. }
  62. void SetNextBlockCallback(std::function<void()> cb) {
  63. OnNextBlockCallback_ = std::move(cb);
  64. }
  65. bool TryRead(char& value) {
  66. if (Y_LIKELY(Current_ < End_)) {
  67. value = *Current_++;
  68. return true;
  69. }
  70. return TryReadSlow(value);
  71. }
  72. char Read() {
  73. if (Y_LIKELY(Current_ < End_)) {
  74. return *Current_++;
  75. }
  76. return InputBufReadSlowThunk(*this);
  77. }
  78. ui32 ReadVarUI32() {
  79. char cmd = Read();
  80. if (Y_LIKELY(!(cmd & 0x80))) {
  81. return cmd;
  82. }
  83. return ReadVarUI32Slow(cmd);
  84. }
  85. ui32 CopyVarUI32(TVector<char>& yson);
  86. ui64 CopyVarUI64(TVector<char>& yson);
  87. ui32 ReadVarUI32Slow(char cmd);
  88. ui64 ReadVarUI64() {
  89. char cmd = Read();
  90. if (Y_LIKELY(!(cmd & 0x80))) {
  91. return cmd;
  92. }
  93. return ReadVarUI64Slow(cmd);
  94. }
  95. ui64 ReadVarUI64Slow(char cmd);
  96. i64 ReadVarI64() {
  97. return NYson::ZigZagDecode64(ReadVarUI64());
  98. }
  99. i32 ReadVarI32() {
  100. return NYson::ZigZagDecode32(ReadVarUI32());
  101. }
  102. i64 CopyVarI64(TVector<char>& yson) {
  103. return NYson::ZigZagDecode64(CopyVarUI64(yson));
  104. }
  105. i32 CopyVarI32(TVector<char>& yson) {
  106. return NYson::ZigZagDecode32(CopyVarUI32(yson));
  107. }
  108. TStringBuf ReadYtString(ui32 lookAhead = 0);
  109. TVector<char>& YsonBuffer() {
  110. return String_;
  111. }
  112. void ReadMany(char* buffer, size_t count) {
  113. if (Y_LIKELY(Current_ + count <= End_)) {
  114. memcpy(buffer, Current_, count);
  115. Current_ += count;
  116. return;
  117. }
  118. return InputBufReadManySlowThunk(*this, buffer, count);
  119. }
  120. void SkipMany(size_t count) {
  121. if (Y_LIKELY(Current_ + count <= End_)) {
  122. Current_ += count;
  123. return;
  124. }
  125. return InputBufSkipManySlowThunk(*this, count);
  126. }
  127. void CopyMany(size_t count, TVector<char>& yson) {
  128. auto origSize = yson.size();
  129. yson.resize(origSize + count);
  130. ReadMany(yson.data() + origSize, count);
  131. }
  132. // Call it on error
  133. void Reset();
  134. private:
  135. void Fill();
  136. bool TryReadSlow(char& value);
  137. char ReadSlow();
  138. void ReadManySlow(char* buffer, size_t count);
  139. void SkipManySlow(size_t count);
  140. private:
  141. IBlockReader* Source_ = nullptr;
  142. NKikimr::NMiniKQL::TSamplingStatTimer* ReadTimer_;
  143. NKikimr::NMiniKQL::IStatsRegistry* JobStats_ = nullptr;
  144. const char* Current_ = nullptr;
  145. const char* End_ = nullptr;
  146. TVector<char> String_;
  147. std::function<void()> OnNextBlockCallback_;
  148. };
  149. //////////////////////////////////////////////////////////////////////////////////////////////////////////
  150. extern NKikimr::NMiniKQL::TStatKey OutputBytes;
  151. class TOutputBuf;
  152. extern "C" void OutputBufFlushThunk(TOutputBuf& out);
  153. extern "C" void OutputBufWriteManySlowThunk(TOutputBuf& out, const char* buffer, size_t count);
  154. class TOutputBuf {
  155. friend void OutputBufWriteManySlowThunk(TOutputBuf& out, const char* buffer, size_t count);
  156. public:
  157. TOutputBuf(IBlockWriter& target, NKikimr::NMiniKQL::TStatTimer* writeTimer);
  158. ui64 GetWrittenBytes() const {
  159. return WrittenBytes_;
  160. }
  161. void SetStats(NKikimr::NMiniKQL::IStatsRegistry* jobStats) {
  162. JobStats_ = jobStats;
  163. }
  164. void Write(char value) {
  165. if (Y_LIKELY(Current_ < End_)) {
  166. *Current_++ = value;
  167. return;
  168. }
  169. WriteSlow(value);
  170. }
  171. void WriteMany(TStringBuf str) {
  172. WriteMany(str.data(), str.size());
  173. }
  174. void WriteMany(const char* buffer, size_t count) {
  175. if (Y_LIKELY(Current_ + count <= End_)) {
  176. memcpy(Current_, buffer, count);
  177. Current_ += count;
  178. return;
  179. }
  180. OutputBufWriteManySlowThunk(*this, buffer, count);
  181. }
  182. void WriteVarI32(i32 value) {
  183. WriteVarUI32(NYson::ZigZagEncode32(value));
  184. }
  185. void WriteVarI64(i64 value) {
  186. WriteVarUI64(NYson::ZigZagEncode64(value));
  187. }
  188. void WriteVarUI32(ui32 value);
  189. void WriteVarUI64(ui64 value);
  190. void Flush();
  191. void Finish() {
  192. Flush();
  193. if (WriteTimer_) {
  194. WriteTimer_->Acquire();
  195. }
  196. Target_.Finish();
  197. if (WriteTimer_) {
  198. WriteTimer_->Release();
  199. }
  200. }
  201. void OnRecordBoundary() {
  202. RecordBoundary_ = Current_;
  203. }
  204. private:
  205. void WriteSlow(char value) {
  206. OutputBufFlushThunk(*this);
  207. *Current_++ = value;
  208. }
  209. void WriteManySlow(const char* buffer, size_t count);
  210. private:
  211. IBlockWriter& Target_;
  212. NKikimr::NMiniKQL::TStatTimer* WriteTimer_;
  213. NKikimr::NMiniKQL::IStatsRegistry* JobStats_ = nullptr;
  214. char* Begin_ = nullptr;
  215. char* Current_ = nullptr;
  216. char* End_ = nullptr;
  217. char* RecordBoundary_ = nullptr;
  218. ui64 WrittenBytes_ = 0;
  219. };
  // Parsing-check helpers built on YQL_ENSURE.
  // Every macro parameter is parenthesized so that argument expressions such as
  // `CHECK_EXPECTED(c, flag ? 'a' : 'b')` or `CHECK_STRING_LENGTH(x & mask)` bind as a whole.
  // The trailing semicolons and brace bodies are kept for compatibility with existing call sites.

  // Fails unless the character `read` equals `expected`; the message quotes both characters.
  #define CHECK_EXPECTED(read, expected) \
      YQL_ENSURE((read) == (expected), "Expected char: " << TString(1, (expected)).Quote() << ", but read: " << TString(1, (read)).Quote());
  // Reads one character from `buf` and checks it against `expected`.
  #define EXPECTED(buf, expected) \
      { char read = (buf).Read(); CHECK_EXPECTED(read, expected); }
  // Like EXPECTED, and additionally appends the character to `yson`.
  #define EXPECTED_COPY(buf, expected, yson) \
      { char read = (buf).Read(); CHECK_EXPECTED(read, expected); (yson).push_back(read); }
  // Fails unless 0 <= length < 2^30 (signed lengths).
  #define CHECK_STRING_LENGTH(length) \
      YQL_ENSURE((length) >= 0 && (length) < (1 << 30), "Bad string length: " << (length));
  // Fails unless length < 2^30 (unsigned lengths, where a >= 0 check would be vacuous).
  #define CHECK_STRING_LENGTH_UNSIGNED(length) \
      YQL_ENSURE((length) < (1 << 30), "Bad string length: " << (length));
  230. } // NCommon
  231. } // NYql