streaming_udf.cpp 28 KB


  1. #include <yql/essentials/public/udf/udf_value.h>
  2. #include <yql/essentials/public/udf/udf_registrator.h>
  3. #include <yql/essentials/public/udf/udf_type_builder.h>
  4. #include <yql/essentials/public/udf/udf_value_builder.h>
  5. #include <yql/essentials/public/udf/udf_terminator.h>
  6. #include <util/generic/buffer.h>
  7. #include <util/generic/mem_copy.h>
  8. #include <util/generic/maybe.h>
  9. #include <util/generic/ptr.h>
  10. #include <util/string/builder.h>
  11. #include <util/stream/mem.h>
  12. #include <library/cpp/deprecated/kmp/kmp.h>
  13. #include <util/string/strip.h>
  14. #include <util/system/condvar.h>
  15. #include <util/system/shellcommand.h>
  16. #include <util/system/tempfile.h>
  17. #include <util/system/sysstat.h>
  18. #include <functional>
  19. using namespace NKikimr;
  20. using namespace NUdf;
  21. namespace {
  22. // Cyclic Read-Write buffer.
  23. // Not thread safe, synchronization between reader and writer threads
  24. // should be managed externally.
  25. class TCyclicRWBuffer {
  26. public:
  27. TCyclicRWBuffer(size_t capacity)
  28. : Buffer(capacity)
  29. , Finished(false)
  30. , DataStart(0)
  31. , DataSize(0)
  32. {
  33. Buffer.Resize(capacity);
  34. }
  35. bool IsFinished() const {
  36. return Finished;
  37. }
  38. void Finish() {
  39. Finished = true;
  40. }
  41. bool HasData() const {
  42. return DataSize > 0;
  43. }
  44. size_t GetDataSize() const {
  45. return DataSize;
  46. }
  47. void GetData(const char*& ptr, size_t& len) const {
  48. size_t readSize = GetDataRegionSize(DataStart, DataSize);
  49. ptr = Buffer.Data() + DataStart;
  50. len = readSize;
  51. }
  52. void CommitRead(size_t len) {
  53. Y_DEBUG_ABORT_UNLESS(len <= GetDataRegionSize(DataStart, DataSize));
  54. DataStart = GetBufferPosition(DataStart + len);
  55. DataSize -= len;
  56. }
  57. bool CanWrite() const {
  58. return WriteSize() > 0;
  59. }
  60. size_t WriteSize() const {
  61. return Buffer.Size() - DataSize;
  62. }
  63. size_t Write(const char*& ptr, size_t& len) {
  64. if (!CanWrite()) {
  65. return 0;
  66. }
  67. size_t bytesWritten = 0;
  68. size_t bytesToWrite = std::min(len, WriteSize());
  69. while (bytesToWrite > 0) {
  70. size_t writeStart = GetWriteStart();
  71. size_t writeSize = GetDataRegionSize(writeStart, bytesToWrite);
  72. MemCopy(Data(writeStart), ptr, writeSize);
  73. DataSize += writeSize;
  74. bytesWritten += writeSize;
  75. bytesToWrite -= writeSize;
  76. ptr += writeSize;
  77. len -= writeSize;
  78. }
  79. return bytesWritten;
  80. }
  81. size_t Write(IZeroCopyInput& input) {
  82. const void* ptr;
  83. size_t dataLen = input.Next(&ptr, WriteSize());
  84. const char* dataPtr = reinterpret_cast<const char*>(ptr);
  85. return Write(dataPtr, dataLen);
  86. }
  87. private:
  88. size_t GetBufferPosition(size_t pos) const {
  89. return pos % Buffer.Size();
  90. }
  91. size_t GetDataRegionSize(size_t start, size_t size) const {
  92. Y_DEBUG_ABORT_UNLESS(start < Buffer.Size());
  93. return std::min(size, Buffer.Size() - start);
  94. }
  95. size_t GetWriteStart() const {
  96. return GetBufferPosition(DataStart + DataSize);
  97. }
  98. char* Data(size_t pos) {
  99. Y_DEBUG_ABORT_UNLESS(pos < Buffer.Size());
  100. return (Buffer.Data() + pos);
  101. }
  102. private:
  103. TBuffer Buffer;
  104. bool Finished;
  105. size_t DataStart;
  106. size_t DataSize;
  107. };
  108. struct TStreamingParams {
  109. public:
  110. const size_t DefaultProcessPollLatencyMs = 5 * 1000; // 5 seconds
  111. const size_t DefaultInputBufferSizeBytes = 4 * 1024 * 1024; // 4MB
  112. const size_t DefaultOutputBufferSizeBytes = 16 * 1024 * 1024; // 16MB
  113. const char* DefaultInputDelimiter = "\n";
  114. const char* DefaultOutputDelimiter = "\n";
  115. public:
  116. TUnboxedValue InputStreamObj;
  117. TString CommandLine;
  118. TUnboxedValue ArgumentsList;
  119. TString InputDelimiter;
  120. TString OutputDelimiter;
  121. size_t InputBufferSizeBytes;
  122. size_t OutputBufferSizeBytes;
  123. size_t ProcessPollLatencyMs;
  124. TStreamingParams()
  125. : InputDelimiter(DefaultInputDelimiter)
  126. , OutputDelimiter(DefaultOutputDelimiter)
  127. , InputBufferSizeBytes(DefaultInputBufferSizeBytes)
  128. , OutputBufferSizeBytes(DefaultOutputBufferSizeBytes)
  129. , ProcessPollLatencyMs(DefaultProcessPollLatencyMs)
  130. {
  131. }
  132. };
  133. struct TThreadSyncData {
  134. TMutex BuffersMutex;
  135. TCondVar InputBufferCanReadCond;
  136. TCondVar MainThreadHasWorkCond;
  137. TCondVar OutputBufferCanWriteCond;
  138. };
  139. class TStringListBufferedInputStream: public IInputStream {
  140. public:
  141. TStringListBufferedInputStream(TUnboxedValue rowsStream, const TString& delimiter, size_t bufferSizeBytes,
  142. TThreadSyncData& syncData, TSourcePosition pos)
  143. : RowsStream(rowsStream)
  144. , Delimiter(delimiter)
  145. , SyncData(syncData)
  146. , Pos_(pos)
  147. , DelimiterMatcher(delimiter)
  148. , DelimiterInput(delimiter)
  149. , Buffer(bufferSizeBytes)
  150. , CurReadMode(ReadMode::Start)
  151. {
  152. }
  153. TStringListBufferedInputStream(const TStringListBufferedInputStream&) = delete;
  154. TStringListBufferedInputStream& operator=(const TStringListBufferedInputStream&) = delete;
  155. TCyclicRWBuffer& GetBuffer() {
  156. return Buffer;
  157. }
  158. // Fetch input from upstream list iterator to the buffer.
  159. // Called from Main thread.
  160. EFetchStatus FetchInput() {
  161. with_lock (SyncData.BuffersMutex) {
  162. Y_DEBUG_ABORT_UNLESS(!Buffer.HasData());
  163. Y_DEBUG_ABORT_UNLESS(Buffer.CanWrite());
  164. bool receivedYield = false;
  165. while (Buffer.CanWrite() && CurReadMode != ReadMode::Done && !receivedYield) {
  166. switch (CurReadMode) {
  167. case ReadMode::Start: {
  168. auto status = ReadNextString();
  169. if (status == EFetchStatus::Yield) {
  170. receivedYield = true;
  171. break;
  172. }
  173. CurReadMode = (status == EFetchStatus::Ok)
  174. ? ReadMode::String
  175. : ReadMode::Done;
  176. break;
  177. }
  178. case ReadMode::String:
  179. if (CurStringInput.Exhausted()) {
  180. DelimiterInput.Reset(Delimiter.data(), Delimiter.size());
  181. CurReadMode = ReadMode::Delimiter;
  182. break;
  183. }
  184. Buffer.Write(CurStringInput);
  185. break;
  186. case ReadMode::Delimiter:
  187. if (DelimiterInput.Exhausted()) {
  188. CurReadMode = ReadMode::Start;
  189. break;
  190. }
  191. Buffer.Write(DelimiterInput);
  192. break;
  193. default:
  194. break;
  195. }
  196. }
  197. if (CurReadMode == ReadMode::Done) {
  198. Buffer.Finish();
  199. }
  200. SyncData.InputBufferCanReadCond.Signal();
  201. return receivedYield ? EFetchStatus::Yield : EFetchStatus::Ok;
  202. }
  203. }
  204. private:
  205. // Read data to pass into the child process input pipe.
  206. // Called from Communicate thread.
  207. size_t DoRead(void* buf, size_t len) override {
  208. try {
  209. with_lock (SyncData.BuffersMutex) {
  210. while (!Buffer.HasData() && !Buffer.IsFinished()) {
  211. SyncData.MainThreadHasWorkCond.Signal();
  212. SyncData.InputBufferCanReadCond.WaitI(SyncData.BuffersMutex);
  213. }
  214. if (!Buffer.HasData()) {
  215. Y_DEBUG_ABORT_UNLESS(Buffer.IsFinished());
  216. return 0;
  217. }
  218. const char* dataPtr;
  219. size_t dataLen;
  220. Buffer.GetData(dataPtr, dataLen);
  221. size_t bytesRead = std::min(dataLen, len);
  222. Y_DEBUG_ABORT_UNLESS(bytesRead > 0);
  223. memcpy(buf, dataPtr, bytesRead);
  224. Buffer.CommitRead(bytesRead);
  225. return bytesRead;
  226. }
  227. ythrow yexception();
  228. } catch (const std::exception& e) {
  229. UdfTerminate((TStringBuilder() << Pos_ << " " << e.what()).data());
  230. }
  231. }
  232. EFetchStatus ReadNextString() {
  233. TUnboxedValue item;
  234. EFetchStatus status = RowsStream.Fetch(item);
  235. switch (status) {
  236. case EFetchStatus::Yield:
  237. case EFetchStatus::Finish:
  238. return status;
  239. default:
  240. break;
  241. }
  242. CurString = item.GetElement(0);
  243. CurStringInput.Reset(CurString.AsStringRef().Data(), CurString.AsStringRef().Size());
  244. // Check that input string doesn't contain delimiters
  245. const char* match;
  246. Y_UNUSED(match);
  247. if (DelimiterMatcher.SubStr(
  248. CurString.AsStringRef().Data(),
  249. CurString.AsStringRef().Data() + CurString.AsStringRef().Size(),
  250. match))
  251. {
  252. ythrow yexception() << "Delimiter found in input string.";
  253. }
  254. return EFetchStatus::Ok;
  255. }
  256. private:
  257. enum class ReadMode {
  258. Start,
  259. String,
  260. Delimiter,
  261. Done
  262. };
  263. TUnboxedValue RowsStream;
  264. TString Delimiter;
  265. TThreadSyncData& SyncData;
  266. TSourcePosition Pos_;
  267. TKMPMatcher DelimiterMatcher;
  268. TUnboxedValue CurString;
  269. TMemoryInput CurStringInput;
  270. TMemoryInput DelimiterInput;
  271. TCyclicRWBuffer Buffer;
  272. ReadMode CurReadMode;
  273. };
  274. class TStringListBufferedOutputStream: public IOutputStream {
  275. public:
  276. TStringListBufferedOutputStream(const TString& delimiter, size_t stringBufferSizeBytes,
  277. TStringListBufferedInputStream& inputStream, TThreadSyncData& syncData)
  278. : Delimiter(delimiter)
  279. , InputStream(inputStream)
  280. , SyncData(syncData)
  281. , HasDelimiterMatch(false)
  282. , DelimiterMatcherCallback(HasDelimiterMatch)
  283. , DelimiterMatcher(delimiter.data(), delimiter.data() + delimiter.size(), &DelimiterMatcherCallback)
  284. , Buffer(stringBufferSizeBytes)
  285. {
  286. }
  287. TStringListBufferedOutputStream(const TStringListBufferedOutputStream&) = delete;
  288. TStringListBufferedOutputStream& operator=(const TStringListBufferedOutputStream&) = delete;
  289. // Get string record from buffer.
  290. // Called from Main thread.
  291. EFetchStatus FetchNextString(TString& str) {
  292. while (!HasDelimiterMatch) {
  293. with_lock (SyncData.BuffersMutex) {
  294. bool inputHasData;
  295. bool bufferNeedsData;
  296. do {
  297. inputHasData = InputStream.GetBuffer().HasData() || InputStream.GetBuffer().IsFinished();
  298. bufferNeedsData = !Buffer.HasData() && !Buffer.IsFinished();
  299. if (inputHasData && bufferNeedsData) {
  300. SyncData.MainThreadHasWorkCond.WaitI(SyncData.BuffersMutex);
  301. }
  302. } while (inputHasData && bufferNeedsData);
  303. if (!inputHasData) {
  304. auto status = InputStream.FetchInput();
  305. if (status == EFetchStatus::Yield) {
  306. return EFetchStatus::Yield;
  307. }
  308. }
  309. if (bufferNeedsData) {
  310. continue;
  311. }
  312. if (!Buffer.HasData()) {
  313. Y_DEBUG_ABORT_UNLESS(Buffer.IsFinished());
  314. str = TString(TStringBuf(CurrentString.Data(), CurrentString.Size()));
  315. CurrentString.Clear();
  316. return str.empty() ? EFetchStatus::Finish : EFetchStatus::Ok;
  317. }
  318. const char* data;
  319. size_t size;
  320. Buffer.GetData(data, size);
  321. size_t read = 0;
  322. while (!HasDelimiterMatch && read < size) {
  323. DelimiterMatcher.Push(data[read]);
  324. ++read;
  325. }
  326. Y_DEBUG_ABORT_UNLESS(read > 0);
  327. CurrentString.Append(data, read);
  328. bool signalCanWrite = !Buffer.CanWrite();
  329. Buffer.CommitRead(read);
  330. if (signalCanWrite) {
  331. SyncData.OutputBufferCanWriteCond.Signal();
  332. }
  333. }
  334. }
  335. Y_DEBUG_ABORT_UNLESS(CurrentString.Size() >= Delimiter.size());
  336. str = TString(TStringBuf(CurrentString.Data(), CurrentString.Size() - Delimiter.size()));
  337. CurrentString.Clear();
  338. HasDelimiterMatch = false;
  339. return EFetchStatus::Ok;
  340. }
  341. TCyclicRWBuffer& GetBuffer() {
  342. return Buffer;
  343. }
  344. private:
  345. // Write data from child process output to buffer.
  346. // Called from Communicate thread.
  347. void DoWrite(const void* buf, size_t len) override {
  348. const char* curStrPos = reinterpret_cast<const char*>(buf);
  349. size_t curStrLen = len;
  350. while (curStrLen > 0) {
  351. with_lock (SyncData.BuffersMutex) {
  352. while (!Buffer.CanWrite() && !Buffer.IsFinished()) {
  353. SyncData.OutputBufferCanWriteCond.WaitI(SyncData.BuffersMutex);
  354. }
  355. if (Buffer.IsFinished()) {
  356. return;
  357. }
  358. bool signalCanRead = !Buffer.HasData();
  359. Buffer.Write(curStrPos, curStrLen);
  360. if (signalCanRead) {
  361. SyncData.MainThreadHasWorkCond.Signal();
  362. }
  363. }
  364. }
  365. }
  366. void DoFinish() override {
  367. IOutputStream::DoFinish();
  368. with_lock (SyncData.BuffersMutex) {
  369. Buffer.Finish();
  370. SyncData.MainThreadHasWorkCond.Signal();
  371. }
  372. }
  373. private:
  374. class MatcherCallback: public TKMPStreamMatcher<char>::ICallback {
  375. public:
  376. MatcherCallback(bool& hasMatch)
  377. : HasMatch(hasMatch)
  378. {
  379. }
  380. void OnMatch(const char* begin, const char* end) override {
  381. Y_UNUSED(begin);
  382. Y_UNUSED(end);
  383. HasMatch = true;
  384. }
  385. private:
  386. bool& HasMatch;
  387. };
  388. private:
  389. TString Delimiter;
  390. TStringListBufferedInputStream& InputStream;
  391. TThreadSyncData& SyncData;
  392. bool HasDelimiterMatch;
  393. MatcherCallback DelimiterMatcherCallback;
  394. TKMPStreamMatcher<char> DelimiterMatcher;
  395. TBuffer CurrentString;
  396. TCyclicRWBuffer Buffer;
  397. };
  398. class TStreamingOutputListIterator {
  399. public:
  400. TStreamingOutputListIterator(const TStreamingParams& params, const IValueBuilder* valueBuilder, TSourcePosition pos)
  401. : StreamingParams(params)
  402. , ValueBuilder(valueBuilder)
  403. , Pos_(pos)
  404. {
  405. }
  406. TStreamingOutputListIterator(const TStreamingOutputListIterator&) = delete;
  407. TStreamingOutputListIterator& operator=(const TStreamingOutputListIterator&) = delete;
  408. ~TStreamingOutputListIterator() {
  409. if (ShellCommand) {
  410. Y_DEBUG_ABORT_UNLESS(InputStream && OutputStream);
  411. try {
  412. ShellCommand->Terminate();
  413. } catch (const std::exception& e) {
  414. Cerr << CurrentExceptionMessage();
  415. }
  416. // Let Communicate thread finish.
  417. with_lock (ThreadSyncData.BuffersMutex) {
  418. InputStream->GetBuffer().Finish();
  419. OutputStream->GetBuffer().Finish();
  420. ThreadSyncData.InputBufferCanReadCond.Signal();
  421. ThreadSyncData.OutputBufferCanWriteCond.Signal();
  422. }
  423. ShellCommand->Wait();
  424. }
  425. }
  426. EFetchStatus Fetch(TUnboxedValue& result) {
  427. try {
  428. EFetchStatus status = EFetchStatus::Ok;
  429. if (!ProcessStarted()) {
  430. StartProcess();
  431. // Don't try to fetch data if there was a problem starting the process,
  432. // this causes infinite wait on Windows system due to incorrect ShellCommand behavior.
  433. if (ShellCommand->GetStatus() != TShellCommand::SHELL_RUNNING && ShellCommand->GetStatus() != TShellCommand::SHELL_FINISHED) {
  434. status = EFetchStatus::Finish;
  435. }
  436. }
  437. if (status == EFetchStatus::Ok) {
  438. status = OutputStream->FetchNextString(CurrentRecord);
  439. }
  440. if (status == EFetchStatus::Finish) {
  441. switch (ShellCommand->GetStatus()) {
  442. case TShellCommand::SHELL_FINISHED:
  443. break;
  444. case TShellCommand::SHELL_INTERNAL_ERROR:
  445. ythrow yexception() << "Internal error running process: " << ShellCommand->GetInternalError();
  446. break;
  447. case TShellCommand::SHELL_ERROR:
  448. ythrow yexception() << "Error running user process: " << ShellCommand->GetError();
  449. break;
  450. default:
  451. ythrow yexception() << "Unexpected shell command status: " << (int)ShellCommand->GetStatus();
  452. }
  453. return EFetchStatus::Finish;
  454. }
  455. if (status == EFetchStatus::Ok) {
  456. TUnboxedValue* items = nullptr;
  457. result = ValueBuilder->NewArray(1, items);
  458. *items = ValueBuilder->NewString(TStringRef(CurrentRecord.data(), CurrentRecord.size()));
  459. }
  460. return status;
  461. } catch (const std::exception& e) {
  462. UdfTerminate((TStringBuilder() << Pos_ << " " << e.what()).data());
  463. }
  464. }
  465. private:
  466. void StartProcess() {
  467. InputStream.Reset(new TStringListBufferedInputStream(
  468. StreamingParams.InputStreamObj, StreamingParams.InputDelimiter,
  469. StreamingParams.InputBufferSizeBytes, ThreadSyncData, Pos_));
  470. OutputStream.Reset(new TStringListBufferedOutputStream(
  471. StreamingParams.OutputDelimiter, StreamingParams.OutputBufferSizeBytes, *InputStream,
  472. ThreadSyncData));
  473. TShellCommandOptions opt;
  474. opt.SetAsync(true).SetUseShell(false).SetLatency(StreamingParams.ProcessPollLatencyMs).SetInputStream(InputStream.Get()).SetOutputStream(OutputStream.Get()).SetCloseStreams(true).SetCloseAllFdsOnExec(true);
  475. TList<TString> commandArguments;
  476. auto argumetsIterator = StreamingParams.ArgumentsList.GetListIterator();
  477. for (TUnboxedValue item; argumetsIterator.Next(item);) {
  478. commandArguments.emplace_back(TStringBuf(item.AsStringRef()));
  479. }
  480. ShellCommand.Reset(new TShellCommand(StreamingParams.CommandLine, commandArguments, opt));
  481. ShellCommand->Run();
  482. }
  483. bool ProcessStarted() const {
  484. return !!ShellCommand;
  485. }
  486. private:
  487. TStreamingParams StreamingParams;
  488. const IValueBuilder* ValueBuilder;
  489. TSourcePosition Pos_;
  490. TThreadSyncData ThreadSyncData;
  491. THolder<TShellCommand> ShellCommand;
  492. THolder<TStringListBufferedInputStream> InputStream;
  493. THolder<TStringListBufferedOutputStream> OutputStream;
  494. TString CurrentRecord;
  495. };
  496. class TStreamingOutput: public TBoxedValue {
  497. public:
  498. TStreamingOutput(const TStreamingParams& params, const IValueBuilder* valueBuilder, TSourcePosition pos)
  499. : StreamingParams(params)
  500. , ValueBuilder(valueBuilder)
  501. , Pos_(pos)
  502. {
  503. }
  504. TStreamingOutput(const TStreamingOutput&) = delete;
  505. TStreamingOutput& operator=(const TStreamingOutput&) = delete;
  506. private:
  507. EFetchStatus Fetch(TUnboxedValue& result) override {
  508. if (IsFinished) {
  509. return EFetchStatus::Finish;
  510. }
  511. if (!Iterator) {
  512. Iterator.Reset(new TStreamingOutputListIterator(StreamingParams, ValueBuilder, Pos_));
  513. }
  514. auto ret = Iterator->Fetch(result);
  515. if (ret == EFetchStatus::Finish) {
  516. IsFinished = true;
  517. Iterator.Reset();
  518. }
  519. return ret;
  520. }
  521. TStreamingParams StreamingParams;
  522. const IValueBuilder* ValueBuilder;
  523. TSourcePosition Pos_;
  524. bool IsFinished = false;
  525. THolder<TStreamingOutputListIterator> Iterator;
  526. };
  527. class TStreamingScriptOutput: public TStreamingOutput {
  528. public:
  529. TStreamingScriptOutput(const TStreamingParams& params, const IValueBuilder* valueBuilder,
  530. TSourcePosition pos, const TString& script, const TString& scriptFilename)
  531. : TStreamingOutput(params, valueBuilder, pos)
  532. , ScriptFileHandle(scriptFilename)
  533. {
  534. auto scriptStripped = StripBeforeShebang(script);
  535. ScriptFileHandle.Write(scriptStripped.data(), scriptStripped.size());
  536. ScriptFileHandle.Close();
  537. if (Chmod(ScriptFileHandle.Name().c_str(), MODE0755) != 0) {
  538. ythrow yexception() << "Chmod failed for script file:" << ScriptFileHandle.Name()
  539. << " with error: " << LastSystemErrorText();
  540. }
  541. }
  542. private:
  543. static TString StripBeforeShebang(const TString& script) {
  544. auto shebangIndex = script.find("#!");
  545. if (shebangIndex != TString::npos) {
  546. auto scriptStripped = StripStringLeft(script);
  547. if (scriptStripped.size() == script.size() - shebangIndex) {
  548. return scriptStripped;
  549. }
  550. }
  551. return script;
  552. }
  553. TTempFileHandle ScriptFileHandle;
  554. };
  555. class TStreamingProcess: public TBoxedValue {
  556. public:
  557. TStreamingProcess(TSourcePosition pos)
  558. : Pos_(pos)
  559. {}
  560. private:
  561. TUnboxedValue Run(const IValueBuilder* valueBuilder,
  562. const TUnboxedValuePod* args) const override {
  563. auto inputListArg = args[0];
  564. auto commandLineArg = args[1].AsStringRef();
  565. auto argumentsArg = args[2];
  566. auto inputDelimiterArg = args[3];
  567. auto outputDelimiterArg = args[4];
  568. Y_DEBUG_ABORT_UNLESS(inputListArg.IsBoxed());
  569. TStreamingParams params;
  570. params.InputStreamObj = TUnboxedValuePod(inputListArg);
  571. params.CommandLine = TString(TStringBuf(commandLineArg));
  572. params.ArgumentsList = !argumentsArg
  573. ? valueBuilder->NewEmptyList()
  574. : TUnboxedValue(argumentsArg.GetOptionalValue());
  575. if (inputDelimiterArg) {
  576. params.InputDelimiter = TString(TStringBuf(inputDelimiterArg.AsStringRef()));
  577. }
  578. if (outputDelimiterArg) {
  579. params.OutputDelimiter = TString(TStringBuf(outputDelimiterArg.AsStringRef()));
  580. }
  581. return TUnboxedValuePod(new TStreamingOutput(params, valueBuilder, Pos_));
  582. }
  583. public:
  584. static TStringRef Name() {
  585. static auto name = TStringRef::Of("Process");
  586. return name;
  587. }
  588. private:
  589. TSourcePosition Pos_;
  590. };
  591. class TStreamingProcessInline: public TBoxedValue {
  592. public:
  593. TStreamingProcessInline(TSourcePosition pos)
  594. : Pos_(pos)
  595. {}
  596. private:
  597. TUnboxedValue Run(const IValueBuilder* valueBuilder,
  598. const TUnboxedValuePod* args) const override {
  599. auto inputListArg = args[0];
  600. auto scriptArg = args[1].AsStringRef();
  601. auto argumentsArg = args[2];
  602. auto inputDelimiterArg = args[3];
  603. auto outputDelimiterArg = args[4];
  604. TString script(scriptArg);
  605. TString scriptFilename = MakeTempName(".");
  606. TStreamingParams params;
  607. params.InputStreamObj = TUnboxedValuePod(inputListArg);
  608. params.CommandLine = scriptFilename;
  609. params.ArgumentsList = !argumentsArg
  610. ? valueBuilder->NewEmptyList()
  611. : TUnboxedValue(argumentsArg.GetOptionalValue());
  612. if (inputDelimiterArg) {
  613. params.InputDelimiter = TString(TStringBuf(inputDelimiterArg.AsStringRef()));
  614. }
  615. if (outputDelimiterArg) {
  616. params.OutputDelimiter = TString(TStringBuf(outputDelimiterArg.AsStringRef()));
  617. }
  618. return TUnboxedValuePod(new TStreamingScriptOutput(params, valueBuilder, Pos_, script, scriptFilename));
  619. }
  620. public:
  621. static TStringRef Name() {
  622. static auto name = TStringRef::Of("ProcessInline");
  623. return name;
  624. }
  625. private:
  626. TSourcePosition Pos_;
  627. };
  628. class TStreamingModule: public IUdfModule {
  629. public:
  630. TStringRef Name() const {
  631. return TStringRef::Of("Streaming");
  632. }
  633. void CleanupOnTerminate() const final {
  634. }
  635. void GetAllFunctions(IFunctionsSink& sink) const final {
  636. sink.Add(TStreamingProcess::Name());
  637. sink.Add(TStreamingProcessInline::Name());
  638. }
  639. void BuildFunctionTypeInfo(
  640. const TStringRef& name,
  641. NUdf::TType* userType,
  642. const TStringRef& typeConfig,
  643. ui32 flags,
  644. IFunctionTypeInfoBuilder& builder) const override {
  645. try {
  646. Y_UNUSED(userType);
  647. Y_UNUSED(typeConfig);
  648. bool typesOnly = (flags & TFlags::TypesOnly);
  649. auto optionalStringType = builder.Optional()->Item<char*>().Build();
  650. auto rowType = builder.Struct(1)->AddField("Data", TDataType<char*>::Id, nullptr).Build();
  651. auto rowsType = builder.Stream()->Item(rowType).Build();
  652. auto stringListType = builder.List()->Item(TDataType<char*>::Id).Build();
  653. auto optionalStringListType = builder.Optional()->Item(stringListType).Build();
  654. if (TStreamingProcess::Name() == name) {
  655. builder.Returns(rowsType).Args()->Add(rowsType).Add<char*>().Add(optionalStringListType).Add(optionalStringType).Add(optionalStringType).Done().OptionalArgs(3);
  656. if (!typesOnly) {
  657. builder.Implementation(new TStreamingProcess(builder.GetSourcePosition()));
  658. }
  659. }
  660. if (TStreamingProcessInline::Name() == name) {
  661. builder.Returns(rowsType).Args()->Add(rowsType).Add<char*>().Add(optionalStringListType).Add(optionalStringType).Add(optionalStringType).Done().OptionalArgs(3);
  662. if (!typesOnly) {
  663. builder.Implementation(new TStreamingProcessInline(builder.GetSourcePosition()));
  664. }
  665. }
  666. } catch (const std::exception& e) {
  667. builder.SetError(CurrentExceptionMessage());
  668. }
  669. }
  670. };
  671. }
  672. REGISTER_MODULES(TStreamingModule)