// args_dechunker.cpp (1.7 KB)
  1. #include "args_dechunker.h"
  2. #include <util/generic/yexception.h>
  3. #include <util/generic/ylimits.h>
  4. namespace NYql {
  5. namespace NUdf {
  6. TArgsDechunker::TArgsDechunker(std::vector<arrow::Datum>&& args)
  7. : Args(std::move(args))
  8. , Arrays(Args.size())
  9. {
  10. for (size_t i = 0; i < Args.size(); ++i) {
  11. if (Args[i].is_arraylike()) {
  12. ForEachArrayData(Args[i], [&](const auto& data) {
  13. Arrays[i].push_back(data);
  14. });
  15. }
  16. }
  17. }
  18. bool TArgsDechunker::Next(std::vector<arrow::Datum>& chunk) {
  19. ui64 chunkLen;
  20. return Next(chunk, chunkLen);
  21. }
  22. bool TArgsDechunker::Next(std::vector<arrow::Datum>& chunk, ui64& chunkLen) {
  23. chunkLen = 0;
  24. if (Finish) {
  25. return false;
  26. }
  27. size_t minSize = Max<size_t>();
  28. bool haveData = false;
  29. chunk.resize(Args.size());
  30. for (size_t i = 0; i < Args.size(); ++i) {
  31. if (Args[i].is_scalar()) {
  32. chunk[i] = Args[i];
  33. continue;
  34. }
  35. while (!Arrays[i].empty() && Arrays[i].front()->length == 0) {
  36. Arrays[i].pop_front();
  37. }
  38. if (!Arrays[i].empty()) {
  39. haveData = true;
  40. minSize = std::min<size_t>(minSize, Arrays[i].front()->length);
  41. } else {
  42. minSize = 0;
  43. }
  44. }
  45. Y_ENSURE(!haveData || minSize > 0, "Block length mismatch");
  46. if (!haveData) {
  47. Finish = true;
  48. return false;
  49. }
  50. for (size_t i = 0; i < Args.size(); ++i) {
  51. if (!Args[i].is_scalar()) {
  52. Y_ENSURE(!Arrays[i].empty(), "Block length mismatch");
  53. chunk[i] = arrow::Datum(Chop(Arrays[i].front(), minSize));
  54. }
  55. }
  56. chunkLen = minSize;
  57. return true;
  58. }
  59. }
  60. }