ExtractRenameFromFlatMapOverJoin.yqls 1.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142
  1. (
  2. (let mr_source (DataSource 'yt 'plato))
  3. (let list1 (AsList
  4. (AsStruct '('key1 (Int32 '1)) '('value1 (String 'A)))
  5. (AsStruct '('key1 (Int32 '7)) '('value1 (String 'B)))
  6. (AsStruct '('key1 (Int32 '4)) '('value1 (String 'C)))
  7. (AsStruct '('key1 (Int32 '4)) '('value1 (String 'D)))
  8. ))
  9. (let list2 (AsList
  10. (AsStruct '('key2 (Int32 '9)) '('value2 (String 'Z)))
  11. (AsStruct '('key2 (Int32 '4)) '('value2 (String 'Y)))
  12. (AsStruct '('key2 (Int32 '3)) '('value2 (String 'X)))
  13. (AsStruct '('key2 (Int32 '4)) '('value2 (String 'W)))
  14. (AsStruct '('key2 (Int32 '8)) '('value2 (String 'V)))
  15. ))
  16. (let joinInner (EquiJoin '(list1 'a) '(list2 'b) '('Inner 'a 'b '('a 'key1) '('b 'key2) '()) '()))
  17. (let list (Map joinInner (lambda '(x) (block '(
  18. (return (AsStruct
  19. '('"a.key1" (Member x '"a.key1"))
  20. '('"q" (Member x '"a.value1"))
  21. '('"f.value2" (Member x '"b.value2"))
  22. ))
  23. )))))
  24. (let res_sink (DataSink 'result))
  25. (let world (Write! world res_sink (Key) list '('('type))))
  26. (let joinInner2 (EquiJoin '(list1 'a) '(list2 'b) '('Inner 'a 'b '('a 'key1) '('b 'key2) '())
  27. '('('rename '"a.key1" 'z) '('rename '"b.key2" '""))))
  28. (let list2 (Map joinInner2 (lambda '(x) (block '(
  29. (return (AsStruct
  30. '('"yy" (Member x '"z"))
  31. '('"q" (Member x '"b.value2"))
  32. ))
  33. )))))
  34. (let world (Write! world res_sink (Key) list2 '('('type))))
  35. (let world (Commit! world res_sink))
  36. (return world)
  37. )