// yql_codec_buf.cpp
  1. #include "yql_codec_buf.h"
  2. namespace NYql {
  3. namespace NCommon {
  4. NKikimr::NMiniKQL::TStatKey InputBytes("Job_InputBytes", true);
  5. NKikimr::NMiniKQL::TStatKey OutputBytes("Job_OutputBytes", true);
  6. ui32 TInputBuf::CopyVarUI32(TVector<char>& yson) {
  7. char cmd = Read();
  8. yson.push_back(cmd);
  9. ui32 shift = 0;
  10. ui32 value = cmd & 0x7f;
  11. while (cmd & 0x80) {
  12. shift += 7;
  13. cmd = Read();
  14. yson.push_back(cmd);
  15. value |= ui32(cmd & 0x7f) << shift;
  16. }
  17. return value;
  18. }
  19. ui64 TInputBuf::CopyVarUI64(TVector<char>& yson) {
  20. char cmd = Read();
  21. yson.push_back(cmd);
  22. ui64 shift = 0;
  23. ui64 value = cmd & 0x7f;
  24. while (cmd & 0x80) {
  25. shift += 7;
  26. cmd = Read();
  27. yson.push_back(cmd);
  28. value |= ui64(cmd & 0x7f) << shift;
  29. }
  30. return value;
  31. }
  32. ui32 TInputBuf::ReadVarUI32Slow(char cmd) {
  33. ui32 shift = 0;
  34. ui32 value = cmd & 0x7f;
  35. for (;;) {
  36. shift += 7;
  37. cmd = Read();
  38. value |= ui32(cmd & 0x7f) << shift;
  39. if (!(cmd & 0x80)) {
  40. break;
  41. }
  42. }
  43. return value;
  44. }
  45. ui64 TInputBuf::ReadVarUI64Slow(char cmd) {
  46. ui64 shift = 0;
  47. ui64 value = cmd & 0x7f;
  48. for (;;) {
  49. shift += 7;
  50. cmd = Read();
  51. value |= ui64(cmd & 0x7f) << shift;
  52. if (!(cmd & 0x80)) {
  53. break;
  54. }
  55. }
  56. return value;
  57. }
  58. TStringBuf TInputBuf::ReadYtString(ui32 lookAhead) {
  59. i32 length = ReadVarI32();
  60. CHECK_STRING_LENGTH(length);
  61. if (Current_ + length + lookAhead <= End_) {
  62. TStringBuf ret(Current_, length);
  63. Current_ += length;
  64. return ret;
  65. }
  66. String_.resize(length);
  67. ReadMany(String_.data(), String_.size());
  68. return TStringBuf(String_.data(), String_.size());
  69. }
  70. void TInputBuf::Reset() {
  71. if (End_) {
  72. Source_->ReturnBlock();
  73. }
  74. Current_ = End_ = nullptr;
  75. }
  76. void TInputBuf::Fill() {
  77. if (Current_ < End_) {
  78. return;
  79. }
  80. if (!Source_) {
  81. return;
  82. }
  83. bool blockSwitch = false;
  84. if (End_) {
  85. Source_->ReturnBlock();
  86. blockSwitch = true;
  87. }
  88. if (ReadTimer_) {
  89. ReadTimer_->Acquire();
  90. }
  91. std::tie(Current_, End_) = Source_->NextFilledBlock();
  92. blockSwitch = blockSwitch && Current_ != End_;
  93. MKQL_ADD_STAT(JobStats_, InputBytes, End_ - Current_);
  94. if (ReadTimer_) {
  95. ReadTimer_->Release();
  96. }
  97. if (blockSwitch && OnNextBlockCallback_) {
  98. OnNextBlockCallback_();
  99. }
  100. }
  101. bool TInputBuf::TryReadSlow(char& value) {
  102. End_ = Current_;
  103. Fill();
  104. if (End_ == Current_) {
  105. return false;
  106. }
  107. value = *Current_++;
  108. return true;
  109. }
  110. extern "C" char InputBufReadSlowThunk(TInputBuf& in) {
  111. return in.ReadSlow();
  112. }
  113. extern "C" void InputBufReadManySlowThunk(TInputBuf& in, char* buffer, size_t count) {
  114. return in.ReadManySlow(buffer, count);
  115. }
  116. extern "C" void InputBufSkipManySlowThunk(TInputBuf& in, size_t count) {
  117. return in.SkipManySlow(count);
  118. }
  119. char TInputBuf::ReadSlow() {
  120. End_ = Current_;
  121. Fill();
  122. if (Y_UNLIKELY(End_ <= Current_)) {
  123. ythrow yexception() << "Unexpected end of stream";
  124. }
  125. return *Current_++;
  126. }
  127. void TInputBuf::ReadManySlow(char* buffer, size_t count) {
  128. while (count > 0) {
  129. Fill();
  130. if (Y_UNLIKELY(End_ <= Current_)) {
  131. ythrow yexception() << "Unexpected end of stream";
  132. }
  133. size_t toCopy = Min<size_t>(count, End_ - Current_);
  134. memcpy(buffer, Current_, toCopy);
  135. count -= toCopy;
  136. buffer += toCopy;
  137. Current_ += toCopy;
  138. }
  139. }
  140. void TInputBuf::SkipManySlow(size_t count) {
  141. while (count > 0) {
  142. Fill();
  143. if (Y_UNLIKELY(End_ <= Current_)) {
  144. ythrow yexception() << "Unexpected end of stream";
  145. }
  146. size_t toSkip = Min<size_t>(count, End_ - Current_);
  147. count -= toSkip;
  148. Current_ += toSkip;
  149. }
  150. }
  151. TOutputBuf::TOutputBuf(IBlockWriter& target, NKikimr::NMiniKQL::TStatTimer* writeTimer)
  152. : Target_(target)
  153. , WriteTimer_(writeTimer)
  154. {
  155. std::tie(Begin_, End_) = Target_.NextEmptyBlock();
  156. Current_ = Begin_;
  157. }
  158. void TOutputBuf::WriteVarUI32(ui32 value) {
  159. for (;;) {
  160. char cmd = value & 0x7f;
  161. value >>= 7;
  162. if (value) {
  163. cmd |= 0x80;
  164. Write(cmd);
  165. } else {
  166. Write(cmd);
  167. break;
  168. }
  169. }
  170. }
  171. void TOutputBuf::WriteVarUI64(ui64 value) {
  172. for (;;) {
  173. char cmd = value & 0x7f;
  174. value >>= 7;
  175. if (value) {
  176. cmd |= 0x80;
  177. Write(cmd);
  178. } else {
  179. Write(cmd);
  180. break;
  181. }
  182. }
  183. }
  184. extern "C" void OutputBufFlushThunk(TOutputBuf& out) {
  185. out.Flush();
  186. }
  187. extern "C" void OutputBufWriteManySlowThunk(TOutputBuf& out, const char* buffer, size_t count) {
  188. out.WriteManySlow(buffer, count);
  189. }
  190. void TOutputBuf::Flush() {
  191. if (Current_ > Begin_) {
  192. if (WriteTimer_) {
  193. WriteTimer_->Acquire();
  194. }
  195. const ui64 avail = Current_ - Begin_;
  196. MKQL_ADD_STAT(JobStats_, OutputBytes, avail);
  197. WrittenBytes_ += avail;
  198. Target_.ReturnBlock(avail, RecordBoundary_ ? std::make_optional(RecordBoundary_ - Begin_) : std::nullopt);
  199. std::tie(Begin_, End_) = Target_.NextEmptyBlock();
  200. Current_ = Begin_;
  201. RecordBoundary_ = nullptr;
  202. if (WriteTimer_) {
  203. WriteTimer_->Release();
  204. }
  205. }
  206. }
  207. void TOutputBuf::WriteManySlow(const char* buffer, size_t count) {
  208. // write current buffer
  209. size_t remain = End_ - Current_;
  210. Y_ASSERT(remain < count);
  211. memcpy(Current_, buffer, remain);
  212. Current_ += remain;
  213. Flush();
  214. buffer += remain;
  215. count -= remain;
  216. while (count >= size_t(End_ - Begin_)) {
  217. size_t toWrite = End_ - Begin_;
  218. memcpy(Current_, buffer, toWrite);
  219. Current_ += toWrite;
  220. Flush();
  221. buffer += toWrite;
  222. count -= toWrite;
  223. }
  224. // keep tail
  225. memcpy(Current_, buffer, count);
  226. Current_ += count;
  227. }
  228. }
  229. }