main.cpp 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. #include <yql/essentials/public/purecalc/examples/protobuf/main.pb.h>
  2. #include <yql/essentials/public/purecalc/purecalc.h>
  3. #include <yql/essentials/public/purecalc/io_specs/protobuf/spec.h>
  4. #include <yql/essentials/public/purecalc/helpers/stream/stream_from_vector.h>
  5. using namespace NYql::NPureCalc;
  6. using namespace NExampleProtos;
  7. void PullStreamExample(IProgramFactoryPtr);
  8. void PushStreamExample(IProgramFactoryPtr);
  9. void PrecompileExample(IProgramFactoryPtr factory);
  10. THolder<IStream<TInput*>> MakeInput();
  11. class TConsumer: public IConsumer<TOutput*> {
  12. public:
  13. void OnObject(TOutput* message) override {
  14. Cout << "path = " << message->GetPath() << Endl;
  15. Cout << "host = " << message->GetHost() << Endl;
  16. }
  17. void OnFinish() override {
  18. Cout << "end" << Endl;
  19. }
  20. };
  21. const char* Query = R"(
  22. $a = (SELECT * FROM Input);
  23. $b = (SELECT CAST(Url::GetTail(Url) AS Utf8) AS Path, CAST(Url::GetHost(Url) AS Utf8) AS Host, Ip FROM $a);
  24. $c = (SELECT Path, Host FROM $b WHERE Path IS NOT NULL AND Host IS NOT NULL AND Ip::IsIPv4(Ip::FromString(Ip)));
  25. $d = (SELECT Unwrap(Path) AS Path, Unwrap(Host) AS Host FROM $c);
  26. SELECT * FROM $d;
  27. )";
  28. int main(int argc, char** argv) {
  29. try {
  30. auto factory = MakeProgramFactory(
  31. TProgramFactoryOptions().SetUDFsDir(argc > 1 ? argv[1] : "../../../../udfs"));
  32. Cout << "Pull stream:" << Endl;
  33. PullStreamExample(factory);
  34. Cout << Endl;
  35. Cout << "Push stream:" << Endl;
  36. PushStreamExample(factory);
  37. Cout << Endl;
  38. Cout << "Pull stream with pre-compilation:" << Endl;
  39. PrecompileExample(factory);
  40. } catch (const TCompileError& err) {
  41. Cerr << err.GetIssues() << Endl;
  42. Cerr << err.what() << Endl;
  43. }
  44. }
  45. void PullStreamExample(IProgramFactoryPtr factory) {
  46. auto program = factory->MakePullStreamProgram(
  47. TProtobufInputSpec<TInput>(),
  48. TProtobufOutputSpec<TOutput>(),
  49. Query,
  50. ETranslationMode::SQL);
  51. auto result = program->Apply(MakeInput());
  52. while (auto* message = result->Fetch()) {
  53. Cout << "path = " << message->GetPath() << Endl;
  54. Cout << "host = " << message->GetHost() << Endl;
  55. }
  56. }
  57. void PushStreamExample(IProgramFactoryPtr factory) {
  58. auto program = factory->MakePushStreamProgram(
  59. TProtobufInputSpec<TInput>(),
  60. TProtobufOutputSpec<TOutput>(),
  61. Query,
  62. ETranslationMode::SQL);
  63. auto consumer = program->Apply(MakeHolder<TConsumer>());
  64. auto input = MakeInput();
  65. while (auto* message = input->Fetch()) {
  66. consumer->OnObject(message);
  67. }
  68. consumer->OnFinish();
  69. }
  70. void PrecompileExample(IProgramFactoryPtr factory) {
  71. TString prg;
  72. {
  73. auto program = factory->MakePullStreamProgram(
  74. TProtobufInputSpec<TInput>(),
  75. TProtobufOutputSpec<TOutput>(),
  76. Query,
  77. ETranslationMode::SQL);
  78. prg = program->GetCompiledProgram();
  79. }
  80. auto program = factory->MakePullStreamProgram(
  81. TProtobufInputSpec<TInput>(),
  82. TProtobufOutputSpec<TOutput>(),
  83. prg,
  84. ETranslationMode::Mkql);
  85. auto result = program->Apply(MakeInput());
  86. while (auto* message = result->Fetch()) {
  87. Cout << "path = " << message->GetPath() << Endl;
  88. Cout << "host = " << message->GetHost() << Endl;
  89. }
  90. }
  91. THolder<IStream<TInput*>> MakeInput() {
  92. TVector<TInput> input;
  93. {
  94. auto& message = input.emplace_back();
  95. message.SetUrl("https://news.yandex.ru/Moscow/index.html?from=index");
  96. message.SetIp("83.220.231.160");
  97. }
  98. {
  99. auto& message = input.emplace_back();
  100. message.SetUrl("https://music.yandex.ru/radio/");
  101. message.SetIp("83.220.231.161");
  102. }
  103. {
  104. auto& message = input.emplace_back();
  105. message.SetUrl("https://yandex.ru/maps/?ll=141.475401%2C11.581666&spn=1.757813%2C1.733096&z=7&l=map%2Cstv%2Csta&mode=search&panorama%5Bpoint%5D=141.476317%2C11.582710&panorama%5Bdirection%5D=177.241445%2C-15.219821&panorama%5Bspan%5D=107.410156%2C61.993317");
  106. message.SetIp("::ffff:77.75.155.3");
  107. }
  108. return StreamFromVector(std::move(input));
  109. }