spec.cpp 43 KB


  1. #include "proto_holder.h"
  2. #include "spec.h"
  3. #include <yql/essentials/public/udf/udf_value.h>
  4. #include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
  5. #include <yql/essentials/minikql/computation/mkql_custom_list.h>
  6. #include <yql/essentials/minikql/mkql_string_util.h>
  7. #include <yql/essentials/utils/yql_panic.h>
  8. #include <google/protobuf/reflection.h>
  9. using namespace NYql;
  10. using namespace NYql::NPureCalc;
  11. using namespace google::protobuf;
  12. using namespace NKikimr::NUdf;
  13. using namespace NKikimr::NMiniKQL;
  14. TProtobufRawInputSpec::TProtobufRawInputSpec(
  15. const Descriptor& descriptor,
  16. const TMaybe<TString>& timestampColumn,
  17. const TProtoSchemaOptions& options
  18. )
  19. : Descriptor_(descriptor)
  20. , TimestampColumn_(timestampColumn)
  21. , SchemaOptions_(options)
  22. {
  23. }
  24. const TVector<NYT::TNode>& TProtobufRawInputSpec::GetSchemas() const {
  25. if (SavedSchemas_.size() == 0) {
  26. SavedSchemas_.push_back(MakeSchemaFromProto(Descriptor_, SchemaOptions_));
  27. if (TimestampColumn_) {
  28. auto timestampType = NYT::TNode::CreateList();
  29. timestampType.Add("DataType");
  30. timestampType.Add("Uint64");
  31. auto timestamp = NYT::TNode::CreateList();
  32. timestamp.Add(*TimestampColumn_);
  33. timestamp.Add(timestampType);
  34. SavedSchemas_.back().AsList()[1].AsList().push_back(timestamp);
  35. }
  36. }
  37. return SavedSchemas_;
  38. }
  39. const Descriptor& TProtobufRawInputSpec::GetDescriptor() const {
  40. return Descriptor_;
  41. }
  42. const TMaybe<TString>& TProtobufRawInputSpec::GetTimestampColumn() const {
  43. return TimestampColumn_;
  44. }
  45. const TProtoSchemaOptions& TProtobufRawInputSpec::GetSchemaOptions() const {
  46. return SchemaOptions_;
  47. }
  48. TProtobufRawOutputSpec::TProtobufRawOutputSpec(
  49. const Descriptor& descriptor,
  50. MessageFactory* factory,
  51. const TProtoSchemaOptions& options,
  52. Arena* arena
  53. )
  54. : Descriptor_(descriptor)
  55. , Factory_(factory)
  56. , SchemaOptions_(options)
  57. , Arena_(arena)
  58. {
  59. SchemaOptions_.ListIsOptional = true;
  60. }
  61. const NYT::TNode& TProtobufRawOutputSpec::GetSchema() const {
  62. if (!SavedSchema_) {
  63. SavedSchema_ = MakeSchemaFromProto(Descriptor_, SchemaOptions_);
  64. }
  65. return SavedSchema_.GetRef();
  66. }
  67. const Descriptor& TProtobufRawOutputSpec::GetDescriptor() const {
  68. return Descriptor_;
  69. }
  70. void TProtobufRawOutputSpec::SetFactory(MessageFactory* factory) {
  71. Factory_ = factory;
  72. }
  73. MessageFactory* TProtobufRawOutputSpec::GetFactory() const {
  74. return Factory_;
  75. }
  76. void TProtobufRawOutputSpec::SetArena(Arena* arena) {
  77. Arena_ = arena;
  78. }
  79. Arena* TProtobufRawOutputSpec::GetArena() const {
  80. return Arena_;
  81. }
  82. const TProtoSchemaOptions& TProtobufRawOutputSpec::GetSchemaOptions() const {
  83. return SchemaOptions_;
  84. }
  85. TProtobufRawMultiOutputSpec::TProtobufRawMultiOutputSpec(
  86. TVector<const Descriptor*> descriptors,
  87. TMaybe<TVector<MessageFactory*>> factories,
  88. const TProtoSchemaOptions& options,
  89. TMaybe<TVector<Arena*>> arenas
  90. )
  91. : Descriptors_(std::move(descriptors))
  92. , SchemaOptions_(options)
  93. {
  94. if (factories) {
  95. Y_ENSURE(factories->size() == Descriptors_.size(), "number of factories must match number of descriptors");
  96. Factories_ = std::move(*factories);
  97. } else {
  98. Factories_ = TVector<MessageFactory*>(Descriptors_.size(), nullptr);
  99. }
  100. if (arenas) {
  101. Y_ENSURE(arenas->size() == Descriptors_.size(), "number of arenas must match number of descriptors");
  102. Arenas_ = std::move(*arenas);
  103. } else {
  104. Arenas_ = TVector<Arena*>(Descriptors_.size(), nullptr);
  105. }
  106. }
  107. const NYT::TNode& TProtobufRawMultiOutputSpec::GetSchema() const {
  108. if (SavedSchema_.IsUndefined()) {
  109. SavedSchema_ = MakeVariantSchemaFromProtos(Descriptors_, SchemaOptions_);
  110. }
  111. return SavedSchema_;
  112. }
  113. const Descriptor& TProtobufRawMultiOutputSpec::GetDescriptor(ui32 index) const {
  114. Y_ENSURE(index < Descriptors_.size(), "invalid output index");
  115. return *Descriptors_[index];
  116. }
  117. void TProtobufRawMultiOutputSpec::SetFactory(ui32 index, MessageFactory* factory) {
  118. Y_ENSURE(index < Factories_.size(), "invalid output index");
  119. Factories_[index] = factory;
  120. }
  121. MessageFactory* TProtobufRawMultiOutputSpec::GetFactory(ui32 index) const {
  122. Y_ENSURE(index < Factories_.size(), "invalid output index");
  123. return Factories_[index];
  124. }
  125. void TProtobufRawMultiOutputSpec::SetArena(ui32 index, Arena* arena) {
  126. Y_ENSURE(index < Arenas_.size(), "invalid output index");
  127. Arenas_[index] = arena;
  128. }
  129. Arena* TProtobufRawMultiOutputSpec::GetArena(ui32 index) const {
  130. Y_ENSURE(index < Arenas_.size(), "invalid output index");
  131. return Arenas_[index];
  132. }
  133. ui32 TProtobufRawMultiOutputSpec::GetOutputsNumber() const {
  134. return static_cast<ui32>(Descriptors_.size());
  135. }
  136. const TProtoSchemaOptions& TProtobufRawMultiOutputSpec::GetSchemaOptions() const {
  137. return SchemaOptions_;
  138. }
  139. namespace {
  140. struct TFieldMapping {
  141. TString Name;
  142. const FieldDescriptor* Field;
  143. TVector<TFieldMapping> NestedFields;
  144. };
  145. /**
  146. * Fills a tree of field mappings from the given yql struct type to protobuf message.
  147. *
  148. * @param fromType source yql type.
  149. * @param toType target protobuf message type.
  150. * @param mappings destination vector will be filled with field descriptors. Order of descriptors will match
  151. * the order of field names.
  152. */
  153. void FillFieldMappings(
  154. const TStructType* fromType,
  155. const Descriptor& toType,
  156. TVector<TFieldMapping>& mappings,
  157. const TMaybe<TString>& timestampColumn,
  158. bool listIsOptional,
  159. const THashMap<TString, TString>& fieldRenames
  160. ) {
  161. THashMap<TString, TString> inverseFieldRenames;
  162. for (const auto& [source, target]: fieldRenames) {
  163. auto [iterator, emplaced] = inverseFieldRenames.emplace(target, source);
  164. Y_ENSURE(emplaced, "Duplicate rename field found: " << source << " -> " << target);
  165. }
  166. mappings.resize(fromType->GetMembersCount());
  167. for (ui32 i = 0; i < fromType->GetMembersCount(); ++i) {
  168. TString fieldName(fromType->GetMemberName(i));
  169. if (auto fieldRenamePtr = inverseFieldRenames.FindPtr(fieldName)) {
  170. fieldName = *fieldRenamePtr;
  171. }
  172. mappings[i].Name = fieldName;
  173. mappings[i].Field = toType.FindFieldByName(fieldName);
  174. YQL_ENSURE(
  175. mappings[i].Field || timestampColumn && *timestampColumn == fieldName,
  176. "Missing field: " << fieldName);
  177. const auto* fieldType = fromType->GetMemberType(i);
  178. if (fieldType->GetKind() == NKikimr::NMiniKQL::TType::EKind::List) {
  179. const auto* listType = static_cast<const NKikimr::NMiniKQL::TListType*>(fieldType);
  180. fieldType = listType->GetItemType();
  181. } else if (fieldType->GetKind() == NKikimr::NMiniKQL::TType::EKind::Optional) {
  182. const auto* optionalType = static_cast<const NKikimr::NMiniKQL::TOptionalType*>(fieldType);
  183. fieldType = optionalType->GetItemType();
  184. if (listIsOptional) {
  185. if (fieldType->GetKind() == NKikimr::NMiniKQL::TType::EKind::List) {
  186. const auto* listType = static_cast<const NKikimr::NMiniKQL::TListType*>(fieldType);
  187. fieldType = listType->GetItemType();
  188. }
  189. }
  190. }
  191. YQL_ENSURE(fieldType->GetKind() == NKikimr::NMiniKQL::TType::EKind::Struct ||
  192. fieldType->GetKind() == NKikimr::NMiniKQL::TType::EKind::Data,
  193. "unsupported field kind [" << fieldType->GetKindAsStr() << "], field [" << fieldName << "]");
  194. if (fieldType->GetKind() == NKikimr::NMiniKQL::TType::EKind::Struct) {
  195. FillFieldMappings(static_cast<const NKikimr::NMiniKQL::TStructType*>(fieldType),
  196. *mappings[i].Field->message_type(),
  197. mappings[i].NestedFields, Nothing(), listIsOptional, {});
  198. }
  199. }
  200. }
  201. /**
  202. * Extract field values from the given protobuf message into an array of unboxed values.
  203. *
  204. * @param factory to create nested unboxed values.
  205. * @param source source protobuf message.
  206. * @param destination destination array of unboxed values. Each element in the array corresponds to a field
  207. * in the protobuf message.
  208. * @param mappings vector of protobuf field descriptors which denotes relation between fields of the
  209. * source message and elements of the destination array.
  210. * @param scratch temporary string which will be used during conversion.
  211. */
  212. void FillInputValue(
  213. const THolderFactory& factory,
  214. const Message* source,
  215. TUnboxedValue* destination,
  216. const TVector<TFieldMapping>& mappings,
  217. const TMaybe<TString>& timestampColumn,
  218. ITimeProvider* timeProvider,
  219. EEnumPolicy enumPolicy
  220. ) {
  221. TString scratch;
  222. auto reflection = source->GetReflection();
  223. for (ui32 i = 0; i < mappings.size(); ++i) {
  224. auto mapping = mappings[i];
  225. if (!mapping.Field) {
  226. YQL_ENSURE(timestampColumn && mapping.Name == *timestampColumn);
  227. destination[i] = TUnboxedValuePod(timeProvider->Now().MicroSeconds());
  228. continue;
  229. }
  230. const auto type = mapping.Field->type();
  231. if (mapping.Field->label() == FieldDescriptor::LABEL_REPEATED) {
  232. const auto size = static_cast<ui32>(reflection->FieldSize(*source, mapping.Field));
  233. if (size == 0) {
  234. destination[i] = factory.GetEmptyContainerLazy();
  235. } else {
  236. TUnboxedValue* inplace = nullptr;
  237. destination[i] = factory.CreateDirectArrayHolder(size, inplace);
  238. for (ui32 j = 0; j < size; ++j) {
  239. switch (type) {
  240. case FieldDescriptor::TYPE_DOUBLE:
  241. inplace[j] = TUnboxedValuePod(reflection->GetRepeatedDouble(*source, mapping.Field, j));
  242. break;
  243. case FieldDescriptor::TYPE_FLOAT:
  244. inplace[j] = TUnboxedValuePod(reflection->GetRepeatedFloat(*source, mapping.Field, j));
  245. break;
  246. case FieldDescriptor::TYPE_INT64:
  247. case FieldDescriptor::TYPE_SFIXED64:
  248. case FieldDescriptor::TYPE_SINT64:
  249. inplace[j] = TUnboxedValuePod(reflection->GetRepeatedInt64(*source, mapping.Field, j));
  250. break;
  251. case FieldDescriptor::TYPE_ENUM:
  252. switch (EnumFormatType(*mapping.Field, enumPolicy)) {
  253. case EEnumFormatType::Int32:
  254. inplace[j] = TUnboxedValuePod(reflection->GetRepeatedEnumValue(*source, mapping.Field, j));
  255. break;
  256. case EEnumFormatType::String:
  257. inplace[j] = MakeString(reflection->GetRepeatedEnum(*source, mapping.Field, j)->name());
  258. break;
  259. }
  260. break;
  261. case FieldDescriptor::TYPE_UINT64:
  262. case FieldDescriptor::TYPE_FIXED64:
  263. inplace[j] = TUnboxedValuePod(reflection->GetRepeatedUInt64(*source, mapping.Field, j));
  264. break;
  265. case FieldDescriptor::TYPE_INT32:
  266. case FieldDescriptor::TYPE_SFIXED32:
  267. case FieldDescriptor::TYPE_SINT32:
  268. inplace[j] = TUnboxedValuePod(reflection->GetRepeatedInt32(*source, mapping.Field, j));
  269. break;
  270. case FieldDescriptor::TYPE_UINT32:
  271. case FieldDescriptor::TYPE_FIXED32:
  272. inplace[j] = TUnboxedValuePod(reflection->GetRepeatedUInt32(*source, mapping.Field, j));
  273. break;
  274. case FieldDescriptor::TYPE_BOOL:
  275. inplace[j] = TUnboxedValuePod(reflection->GetRepeatedBool(*source, mapping.Field, j));
  276. break;
  277. case FieldDescriptor::TYPE_STRING:
  278. inplace[j] = MakeString(reflection->GetRepeatedStringReference(*source, mapping.Field, j, &scratch));
  279. break;
  280. case FieldDescriptor::TYPE_BYTES:
  281. inplace[j] = MakeString(reflection->GetRepeatedStringReference(*source, mapping.Field, j, &scratch));
  282. break;
  283. case FieldDescriptor::TYPE_MESSAGE:
  284. {
  285. const Message& nestedMessage = reflection->GetRepeatedMessage(*source, mapping.Field, j);
  286. TUnboxedValue* nestedValues = nullptr;
  287. inplace[j] = factory.CreateDirectArrayHolder(static_cast<ui32>(mapping.NestedFields.size()),
  288. nestedValues);
  289. FillInputValue(factory, &nestedMessage, nestedValues, mapping.NestedFields, Nothing(), timeProvider, enumPolicy);
  290. }
  291. break;
  292. default:
  293. ythrow yexception() << "Unsupported protobuf type: " << mapping.Field->type_name() << ", field: " << mapping.Field->name();
  294. }
  295. }
  296. }
  297. } else {
  298. if (!reflection->HasField(*source, mapping.Field)) {
  299. continue;
  300. }
  301. switch (type) {
  302. case FieldDescriptor::TYPE_DOUBLE:
  303. destination[i] = TUnboxedValuePod(reflection->GetDouble(*source, mapping.Field));
  304. break;
  305. case FieldDescriptor::TYPE_FLOAT:
  306. destination[i] = TUnboxedValuePod(reflection->GetFloat(*source, mapping.Field));
  307. break;
  308. case FieldDescriptor::TYPE_INT64:
  309. case FieldDescriptor::TYPE_SFIXED64:
  310. case FieldDescriptor::TYPE_SINT64:
  311. destination[i] = TUnboxedValuePod(reflection->GetInt64(*source, mapping.Field));
  312. break;
  313. case FieldDescriptor::TYPE_ENUM:
  314. switch (EnumFormatType(*mapping.Field, enumPolicy)) {
  315. case EEnumFormatType::Int32:
  316. destination[i] = TUnboxedValuePod(reflection->GetEnumValue(*source, mapping.Field));
  317. break;
  318. case EEnumFormatType::String:
  319. destination[i] = MakeString(reflection->GetEnum(*source, mapping.Field)->name());
  320. break;
  321. }
  322. break;
  323. case FieldDescriptor::TYPE_UINT64:
  324. case FieldDescriptor::TYPE_FIXED64:
  325. destination[i] = TUnboxedValuePod(reflection->GetUInt64(*source, mapping.Field));
  326. break;
  327. case FieldDescriptor::TYPE_INT32:
  328. case FieldDescriptor::TYPE_SFIXED32:
  329. case FieldDescriptor::TYPE_SINT32:
  330. destination[i] = TUnboxedValuePod(reflection->GetInt32(*source, mapping.Field));
  331. break;
  332. case FieldDescriptor::TYPE_UINT32:
  333. case FieldDescriptor::TYPE_FIXED32:
  334. destination[i] = TUnboxedValuePod(reflection->GetUInt32(*source, mapping.Field));
  335. break;
  336. case FieldDescriptor::TYPE_BOOL:
  337. destination[i] = TUnboxedValuePod(reflection->GetBool(*source, mapping.Field));
  338. break;
  339. case FieldDescriptor::TYPE_STRING:
  340. destination[i] = MakeString(reflection->GetStringReference(*source, mapping.Field, &scratch));
  341. break;
  342. case FieldDescriptor::TYPE_BYTES:
  343. destination[i] = MakeString(reflection->GetStringReference(*source, mapping.Field, &scratch));
  344. break;
  345. case FieldDescriptor::TYPE_MESSAGE:
  346. {
  347. const Message& nestedMessage = reflection->GetMessage(*source, mapping.Field);
  348. TUnboxedValue* nestedValues = nullptr;
  349. destination[i] = factory.CreateDirectArrayHolder(static_cast<ui32>(mapping.NestedFields.size()),
  350. nestedValues);
  351. FillInputValue(factory, &nestedMessage, nestedValues, mapping.NestedFields, Nothing(), timeProvider, enumPolicy);
  352. }
  353. break;
  354. default:
  355. ythrow yexception() << "Unsupported protobuf type: " << mapping.Field->type_name()
  356. << ", field: " << mapping.Field->name();
  357. }
  358. }
  359. }
  360. }
  361. /**
  362. * Convert unboxed value to protobuf.
  363. *
  364. * @param source unboxed value to extract data from. Type of the value should be struct. It's UB to pass
  365. * a non-struct value here.
  366. * @param destination destination message. Data in this message will be overwritten
  367. * by data from unboxed value.
  368. * @param mappings vector of protobuf field descriptors which denotes relation between struct fields
  369. * and message fields. For any i-th element of this vector, type of the i-th element of
  370. * the unboxed structure must match type of the field pointed by descriptor. Size of this
  371. * vector should match the number of fields in the struct.
  372. */
  373. void FillOutputMessage(
  374. const TUnboxedValue& source,
  375. Message* destination,
  376. const TVector<TFieldMapping>& mappings,
  377. EEnumPolicy enumPolicy
  378. ) {
  379. auto reflection = destination->GetReflection();
  380. for (ui32 i = 0; i < mappings.size(); ++i) {
  381. const auto& mapping = mappings[i];
  382. const auto& cell = source.GetElement(i);
  383. if (!cell) {
  384. reflection->ClearField(destination, mapping.Field);
  385. continue;
  386. }
  387. const auto type = mapping.Field->type();
  388. if (mapping.Field->label() == FieldDescriptor::LABEL_REPEATED) {
  389. const auto iter = cell.GetListIterator();
  390. reflection->ClearField(destination, mapping.Field);
  391. for (TUnboxedValue item; iter.Next(item);) {
  392. switch (mapping.Field->type()) {
  393. case FieldDescriptor::TYPE_DOUBLE:
  394. reflection->AddDouble(destination, mapping.Field, item.Get<double>());
  395. break;
  396. case FieldDescriptor::TYPE_FLOAT:
  397. reflection->AddFloat(destination, mapping.Field, item.Get<float>());
  398. break;
  399. case FieldDescriptor::TYPE_INT64:
  400. case FieldDescriptor::TYPE_SFIXED64:
  401. case FieldDescriptor::TYPE_SINT64:
  402. reflection->AddInt64(destination, mapping.Field, item.Get<i64>());
  403. break;
  404. case FieldDescriptor::TYPE_ENUM: {
  405. switch (EnumFormatType(*mapping.Field, enumPolicy)) {
  406. case EEnumFormatType::Int32:
  407. reflection->AddEnumValue(destination, mapping.Field, item.Get<i32>());
  408. break;
  409. case EEnumFormatType::String: {
  410. auto enumValueDescriptor = mapping.Field->enum_type()->FindValueByName(TString(item.AsStringRef()));
  411. if (!enumValueDescriptor) {
  412. enumValueDescriptor = mapping.Field->default_value_enum();
  413. }
  414. reflection->AddEnum(destination, mapping.Field, enumValueDescriptor);
  415. break;
  416. }
  417. }
  418. break;
  419. }
  420. case FieldDescriptor::TYPE_UINT64:
  421. case FieldDescriptor::TYPE_FIXED64:
  422. reflection->AddUInt64(destination, mapping.Field, item.Get<ui64>());
  423. break;
  424. case FieldDescriptor::TYPE_INT32:
  425. case FieldDescriptor::TYPE_SFIXED32:
  426. case FieldDescriptor::TYPE_SINT32:
  427. reflection->AddInt32(destination, mapping.Field, item.Get<i32>());
  428. break;
  429. case FieldDescriptor::TYPE_UINT32:
  430. case FieldDescriptor::TYPE_FIXED32:
  431. reflection->AddUInt32(destination, mapping.Field, item.Get<ui32>());
  432. break;
  433. case FieldDescriptor::TYPE_BOOL:
  434. reflection->AddBool(destination, mapping.Field, item.Get<bool>());
  435. break;
  436. case FieldDescriptor::TYPE_STRING:
  437. reflection->AddString(destination, mapping.Field, TString(item.AsStringRef()));
  438. break;
  439. case FieldDescriptor::TYPE_BYTES:
  440. reflection->AddString(destination, mapping.Field, TString(item.AsStringRef()));
  441. break;
  442. case FieldDescriptor::TYPE_MESSAGE:
  443. {
  444. auto* nestedMessage = reflection->AddMessage(destination, mapping.Field);
  445. FillOutputMessage(item, nestedMessage, mapping.NestedFields, enumPolicy);
  446. }
  447. break;
  448. default:
  449. ythrow yexception() << "Unsupported protobuf type: "
  450. << mapping.Field->type_name() << ", field: " << mapping.Field->name();
  451. }
  452. }
  453. } else {
  454. switch (type) {
  455. case FieldDescriptor::TYPE_DOUBLE:
  456. reflection->SetDouble(destination, mapping.Field, cell.Get<double>());
  457. break;
  458. case FieldDescriptor::TYPE_FLOAT:
  459. reflection->SetFloat(destination, mapping.Field, cell.Get<float>());
  460. break;
  461. case FieldDescriptor::TYPE_INT64:
  462. case FieldDescriptor::TYPE_SFIXED64:
  463. case FieldDescriptor::TYPE_SINT64:
  464. reflection->SetInt64(destination, mapping.Field, cell.Get<i64>());
  465. break;
  466. case FieldDescriptor::TYPE_ENUM: {
  467. switch (EnumFormatType(*mapping.Field, enumPolicy)) {
  468. case EEnumFormatType::Int32:
  469. reflection->SetEnumValue(destination, mapping.Field, cell.Get<i32>());
  470. break;
  471. case EEnumFormatType::String: {
  472. auto enumValueDescriptor = mapping.Field->enum_type()->FindValueByName(TString(cell.AsStringRef()));
  473. if (!enumValueDescriptor) {
  474. enumValueDescriptor = mapping.Field->default_value_enum();
  475. }
  476. reflection->SetEnum(destination, mapping.Field, enumValueDescriptor);
  477. break;
  478. }
  479. }
  480. break;
  481. }
  482. case FieldDescriptor::TYPE_UINT64:
  483. case FieldDescriptor::TYPE_FIXED64:
  484. reflection->SetUInt64(destination, mapping.Field, cell.Get<ui64>());
  485. break;
  486. case FieldDescriptor::TYPE_INT32:
  487. case FieldDescriptor::TYPE_SFIXED32:
  488. case FieldDescriptor::TYPE_SINT32:
  489. reflection->SetInt32(destination, mapping.Field, cell.Get<i32>());
  490. break;
  491. case FieldDescriptor::TYPE_UINT32:
  492. case FieldDescriptor::TYPE_FIXED32:
  493. reflection->SetUInt32(destination, mapping.Field, cell.Get<ui32>());
  494. break;
  495. case FieldDescriptor::TYPE_BOOL:
  496. reflection->SetBool(destination, mapping.Field, cell.Get<bool>());
  497. break;
  498. case FieldDescriptor::TYPE_STRING:
  499. reflection->SetString(destination, mapping.Field, TString(cell.AsStringRef()));
  500. break;
  501. case FieldDescriptor::TYPE_BYTES:
  502. reflection->SetString(destination, mapping.Field, TString(cell.AsStringRef()));
  503. break;
  504. case FieldDescriptor::TYPE_MESSAGE:
  505. {
  506. auto* nestedMessage = reflection->MutableMessage(destination, mapping.Field);
  507. FillOutputMessage(cell, nestedMessage, mapping.NestedFields, enumPolicy);
  508. }
  509. break;
  510. default:
  511. ythrow yexception() << "Unsupported protobuf type: "
  512. << mapping.Field->type_name() << ", field: " << mapping.Field->name();
  513. }
  514. }
  515. }
  516. }
  517. /**
  518. * Converts input messages to unboxed values.
  519. */
  520. class TInputConverter {
  521. protected:
  522. IWorker* Worker_;
  523. TVector<TFieldMapping> Mappings_;
  524. TPlainContainerCache Cache_;
  525. TMaybe<TString> TimestampColumn_;
  526. EEnumPolicy EnumPolicy_ = EEnumPolicy::Int32;
  527. public:
  528. explicit TInputConverter(const TProtobufRawInputSpec& inputSpec, IWorker* worker)
  529. : Worker_(worker)
  530. , TimestampColumn_(inputSpec.GetTimestampColumn())
  531. , EnumPolicy_(inputSpec.GetSchemaOptions().EnumPolicy)
  532. {
  533. FillFieldMappings(
  534. Worker_->GetInputType(), inputSpec.GetDescriptor(),
  535. Mappings_, TimestampColumn_,
  536. inputSpec.GetSchemaOptions().ListIsOptional,
  537. inputSpec.GetSchemaOptions().FieldRenames
  538. );
  539. }
  540. public:
  541. void DoConvert(const Message* message, TUnboxedValue& result) {
  542. auto& holderFactory = Worker_->GetGraph().GetHolderFactory();
  543. TUnboxedValue* items = nullptr;
  544. result = Cache_.NewArray(holderFactory, static_cast<ui32>(Mappings_.size()), items);
  545. FillInputValue(holderFactory, message, items, Mappings_, TimestampColumn_, Worker_->GetTimeProvider(), EnumPolicy_);
  546. }
  547. void ClearCache() {
  548. Cache_.Clear();
  549. }
  550. };
  551. template <typename TOutputSpec>
  552. using OutputItemType = typename TOutputSpecTraits<TOutputSpec>::TOutputItemType;
  553. template <typename TOutputSpec>
  554. class TOutputConverter;
  555. /**
  556. * Converts unboxed values to output messages (single-output program case).
  557. */
  558. template <>
  559. class TOutputConverter<TProtobufRawOutputSpec> {
  560. protected:
  561. IWorker* Worker_;
  562. TVector<TFieldMapping> OutputColumns_;
  563. TProtoHolder<Message> Message_;
  564. EEnumPolicy EnumPolicy_ = EEnumPolicy::Int32;
  565. public:
  566. explicit TOutputConverter(const TProtobufRawOutputSpec& outputSpec, IWorker* worker)
  567. : Worker_(worker)
  568. , EnumPolicy_(outputSpec.GetSchemaOptions().EnumPolicy)
  569. {
  570. if (!Worker_->GetOutputType()->IsStruct()) {
  571. ythrow yexception() << "protobuf output spec does not support multiple outputs";
  572. }
  573. FillFieldMappings(
  574. static_cast<const NKikimr::NMiniKQL::TStructType*>(Worker_->GetOutputType()),
  575. outputSpec.GetDescriptor(),
  576. OutputColumns_,
  577. Nothing(),
  578. outputSpec.GetSchemaOptions().ListIsOptional,
  579. outputSpec.GetSchemaOptions().FieldRenames
  580. );
  581. auto* factory = outputSpec.GetFactory();
  582. if (!factory) {
  583. factory = MessageFactory::generated_factory();
  584. }
  585. Message_.Reset(factory->GetPrototype(&outputSpec.GetDescriptor())->New(outputSpec.GetArena()));
  586. }
  587. OutputItemType<TProtobufRawOutputSpec> DoConvert(TUnboxedValue value) {
  588. FillOutputMessage(value, Message_.Get(), OutputColumns_, EnumPolicy_);
  589. return Message_.Get();
  590. }
  591. };
  592. /*
  593. * Converts unboxed values to output type (multi-output programs case).
  594. */
  595. template <>
  596. class TOutputConverter<TProtobufRawMultiOutputSpec> {
  597. protected:
  598. IWorker* Worker_;
  599. TVector<TVector<TFieldMapping>> OutputColumns_;
  600. TVector<TProtoHolder<Message>> Messages_;
  601. EEnumPolicy EnumPolicy_ = EEnumPolicy::Int32;
  602. public:
  603. explicit TOutputConverter(const TProtobufRawMultiOutputSpec& outputSpec, IWorker* worker)
  604. : Worker_(worker)
  605. , EnumPolicy_(outputSpec.GetSchemaOptions().EnumPolicy)
  606. {
  607. const auto* outputType = Worker_->GetOutputType();
  608. Y_ENSURE(outputType->IsVariant(), "protobuf multi-output spec requires multi-output program");
  609. const auto* variantType = static_cast<const NKikimr::NMiniKQL::TVariantType*>(outputType);
  610. Y_ENSURE(
  611. variantType->GetUnderlyingType()->IsTuple(),
  612. "protobuf multi-output spec requires variant over tuple as program output type"
  613. );
  614. Y_ENSURE(
  615. outputSpec.GetOutputsNumber() == variantType->GetAlternativesCount(),
  616. "number of outputs provided by spec does not match number of variant alternatives"
  617. );
  618. auto defaultFactory = MessageFactory::generated_factory();
  619. for (ui32 i = 0; i < variantType->GetAlternativesCount(); ++i) {
  620. const auto* type = variantType->GetAlternativeType(i);
  621. Y_ASSERT(type->IsStruct());
  622. Y_ASSERT(OutputColumns_.size() == i && Messages_.size() == i);
  623. OutputColumns_.push_back({});
  624. FillFieldMappings(
  625. static_cast<const NKikimr::NMiniKQL::TStructType*>(type),
  626. outputSpec.GetDescriptor(i),
  627. OutputColumns_.back(),
  628. Nothing(),
  629. outputSpec.GetSchemaOptions().ListIsOptional,
  630. {}
  631. );
  632. auto factory = outputSpec.GetFactory(i);
  633. if (!factory) {
  634. factory = defaultFactory;
  635. }
  636. Messages_.push_back(TProtoHolder<Message>(
  637. factory->GetPrototype(&outputSpec.GetDescriptor(i))->New(outputSpec.GetArena(i))
  638. ));
  639. }
  640. }
  641. OutputItemType<TProtobufRawMultiOutputSpec> DoConvert(TUnboxedValue value) {
  642. auto index = value.GetVariantIndex();
  643. auto msgPtr = Messages_[index].Get();
  644. FillOutputMessage(value.GetVariantItem(), msgPtr, OutputColumns_[index], EnumPolicy_);
  645. return {index, msgPtr};
  646. }
  647. };
  648. /**
  649. * List (or, better, stream) of unboxed values. Used as an input value in pull workers.
  650. */
  651. class TProtoListValue final: public TCustomListValue {
  652. private:
  653. mutable bool HasIterator_ = false;
  654. THolder<IStream<Message*>> Underlying_;
  655. TInputConverter Converter_;
  656. IWorker* Worker_;
  657. TScopedAlloc& ScopedAlloc_;
  658. public:
  659. TProtoListValue(
  660. TMemoryUsageInfo* memInfo,
  661. const TProtobufRawInputSpec& inputSpec,
  662. THolder<IStream<Message*>> underlying,
  663. IWorker* worker
  664. )
  665. : TCustomListValue(memInfo)
  666. , Underlying_(std::move(underlying))
  667. , Converter_(inputSpec, worker)
  668. , Worker_(worker)
  669. , ScopedAlloc_(Worker_->GetScopedAlloc())
  670. {
  671. }
  672. ~TProtoListValue() override {
  673. {
  674. // This list value stored in the worker's computation graph and destroyed upon the computation
  675. // graph's destruction. This brings us to an interesting situation: scoped alloc is acquired,
  676. // worker and computation graph are half-way destroyed, and now it's our turn to die. The problem is,
  677. // the underlying stream may own another worker. This happens when chaining programs. Now, to destroy
  678. // that worker correctly, we need to release our scoped alloc (because that worker has its own
  679. // computation graph and scoped alloc).
  680. // By the way, note that we shouldn't interact with the worker here because worker is in the middle of
  681. // its own destruction. So we're using our own reference to the scoped alloc. That reference is alive
  682. // because scoped alloc destroyed after computation graph.
  683. auto unguard = Unguard(ScopedAlloc_);
  684. Underlying_.Destroy();
  685. }
  686. }
  687. public:
  688. TUnboxedValue GetListIterator() const override {
  689. YQL_ENSURE(!HasIterator_, "Only one pass over input is supported");
  690. HasIterator_ = true;
  691. return TUnboxedValuePod(const_cast<TProtoListValue*>(this));
  692. }
  693. bool Next(TUnboxedValue& result) override {
  694. const Message* message;
  695. {
  696. auto unguard = Unguard(ScopedAlloc_);
  697. message = Underlying_->Fetch();
  698. }
  699. if (!message) {
  700. return false;
  701. }
  702. Converter_.DoConvert(message, result);
  703. return true;
  704. }
  705. EFetchStatus Fetch(TUnboxedValue& result) override {
  706. if (Next(result)) {
  707. return EFetchStatus::Ok;
  708. } else {
  709. return EFetchStatus::Finish;
  710. }
  711. }
  712. };
  713. /**
  714. * Consumer which converts messages to unboxed values and relays them to the worker. Used as a return value
  715. * of the push processor's Process function.
  716. */
  717. class TProtoConsumerImpl final: public IConsumer<Message*> {
  718. private:
  719. TWorkerHolder<IPushStreamWorker> WorkerHolder_;
  720. TInputConverter Converter_;
  721. public:
  722. explicit TProtoConsumerImpl(
  723. const TProtobufRawInputSpec& inputSpec,
  724. TWorkerHolder<IPushStreamWorker> worker
  725. )
  726. : WorkerHolder_(std::move(worker))
  727. , Converter_(inputSpec, WorkerHolder_.Get())
  728. {
  729. }
  730. ~TProtoConsumerImpl() override {
  731. with_lock(WorkerHolder_->GetScopedAlloc()) {
  732. Converter_.ClearCache();
  733. }
  734. }
  735. public:
  736. void OnObject(Message* message) override {
  737. TBindTerminator bind(WorkerHolder_->GetGraph().GetTerminator());
  738. with_lock(WorkerHolder_->GetScopedAlloc()) {
  739. TUnboxedValue result;
  740. Converter_.DoConvert(message, result);
  741. WorkerHolder_->Push(std::move(result));
  742. }
  743. }
  744. void OnFinish() override {
  745. TBindTerminator bind(WorkerHolder_->GetGraph().GetTerminator());
  746. with_lock(WorkerHolder_->GetScopedAlloc()) {
  747. WorkerHolder_->OnFinish();
  748. }
  749. }
  750. };
  751. /**
  752. * Protobuf input stream for unboxed value streams.
  753. */
  754. template <typename TOutputSpec>
  755. class TRawProtoStreamImpl final: public IStream<OutputItemType<TOutputSpec>> {
  756. protected:
  757. TWorkerHolder<IPullStreamWorker> WorkerHolder_;
  758. TOutputConverter<TOutputSpec> Converter_;
  759. public:
  760. explicit TRawProtoStreamImpl(const TOutputSpec& outputSpec, TWorkerHolder<IPullStreamWorker> worker)
  761. : WorkerHolder_(std::move(worker))
  762. , Converter_(outputSpec, WorkerHolder_.Get())
  763. {
  764. }
  765. public:
  766. OutputItemType<TOutputSpec> Fetch() override {
  767. TBindTerminator bind(WorkerHolder_->GetGraph().GetTerminator());
  768. with_lock(WorkerHolder_->GetScopedAlloc()) {
  769. TUnboxedValue value;
  770. auto status = WorkerHolder_->GetOutput().Fetch(value);
  771. YQL_ENSURE(status != EFetchStatus::Yield, "Yield is not supported in pull mode");
  772. if (status == EFetchStatus::Finish) {
  773. return TOutputSpecTraits<TOutputSpec>::StreamSentinel;
  774. }
  775. return Converter_.DoConvert(value);
  776. }
  777. }
  778. };
  779. /**
  780. * Protobuf input stream for unboxed value lists.
  781. */
  782. template <typename TOutputSpec>
  783. class TRawProtoListImpl final: public IStream<OutputItemType<TOutputSpec>> {
  784. protected:
  785. TWorkerHolder<IPullListWorker> WorkerHolder_;
  786. TOutputConverter<TOutputSpec> Converter_;
  787. public:
  788. explicit TRawProtoListImpl(const TOutputSpec& outputSpec, TWorkerHolder<IPullListWorker> worker)
  789. : WorkerHolder_(std::move(worker))
  790. , Converter_(outputSpec, WorkerHolder_.Get())
  791. {
  792. }
  793. public:
  794. OutputItemType<TOutputSpec> Fetch() override {
  795. TBindTerminator bind(WorkerHolder_->GetGraph().GetTerminator());
  796. with_lock(WorkerHolder_->GetScopedAlloc()) {
  797. TUnboxedValue value;
  798. if (!WorkerHolder_->GetOutputIterator().Next(value)) {
  799. return TOutputSpecTraits<TOutputSpec>::StreamSentinel;
  800. }
  801. return Converter_.DoConvert(value);
  802. }
  803. }
  804. };
  805. /**
  806. * Push relay used to convert generated unboxed value to a message and push it to the user's consumer.
  807. */
  808. template <typename TOutputSpec>
  809. class TPushRelayImpl: public IConsumer<const TUnboxedValue*> {
  810. private:
  811. THolder<IConsumer<OutputItemType<TOutputSpec>>> Underlying_;
  812. TOutputConverter<TOutputSpec> Converter_;
  813. IWorker* Worker_;
  814. public:
  815. TPushRelayImpl(
  816. const TOutputSpec& outputSpec,
  817. IPushStreamWorker* worker,
  818. THolder<IConsumer<OutputItemType<TOutputSpec>>> underlying
  819. )
  820. : Underlying_(std::move(underlying))
  821. , Converter_(outputSpec, worker)
  822. , Worker_(worker)
  823. {
  824. }
  825. // If you've read a comment in the TProtoListValue's destructor, you may be wondering why don't we do the
  826. // same trick here. Well, that's because in push mode, consumer is destroyed before acquiring scoped alloc and
  827. // destroying computation graph.
  828. public:
  829. void OnObject(const TUnboxedValue* value) override {
  830. OutputItemType<TOutputSpec> message = Converter_.DoConvert(*value);
  831. auto unguard = Unguard(Worker_->GetScopedAlloc());
  832. Underlying_->OnObject(message);
  833. }
  834. void OnFinish() override {
  835. auto unguard = Unguard(Worker_->GetScopedAlloc());
  836. Underlying_->OnFinish();
  837. }
  838. };
  839. }
// Shorthand for the consumer type declared by TInputSpecTraits<TProtobufRawInputSpec>; MakeConsumer returns it.
using ConsumerType = TInputSpecTraits<TProtobufRawInputSpec>::TConsumerType;
  841. void TInputSpecTraits<TProtobufRawInputSpec>::PreparePullStreamWorker(
  842. const TProtobufRawInputSpec& inputSpec,
  843. IPullStreamWorker* worker,
  844. THolder<IStream<Message*>> stream
  845. ) {
  846. with_lock(worker->GetScopedAlloc()) {
  847. worker->SetInput(
  848. worker->GetGraph().GetHolderFactory().Create<TProtoListValue>(inputSpec, std::move(stream), worker), 0);
  849. }
  850. }
  851. void TInputSpecTraits<TProtobufRawInputSpec>::PreparePullListWorker(
  852. const TProtobufRawInputSpec& inputSpec,
  853. IPullListWorker* worker,
  854. THolder<IStream<Message*>> stream
  855. ) {
  856. with_lock(worker->GetScopedAlloc()) {
  857. worker->SetInput(
  858. worker->GetGraph().GetHolderFactory().Create<TProtoListValue>(inputSpec, std::move(stream), worker), 0);
  859. }
  860. }
  861. ConsumerType TInputSpecTraits<TProtobufRawInputSpec>::MakeConsumer(
  862. const TProtobufRawInputSpec& inputSpec,
  863. TWorkerHolder<IPushStreamWorker> worker
  864. ) {
  865. return MakeHolder<TProtoConsumerImpl>(inputSpec, std::move(worker));
  866. }
// Shorthands for the pull-mode return types declared by TOutputSpecTraits<TOutputSpec>.
template <typename TOutputSpec>
using PullStreamReturnType = typename TOutputSpecTraits<TOutputSpec>::TPullStreamReturnType;
template <typename TOutputSpec>
using PullListReturnType = typename TOutputSpecTraits<TOutputSpec>::TPullListReturnType;
  871. PullStreamReturnType<TProtobufRawOutputSpec> TOutputSpecTraits<TProtobufRawOutputSpec>::ConvertPullStreamWorkerToOutputType(
  872. const TProtobufRawOutputSpec& outputSpec,
  873. TWorkerHolder<IPullStreamWorker> worker
  874. ) {
  875. return MakeHolder<TRawProtoStreamImpl<TProtobufRawOutputSpec>>(outputSpec, std::move(worker));
  876. }
  877. PullListReturnType<TProtobufRawOutputSpec> TOutputSpecTraits<TProtobufRawOutputSpec>::ConvertPullListWorkerToOutputType(
  878. const TProtobufRawOutputSpec& outputSpec,
  879. TWorkerHolder<IPullListWorker> worker
  880. ) {
  881. return MakeHolder<TRawProtoListImpl<TProtobufRawOutputSpec>>(outputSpec, std::move(worker));
  882. }
  883. void TOutputSpecTraits<TProtobufRawOutputSpec>::SetConsumerToWorker(
  884. const TProtobufRawOutputSpec& outputSpec,
  885. IPushStreamWorker* worker,
  886. THolder<IConsumer<TOutputItemType>> consumer
  887. ) {
  888. worker->SetConsumer(MakeHolder<TPushRelayImpl<TProtobufRawOutputSpec>>(outputSpec, worker, std::move(consumer)));
  889. }
  890. PullStreamReturnType<TProtobufRawMultiOutputSpec> TOutputSpecTraits<TProtobufRawMultiOutputSpec>::ConvertPullStreamWorkerToOutputType(
  891. const TProtobufRawMultiOutputSpec& outputSpec,
  892. TWorkerHolder<IPullStreamWorker> worker
  893. ) {
  894. return MakeHolder<TRawProtoStreamImpl<TProtobufRawMultiOutputSpec>>(outputSpec, std::move(worker));
  895. }
  896. PullListReturnType<TProtobufRawMultiOutputSpec> TOutputSpecTraits<TProtobufRawMultiOutputSpec>::ConvertPullListWorkerToOutputType(
  897. const TProtobufRawMultiOutputSpec& outputSpec,
  898. TWorkerHolder<IPullListWorker> worker
  899. ) {
  900. return MakeHolder<TRawProtoListImpl<TProtobufRawMultiOutputSpec>>(outputSpec, std::move(worker));
  901. }
  902. void TOutputSpecTraits<TProtobufRawMultiOutputSpec>::SetConsumerToWorker(
  903. const TProtobufRawMultiOutputSpec& outputSpec,
  904. IPushStreamWorker* worker,
  905. THolder<IConsumer<TOutputItemType>> consumer
  906. ) {
  907. worker->SetConsumer(MakeHolder<TPushRelayImpl<TProtobufRawMultiOutputSpec>>(outputSpec, worker, std::move(consumer)));
  908. }