123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829 |
- #include <yql/essentials/public/udf/udf_value.h>
- #include <yql/essentials/public/udf/udf_registrator.h>
- #include <yql/essentials/public/udf/udf_type_builder.h>
- #include <yql/essentials/public/udf/udf_value_builder.h>
- #include <yql/essentials/public/udf/udf_terminator.h>
- #include <util/generic/buffer.h>
- #include <util/generic/mem_copy.h>
- #include <util/generic/maybe.h>
- #include <util/generic/ptr.h>
- #include <util/string/builder.h>
- #include <util/stream/mem.h>
- #include <library/cpp/deprecated/kmp/kmp.h>
- #include <util/string/strip.h>
- #include <util/system/condvar.h>
- #include <util/system/shellcommand.h>
- #include <util/system/tempfile.h>
- #include <util/system/sysstat.h>
- #include <functional>
- using namespace NKikimr;
- using namespace NUdf;
- namespace {
- // Cyclic Read-Write buffer.
- // Not thread safe, synchronization between reader and writer threads
- // should be managed externally.
- class TCyclicRWBuffer {
- public:
- TCyclicRWBuffer(size_t capacity)
- : Buffer(capacity)
- , Finished(false)
- , DataStart(0)
- , DataSize(0)
- {
- Buffer.Resize(capacity);
- }
- bool IsFinished() const {
- return Finished;
- }
- void Finish() {
- Finished = true;
- }
- bool HasData() const {
- return DataSize > 0;
- }
- size_t GetDataSize() const {
- return DataSize;
- }
- void GetData(const char*& ptr, size_t& len) const {
- size_t readSize = GetDataRegionSize(DataStart, DataSize);
- ptr = Buffer.Data() + DataStart;
- len = readSize;
- }
- void CommitRead(size_t len) {
- Y_DEBUG_ABORT_UNLESS(len <= GetDataRegionSize(DataStart, DataSize));
- DataStart = GetBufferPosition(DataStart + len);
- DataSize -= len;
- }
- bool CanWrite() const {
- return WriteSize() > 0;
- }
- size_t WriteSize() const {
- return Buffer.Size() - DataSize;
- }
- size_t Write(const char*& ptr, size_t& len) {
- if (!CanWrite()) {
- return 0;
- }
- size_t bytesWritten = 0;
- size_t bytesToWrite = std::min(len, WriteSize());
- while (bytesToWrite > 0) {
- size_t writeStart = GetWriteStart();
- size_t writeSize = GetDataRegionSize(writeStart, bytesToWrite);
- MemCopy(Data(writeStart), ptr, writeSize);
- DataSize += writeSize;
- bytesWritten += writeSize;
- bytesToWrite -= writeSize;
- ptr += writeSize;
- len -= writeSize;
- }
- return bytesWritten;
- }
- size_t Write(IZeroCopyInput& input) {
- const void* ptr;
- size_t dataLen = input.Next(&ptr, WriteSize());
- const char* dataPtr = reinterpret_cast<const char*>(ptr);
- return Write(dataPtr, dataLen);
- }
- private:
- size_t GetBufferPosition(size_t pos) const {
- return pos % Buffer.Size();
- }
- size_t GetDataRegionSize(size_t start, size_t size) const {
- Y_DEBUG_ABORT_UNLESS(start < Buffer.Size());
- return std::min(size, Buffer.Size() - start);
- }
- size_t GetWriteStart() const {
- return GetBufferPosition(DataStart + DataSize);
- }
- char* Data(size_t pos) {
- Y_DEBUG_ABORT_UNLESS(pos < Buffer.Size());
- return (Buffer.Data() + pos);
- }
- private:
- TBuffer Buffer;
- bool Finished;
- size_t DataStart;
- size_t DataSize;
- };
- struct TStreamingParams {
- public:
- const size_t DefaultProcessPollLatencyMs = 5 * 1000; // 5 seconds
- const size_t DefaultInputBufferSizeBytes = 4 * 1024 * 1024; // 4MB
- const size_t DefaultOutputBufferSizeBytes = 16 * 1024 * 1024; // 16MB
- const char* DefaultInputDelimiter = "\n";
- const char* DefaultOutputDelimiter = "\n";
- public:
- TUnboxedValue InputStreamObj;
- TString CommandLine;
- TUnboxedValue ArgumentsList;
- TString InputDelimiter;
- TString OutputDelimiter;
- size_t InputBufferSizeBytes;
- size_t OutputBufferSizeBytes;
- size_t ProcessPollLatencyMs;
- TStreamingParams()
- : InputDelimiter(DefaultInputDelimiter)
- , OutputDelimiter(DefaultOutputDelimiter)
- , InputBufferSizeBytes(DefaultInputBufferSizeBytes)
- , OutputBufferSizeBytes(DefaultOutputBufferSizeBytes)
- , ProcessPollLatencyMs(DefaultProcessPollLatencyMs)
- {
- }
- };
- struct TThreadSyncData {
- TMutex BuffersMutex;
- TCondVar InputBufferCanReadCond;
- TCondVar MainThreadHasWorkCond;
- TCondVar OutputBufferCanWriteCond;
- };
- class TStringListBufferedInputStream: public IInputStream {
- public:
- TStringListBufferedInputStream(TUnboxedValue rowsStream, const TString& delimiter, size_t bufferSizeBytes,
- TThreadSyncData& syncData, TSourcePosition pos)
- : RowsStream(rowsStream)
- , Delimiter(delimiter)
- , SyncData(syncData)
- , Pos_(pos)
- , DelimiterMatcher(delimiter)
- , DelimiterInput(delimiter)
- , Buffer(bufferSizeBytes)
- , CurReadMode(ReadMode::Start)
- {
- }
- TStringListBufferedInputStream(const TStringListBufferedInputStream&) = delete;
- TStringListBufferedInputStream& operator=(const TStringListBufferedInputStream&) = delete;
- TCyclicRWBuffer& GetBuffer() {
- return Buffer;
- }
- // Fetch input from upstream list iterator to the buffer.
- // Called from Main thread.
- EFetchStatus FetchInput() {
- with_lock (SyncData.BuffersMutex) {
- Y_DEBUG_ABORT_UNLESS(!Buffer.HasData());
- Y_DEBUG_ABORT_UNLESS(Buffer.CanWrite());
- bool receivedYield = false;
- while (Buffer.CanWrite() && CurReadMode != ReadMode::Done && !receivedYield) {
- switch (CurReadMode) {
- case ReadMode::Start: {
- auto status = ReadNextString();
- if (status == EFetchStatus::Yield) {
- receivedYield = true;
- break;
- }
- CurReadMode = (status == EFetchStatus::Ok)
- ? ReadMode::String
- : ReadMode::Done;
- break;
- }
- case ReadMode::String:
- if (CurStringInput.Exhausted()) {
- DelimiterInput.Reset(Delimiter.data(), Delimiter.size());
- CurReadMode = ReadMode::Delimiter;
- break;
- }
- Buffer.Write(CurStringInput);
- break;
- case ReadMode::Delimiter:
- if (DelimiterInput.Exhausted()) {
- CurReadMode = ReadMode::Start;
- break;
- }
- Buffer.Write(DelimiterInput);
- break;
- default:
- break;
- }
- }
- if (CurReadMode == ReadMode::Done) {
- Buffer.Finish();
- }
- SyncData.InputBufferCanReadCond.Signal();
- return receivedYield ? EFetchStatus::Yield : EFetchStatus::Ok;
- }
- }
- private:
- // Read data to pass into the child process input pipe.
- // Called from Communicate thread.
- size_t DoRead(void* buf, size_t len) override {
- try {
- with_lock (SyncData.BuffersMutex) {
- while (!Buffer.HasData() && !Buffer.IsFinished()) {
- SyncData.MainThreadHasWorkCond.Signal();
- SyncData.InputBufferCanReadCond.WaitI(SyncData.BuffersMutex);
- }
- if (!Buffer.HasData()) {
- Y_DEBUG_ABORT_UNLESS(Buffer.IsFinished());
- return 0;
- }
- const char* dataPtr;
- size_t dataLen;
- Buffer.GetData(dataPtr, dataLen);
- size_t bytesRead = std::min(dataLen, len);
- Y_DEBUG_ABORT_UNLESS(bytesRead > 0);
- memcpy(buf, dataPtr, bytesRead);
- Buffer.CommitRead(bytesRead);
- return bytesRead;
- }
- ythrow yexception();
- } catch (const std::exception& e) {
- UdfTerminate((TStringBuilder() << Pos_ << " " << e.what()).data());
- }
- }
- EFetchStatus ReadNextString() {
- TUnboxedValue item;
- EFetchStatus status = RowsStream.Fetch(item);
- switch (status) {
- case EFetchStatus::Yield:
- case EFetchStatus::Finish:
- return status;
- default:
- break;
- }
- CurString = item.GetElement(0);
- CurStringInput.Reset(CurString.AsStringRef().Data(), CurString.AsStringRef().Size());
- // Check that input string doesn't contain delimiters
- const char* match;
- Y_UNUSED(match);
- if (DelimiterMatcher.SubStr(
- CurString.AsStringRef().Data(),
- CurString.AsStringRef().Data() + CurString.AsStringRef().Size(),
- match))
- {
- ythrow yexception() << "Delimiter found in input string.";
- }
- return EFetchStatus::Ok;
- }
- private:
- enum class ReadMode {
- Start,
- String,
- Delimiter,
- Done
- };
- TUnboxedValue RowsStream;
- TString Delimiter;
- TThreadSyncData& SyncData;
- TSourcePosition Pos_;
- TKMPMatcher DelimiterMatcher;
- TUnboxedValue CurString;
- TMemoryInput CurStringInput;
- TMemoryInput DelimiterInput;
- TCyclicRWBuffer Buffer;
- ReadMode CurReadMode;
- };
- class TStringListBufferedOutputStream: public IOutputStream {
- public:
- TStringListBufferedOutputStream(const TString& delimiter, size_t stringBufferSizeBytes,
- TStringListBufferedInputStream& inputStream, TThreadSyncData& syncData)
- : Delimiter(delimiter)
- , InputStream(inputStream)
- , SyncData(syncData)
- , HasDelimiterMatch(false)
- , DelimiterMatcherCallback(HasDelimiterMatch)
- , DelimiterMatcher(delimiter.data(), delimiter.data() + delimiter.size(), &DelimiterMatcherCallback)
- , Buffer(stringBufferSizeBytes)
- {
- }
- TStringListBufferedOutputStream(const TStringListBufferedOutputStream&) = delete;
- TStringListBufferedOutputStream& operator=(const TStringListBufferedOutputStream&) = delete;
- // Get string record from buffer.
- // Called from Main thread.
- EFetchStatus FetchNextString(TString& str) {
- while (!HasDelimiterMatch) {
- with_lock (SyncData.BuffersMutex) {
- bool inputHasData;
- bool bufferNeedsData;
- do {
- inputHasData = InputStream.GetBuffer().HasData() || InputStream.GetBuffer().IsFinished();
- bufferNeedsData = !Buffer.HasData() && !Buffer.IsFinished();
- if (inputHasData && bufferNeedsData) {
- SyncData.MainThreadHasWorkCond.WaitI(SyncData.BuffersMutex);
- }
- } while (inputHasData && bufferNeedsData);
- if (!inputHasData) {
- auto status = InputStream.FetchInput();
- if (status == EFetchStatus::Yield) {
- return EFetchStatus::Yield;
- }
- }
- if (bufferNeedsData) {
- continue;
- }
- if (!Buffer.HasData()) {
- Y_DEBUG_ABORT_UNLESS(Buffer.IsFinished());
- str = TString(TStringBuf(CurrentString.Data(), CurrentString.Size()));
- CurrentString.Clear();
- return str.empty() ? EFetchStatus::Finish : EFetchStatus::Ok;
- }
- const char* data;
- size_t size;
- Buffer.GetData(data, size);
- size_t read = 0;
- while (!HasDelimiterMatch && read < size) {
- DelimiterMatcher.Push(data[read]);
- ++read;
- }
- Y_DEBUG_ABORT_UNLESS(read > 0);
- CurrentString.Append(data, read);
- bool signalCanWrite = !Buffer.CanWrite();
- Buffer.CommitRead(read);
- if (signalCanWrite) {
- SyncData.OutputBufferCanWriteCond.Signal();
- }
- }
- }
- Y_DEBUG_ABORT_UNLESS(CurrentString.Size() >= Delimiter.size());
- str = TString(TStringBuf(CurrentString.Data(), CurrentString.Size() - Delimiter.size()));
- CurrentString.Clear();
- HasDelimiterMatch = false;
- return EFetchStatus::Ok;
- }
- TCyclicRWBuffer& GetBuffer() {
- return Buffer;
- }
- private:
- // Write data from child process output to buffer.
- // Called from Communicate thread.
- void DoWrite(const void* buf, size_t len) override {
- const char* curStrPos = reinterpret_cast<const char*>(buf);
- size_t curStrLen = len;
- while (curStrLen > 0) {
- with_lock (SyncData.BuffersMutex) {
- while (!Buffer.CanWrite() && !Buffer.IsFinished()) {
- SyncData.OutputBufferCanWriteCond.WaitI(SyncData.BuffersMutex);
- }
- if (Buffer.IsFinished()) {
- return;
- }
- bool signalCanRead = !Buffer.HasData();
- Buffer.Write(curStrPos, curStrLen);
- if (signalCanRead) {
- SyncData.MainThreadHasWorkCond.Signal();
- }
- }
- }
- }
- void DoFinish() override {
- IOutputStream::DoFinish();
- with_lock (SyncData.BuffersMutex) {
- Buffer.Finish();
- SyncData.MainThreadHasWorkCond.Signal();
- }
- }
- private:
- class MatcherCallback: public TKMPStreamMatcher<char>::ICallback {
- public:
- MatcherCallback(bool& hasMatch)
- : HasMatch(hasMatch)
- {
- }
- void OnMatch(const char* begin, const char* end) override {
- Y_UNUSED(begin);
- Y_UNUSED(end);
- HasMatch = true;
- }
- private:
- bool& HasMatch;
- };
- private:
- TString Delimiter;
- TStringListBufferedInputStream& InputStream;
- TThreadSyncData& SyncData;
- bool HasDelimiterMatch;
- MatcherCallback DelimiterMatcherCallback;
- TKMPStreamMatcher<char> DelimiterMatcher;
- TBuffer CurrentString;
- TCyclicRWBuffer Buffer;
- };
- class TStreamingOutputListIterator {
- public:
- TStreamingOutputListIterator(const TStreamingParams& params, const IValueBuilder* valueBuilder, TSourcePosition pos)
- : StreamingParams(params)
- , ValueBuilder(valueBuilder)
- , Pos_(pos)
- {
- }
- TStreamingOutputListIterator(const TStreamingOutputListIterator&) = delete;
- TStreamingOutputListIterator& operator=(const TStreamingOutputListIterator&) = delete;
- ~TStreamingOutputListIterator() {
- if (ShellCommand) {
- Y_DEBUG_ABORT_UNLESS(InputStream && OutputStream);
- try {
- ShellCommand->Terminate();
- } catch (const std::exception& e) {
- Cerr << CurrentExceptionMessage();
- }
- // Let Communicate thread finish.
- with_lock (ThreadSyncData.BuffersMutex) {
- InputStream->GetBuffer().Finish();
- OutputStream->GetBuffer().Finish();
- ThreadSyncData.InputBufferCanReadCond.Signal();
- ThreadSyncData.OutputBufferCanWriteCond.Signal();
- }
- ShellCommand->Wait();
- }
- }
- EFetchStatus Fetch(TUnboxedValue& result) {
- try {
- EFetchStatus status = EFetchStatus::Ok;
- if (!ProcessStarted()) {
- StartProcess();
- // Don't try to fetch data if there was a problem starting the process,
- // this causes infinite wait on Windows system due to incorrect ShellCommand behavior.
- if (ShellCommand->GetStatus() != TShellCommand::SHELL_RUNNING && ShellCommand->GetStatus() != TShellCommand::SHELL_FINISHED) {
- status = EFetchStatus::Finish;
- }
- }
- if (status == EFetchStatus::Ok) {
- status = OutputStream->FetchNextString(CurrentRecord);
- }
- if (status == EFetchStatus::Finish) {
- switch (ShellCommand->GetStatus()) {
- case TShellCommand::SHELL_FINISHED:
- break;
- case TShellCommand::SHELL_INTERNAL_ERROR:
- ythrow yexception() << "Internal error running process: " << ShellCommand->GetInternalError();
- break;
- case TShellCommand::SHELL_ERROR:
- ythrow yexception() << "Error running user process: " << ShellCommand->GetError();
- break;
- default:
- ythrow yexception() << "Unexpected shell command status: " << (int)ShellCommand->GetStatus();
- }
- return EFetchStatus::Finish;
- }
- if (status == EFetchStatus::Ok) {
- TUnboxedValue* items = nullptr;
- result = ValueBuilder->NewArray(1, items);
- *items = ValueBuilder->NewString(TStringRef(CurrentRecord.data(), CurrentRecord.size()));
- }
- return status;
- } catch (const std::exception& e) {
- UdfTerminate((TStringBuilder() << Pos_ << " " << e.what()).data());
- }
- }
- private:
- void StartProcess() {
- InputStream.Reset(new TStringListBufferedInputStream(
- StreamingParams.InputStreamObj, StreamingParams.InputDelimiter,
- StreamingParams.InputBufferSizeBytes, ThreadSyncData, Pos_));
- OutputStream.Reset(new TStringListBufferedOutputStream(
- StreamingParams.OutputDelimiter, StreamingParams.OutputBufferSizeBytes, *InputStream,
- ThreadSyncData));
- TShellCommandOptions opt;
- opt.SetAsync(true).SetUseShell(false).SetLatency(StreamingParams.ProcessPollLatencyMs).SetInputStream(InputStream.Get()).SetOutputStream(OutputStream.Get()).SetCloseStreams(true).SetCloseAllFdsOnExec(true);
- TList<TString> commandArguments;
- auto argumetsIterator = StreamingParams.ArgumentsList.GetListIterator();
- for (TUnboxedValue item; argumetsIterator.Next(item);) {
- commandArguments.emplace_back(TStringBuf(item.AsStringRef()));
- }
- ShellCommand.Reset(new TShellCommand(StreamingParams.CommandLine, commandArguments, opt));
- ShellCommand->Run();
- }
- bool ProcessStarted() const {
- return !!ShellCommand;
- }
- private:
- TStreamingParams StreamingParams;
- const IValueBuilder* ValueBuilder;
- TSourcePosition Pos_;
- TThreadSyncData ThreadSyncData;
- THolder<TShellCommand> ShellCommand;
- THolder<TStringListBufferedInputStream> InputStream;
- THolder<TStringListBufferedOutputStream> OutputStream;
- TString CurrentRecord;
- };
- class TStreamingOutput: public TBoxedValue {
- public:
- TStreamingOutput(const TStreamingParams& params, const IValueBuilder* valueBuilder, TSourcePosition pos)
- : StreamingParams(params)
- , ValueBuilder(valueBuilder)
- , Pos_(pos)
- {
- }
- TStreamingOutput(const TStreamingOutput&) = delete;
- TStreamingOutput& operator=(const TStreamingOutput&) = delete;
- private:
- EFetchStatus Fetch(TUnboxedValue& result) override {
- if (IsFinished) {
- return EFetchStatus::Finish;
- }
- if (!Iterator) {
- Iterator.Reset(new TStreamingOutputListIterator(StreamingParams, ValueBuilder, Pos_));
- }
- auto ret = Iterator->Fetch(result);
- if (ret == EFetchStatus::Finish) {
- IsFinished = true;
- Iterator.Reset();
- }
- return ret;
- }
- TStreamingParams StreamingParams;
- const IValueBuilder* ValueBuilder;
- TSourcePosition Pos_;
- bool IsFinished = false;
- THolder<TStreamingOutputListIterator> Iterator;
- };
- class TStreamingScriptOutput: public TStreamingOutput {
- public:
- TStreamingScriptOutput(const TStreamingParams& params, const IValueBuilder* valueBuilder,
- TSourcePosition pos, const TString& script, const TString& scriptFilename)
- : TStreamingOutput(params, valueBuilder, pos)
- , ScriptFileHandle(scriptFilename)
- {
- auto scriptStripped = StripBeforeShebang(script);
- ScriptFileHandle.Write(scriptStripped.data(), scriptStripped.size());
- ScriptFileHandle.Close();
- if (Chmod(ScriptFileHandle.Name().c_str(), MODE0755) != 0) {
- ythrow yexception() << "Chmod failed for script file:" << ScriptFileHandle.Name()
- << " with error: " << LastSystemErrorText();
- }
- }
- private:
- static TString StripBeforeShebang(const TString& script) {
- auto shebangIndex = script.find("#!");
- if (shebangIndex != TString::npos) {
- auto scriptStripped = StripStringLeft(script);
- if (scriptStripped.size() == script.size() - shebangIndex) {
- return scriptStripped;
- }
- }
- return script;
- }
- TTempFileHandle ScriptFileHandle;
- };
- class TStreamingProcess: public TBoxedValue {
- public:
- TStreamingProcess(TSourcePosition pos)
- : Pos_(pos)
- {}
- private:
- TUnboxedValue Run(const IValueBuilder* valueBuilder,
- const TUnboxedValuePod* args) const override {
- auto inputListArg = args[0];
- auto commandLineArg = args[1].AsStringRef();
- auto argumentsArg = args[2];
- auto inputDelimiterArg = args[3];
- auto outputDelimiterArg = args[4];
- Y_DEBUG_ABORT_UNLESS(inputListArg.IsBoxed());
- TStreamingParams params;
- params.InputStreamObj = TUnboxedValuePod(inputListArg);
- params.CommandLine = TString(TStringBuf(commandLineArg));
- params.ArgumentsList = !argumentsArg
- ? valueBuilder->NewEmptyList()
- : TUnboxedValue(argumentsArg.GetOptionalValue());
- if (inputDelimiterArg) {
- params.InputDelimiter = TString(TStringBuf(inputDelimiterArg.AsStringRef()));
- }
- if (outputDelimiterArg) {
- params.OutputDelimiter = TString(TStringBuf(outputDelimiterArg.AsStringRef()));
- }
- return TUnboxedValuePod(new TStreamingOutput(params, valueBuilder, Pos_));
- }
- public:
- static TStringRef Name() {
- static auto name = TStringRef::Of("Process");
- return name;
- }
- private:
- TSourcePosition Pos_;
- };
- class TStreamingProcessInline: public TBoxedValue {
- public:
- TStreamingProcessInline(TSourcePosition pos)
- : Pos_(pos)
- {}
- private:
- TUnboxedValue Run(const IValueBuilder* valueBuilder,
- const TUnboxedValuePod* args) const override {
- auto inputListArg = args[0];
- auto scriptArg = args[1].AsStringRef();
- auto argumentsArg = args[2];
- auto inputDelimiterArg = args[3];
- auto outputDelimiterArg = args[4];
- TString script(scriptArg);
- TString scriptFilename = MakeTempName(".");
- TStreamingParams params;
- params.InputStreamObj = TUnboxedValuePod(inputListArg);
- params.CommandLine = scriptFilename;
- params.ArgumentsList = !argumentsArg
- ? valueBuilder->NewEmptyList()
- : TUnboxedValue(argumentsArg.GetOptionalValue());
- if (inputDelimiterArg) {
- params.InputDelimiter = TString(TStringBuf(inputDelimiterArg.AsStringRef()));
- }
- if (outputDelimiterArg) {
- params.OutputDelimiter = TString(TStringBuf(outputDelimiterArg.AsStringRef()));
- }
- return TUnboxedValuePod(new TStreamingScriptOutput(params, valueBuilder, Pos_, script, scriptFilename));
- }
- public:
- static TStringRef Name() {
- static auto name = TStringRef::Of("ProcessInline");
- return name;
- }
- private:
- TSourcePosition Pos_;
- };
- class TStreamingModule: public IUdfModule {
- public:
- TStringRef Name() const {
- return TStringRef::Of("Streaming");
- }
- void CleanupOnTerminate() const final {
- }
- void GetAllFunctions(IFunctionsSink& sink) const final {
- sink.Add(TStreamingProcess::Name());
- sink.Add(TStreamingProcessInline::Name());
- }
- void BuildFunctionTypeInfo(
- const TStringRef& name,
- NUdf::TType* userType,
- const TStringRef& typeConfig,
- ui32 flags,
- IFunctionTypeInfoBuilder& builder) const override {
- try {
- Y_UNUSED(userType);
- Y_UNUSED(typeConfig);
- bool typesOnly = (flags & TFlags::TypesOnly);
- auto optionalStringType = builder.Optional()->Item<char*>().Build();
- auto rowType = builder.Struct(1)->AddField("Data", TDataType<char*>::Id, nullptr).Build();
- auto rowsType = builder.Stream()->Item(rowType).Build();
- auto stringListType = builder.List()->Item(TDataType<char*>::Id).Build();
- auto optionalStringListType = builder.Optional()->Item(stringListType).Build();
- if (TStreamingProcess::Name() == name) {
- builder.Returns(rowsType).Args()->Add(rowsType).Add<char*>().Add(optionalStringListType).Add(optionalStringType).Add(optionalStringType).Done().OptionalArgs(3);
- if (!typesOnly) {
- builder.Implementation(new TStreamingProcess(builder.GetSourcePosition()));
- }
- }
- if (TStreamingProcessInline::Name() == name) {
- builder.Returns(rowsType).Args()->Add(rowsType).Add<char*>().Add(optionalStringListType).Add(optionalStringType).Add(optionalStringType).Done().OptionalArgs(3);
- if (!typesOnly) {
- builder.Implementation(new TStreamingProcessInline(builder.GetSourcePosition()));
- }
- }
- } catch (const std::exception& e) {
- builder.SetError(CurrentExceptionMessage());
- }
- }
- };
- }
- REGISTER_MODULES(TStreamingModule)
|