#include #include #include #include using namespace NYql::NPureCalc; using namespace NExampleProtos; void PullStreamExample(IProgramFactoryPtr); void PushStreamExample(IProgramFactoryPtr); void PrecompileExample(IProgramFactoryPtr factory); THolder> MakeInput(); class TConsumer: public IConsumer { public: void OnObject(TOutput* message) override { Cout << "path = " << message->GetPath() << Endl; Cout << "host = " << message->GetHost() << Endl; } void OnFinish() override { Cout << "end" << Endl; } }; const char* Query = R"( $a = (SELECT * FROM Input); $b = (SELECT CAST(Url::GetTail(Url) AS Utf8) AS Path, CAST(Url::GetHost(Url) AS Utf8) AS Host, Ip FROM $a); $c = (SELECT Path, Host FROM $b WHERE Path IS NOT NULL AND Host IS NOT NULL AND Ip::IsIPv4(Ip::FromString(Ip))); $d = (SELECT Unwrap(Path) AS Path, Unwrap(Host) AS Host FROM $c); SELECT * FROM $d; )"; int main(int argc, char** argv) { try { auto factory = MakeProgramFactory( TProgramFactoryOptions().SetUDFsDir(argc > 1 ? argv[1] : "../../../../udfs")); Cout << "Pull stream:" << Endl; PullStreamExample(factory); Cout << Endl; Cout << "Push stream:" << Endl; PushStreamExample(factory); Cout << Endl; Cout << "Pull stream with pre-compilation:" << Endl; PrecompileExample(factory); } catch (const TCompileError& err) { Cerr << err.GetIssues() << Endl; Cerr << err.what() << Endl; } } void PullStreamExample(IProgramFactoryPtr factory) { auto program = factory->MakePullStreamProgram( TProtobufInputSpec(), TProtobufOutputSpec(), Query, ETranslationMode::SQL); auto result = program->Apply(MakeInput()); while (auto* message = result->Fetch()) { Cout << "path = " << message->GetPath() << Endl; Cout << "host = " << message->GetHost() << Endl; } } void PushStreamExample(IProgramFactoryPtr factory) { auto program = factory->MakePushStreamProgram( TProtobufInputSpec(), TProtobufOutputSpec(), Query, ETranslationMode::SQL); auto consumer = program->Apply(MakeHolder()); auto input = MakeInput(); while (auto* message = input->Fetch()) { consumer->OnObject(message); } consumer->OnFinish(); } void PrecompileExample(IProgramFactoryPtr factory) { TString prg; { auto program = factory->MakePullStreamProgram( TProtobufInputSpec(), TProtobufOutputSpec(), Query, ETranslationMode::SQL); prg = program->GetCompiledProgram(); } auto program = factory->MakePullStreamProgram( TProtobufInputSpec(), TProtobufOutputSpec(), prg, ETranslationMode::Mkql); auto result = program->Apply(MakeInput()); while (auto* message = result->Fetch()) { Cout << "path = " << message->GetPath() << Endl; Cout << "host = " << message->GetHost() << Endl; } } THolder> MakeInput() { TVector input; { auto& message = input.emplace_back(); message.SetUrl("https://news.yandex.ru/Moscow/index.html?from=index"); message.SetIp("83.220.231.160"); } { auto& message = input.emplace_back(); message.SetUrl("https://music.yandex.ru/radio/"); message.SetIp("83.220.231.161"); } { auto& message = input.emplace_back(); message.SetUrl("https://yandex.ru/maps/?ll=141.475401%2C11.581666&spn=1.757813%2C1.733096&z=7&l=map%2Cstv%2Csta&mode=search&panorama%5Bpoint%5D=141.476317%2C11.582710&panorama%5Bdirection%5D=177.241445%2C-15.219821&panorama%5Bspan%5D=107.410156%2C61.993317"); message.SetIp("::ffff:77.75.155.3"); } return StreamFromVector(std::move(input)); }