impl.cpp 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. #include <yt/yql/plugin/bridge/interface.h>
  2. #include <yt/yql/plugin/native/plugin.h>
  3. #include <type_traits>
  4. using namespace NYT::NYqlPlugin;
  5. using namespace NYT::NYson;
  6. extern "C" {
  7. ////////////////////////////////////////////////////////////////////////////////
  8. TBridgeYqlPlugin* BridgeCreateYqlPlugin(const TBridgeYqlPluginOptions* bridgeOptions)
  9. {
  10. THashMap<TString, TString> clusters;
  11. for (auto clusterIndex = 0; clusterIndex < bridgeOptions->ClusterCount; ++clusterIndex) {
  12. const auto& Cluster = bridgeOptions->Clusters[clusterIndex];
  13. clusters[Cluster.Cluster] = Cluster.Proxy;
  14. }
  15. auto operationAttributes = bridgeOptions->OperationAttributes
  16. ? TYsonString(TString(bridgeOptions->OperationAttributes, bridgeOptions->OperationAttributesLength))
  17. : TYsonString();
  18. TYqlPluginOptions options{
  19. .MRJobBinary = TString(bridgeOptions->MRJobBinary),
  20. .UdfDirectory = TString(bridgeOptions->UdfDirectory),
  21. .Clusters = std::move(clusters),
  22. .DefaultCluster = std::optional<TString>(bridgeOptions->DefaultCluster),
  23. .OperationAttributes = operationAttributes,
  24. .MaxFilesSizeMb = static_cast<int>(bridgeOptions->MaxFilesSizeMb),
  25. .MaxFileCount = static_cast<int>(bridgeOptions->MaxFileCount),
  26. .DownloadFileRetryCount = static_cast<int>(bridgeOptions->DownloadFileRetryCount),
  27. .YTTokenPath = TString(bridgeOptions->YTTokenPath),
  28. .LogBackend = std::move(*reinterpret_cast<THolder<TLogBackend>*>(bridgeOptions->LogBackend)),
  29. };
  30. auto nativePlugin = CreateYqlPlugin(options);
  31. return nativePlugin.release();
  32. }
  33. void BridgeFreeYqlPlugin(TBridgeYqlPlugin* plugin)
  34. {
  35. auto* nativePlugin = reinterpret_cast<IYqlPlugin*>(plugin);
  36. delete nativePlugin;
  37. }
  38. void BridgeFreeQueryResult(TBridgeQueryResult* result)
  39. {
  40. delete result->TaskInfo;
  41. delete result->Statistics;
  42. delete result->Plan;
  43. delete result->YsonResult;
  44. delete result->YsonError;
  45. delete result;
  46. }
  47. void FillString(const char*& str, ssize_t& strLength, const std::optional<TString>& original)
  48. {
  49. if (!original) {
  50. str = nullptr;
  51. strLength = 0;
  52. return;
  53. }
  54. char* copy = new char[original->size() + 1];
  55. memcpy(copy, original->data(), original->size() + 1);
  56. str = copy;
  57. strLength = original->size();
  58. }
  59. TBridgeQueryResult* BridgeRun(TBridgeYqlPlugin* plugin, const char* queryId, const char* impersonationUser, const char* queryText, const char* settings, const TBridgeQueryFile* bridgeFiles, int bridgeFileCount)
  60. {
  61. static const auto EmptyMap = TYsonString(TString("{}"));
  62. auto* nativePlugin = reinterpret_cast<IYqlPlugin*>(plugin);
  63. auto* bridgeResult = new TBridgeQueryResult;
  64. std::vector<TQueryFile> files(bridgeFileCount);
  65. for (int index = 0; index < bridgeFileCount; index++) {
  66. const auto& file = bridgeFiles[index];
  67. files.push_back(TQueryFile {
  68. .Name = TStringBuf(file.Name, file.NameLength),
  69. .Content = TStringBuf(file.Content, file.ContentLength),
  70. .Type = file.Type,
  71. });
  72. }
  73. auto result = nativePlugin->Run(
  74. NYT::TGuid::FromString(queryId),
  75. TString(impersonationUser),
  76. TString(queryText),
  77. settings ? TYsonString(TString(settings)) : EmptyMap,
  78. files);
  79. FillString(bridgeResult->YsonResult, bridgeResult->YsonResultLength, result.YsonResult);
  80. FillString(bridgeResult->Plan, bridgeResult->PlanLength, result.Plan);
  81. FillString(bridgeResult->Statistics, bridgeResult->StatisticsLength, result.Statistics);
  82. FillString(bridgeResult->Progress, bridgeResult->ProgressLength, result.Progress);
  83. FillString(bridgeResult->TaskInfo, bridgeResult->TaskInfoLength, result.TaskInfo);
  84. FillString(bridgeResult->YsonError, bridgeResult->YsonErrorLength, result.YsonError);
  85. return bridgeResult;
  86. }
  87. TBridgeQueryResult* BridgeGetProgress(TBridgeYqlPlugin* plugin, const char* queryId)
  88. {
  89. auto* nativePlugin = reinterpret_cast<IYqlPlugin*>(plugin);
  90. auto* bridgeResult = new TBridgeQueryResult;
  91. auto result = nativePlugin->GetProgress(NYT::TGuid::FromString(queryId));
  92. FillString(bridgeResult->Plan, bridgeResult->PlanLength, result.Plan);
  93. FillString(bridgeResult->Progress, bridgeResult->ProgressLength, result.Progress);
  94. return bridgeResult;
  95. }
  96. ////////////////////////////////////////////////////////////////////////////////
  97. // Validate that the all functions from the bridge interface are implemented with proper signatures.
  98. #define XX(function) static_assert(std::is_same_v<decltype(&(function)), TFunc ## function*>);
  99. FOR_EACH_BRIDGE_INTERFACE_FUNCTION(XX)
  100. #undef XX
  101. ////////////////////////////////////////////////////////////////////////////////
  102. } // extern "C"