# MapOverDemux.yql
  # MapOverDemux: reads table Input1, wraps every row into one of two variant
  # alternatives depending on the parity of its integer-parsed key, splits the
  # result with Demux, filters the first stream, and writes the two streams to
  # Output1 and Output2.
  (
  # YT source and sink handles, both bound to 'plato.
  (let ytSource (DataSource 'yt 'plato))
  (let ytSink (DataSink 'yt 'plato))

  # Read! result is taken apart with Left! (rebound as world) and Right! (the rows).
  (let readResult (Read! world ytSource (Key '('table (String 'Input1))) (Void) '()))
  (let world (Left! readResult))
  (let inputRows (Right! readResult))

  # Both variant alternatives carry the same three-string-column struct.
  (let rowType (StructType
      '('key (DataType 'String))
      '('subkey (DataType 'String))
      '('value (DataType 'String))))
  (let routedType (VariantType (TupleType rowType rowType)))

  # Row router: alternative '0 when the key parsed as Int32 is divisible by 2,
  # alternative '1 otherwise. Coalesce supplies false when the comparison has
  # no value (presumably when the key is not a valid Int32 - confirm).
  (let route (lambda '(row) (block '(
      (let keyIsEven (Coalesce
          (Equal (% (FromString (Member row 'key) 'Int32) (Int32 '2)) (Int32 '0))
          (Bool 'false)))
      (return (If keyIsEven
          (Variant row '0 routedType)
          (Variant row '1 routedType)))
  ))))
  (let routedRows (Map inputRows route))

  # Demux turns the variant stream into a tuple with one element per alternative.
  (let streams (Demux routedRows))
  (let alt0Rows (Nth streams '0))
  (let alt1Rows (Nth streams '1))

  # Only the first stream is filtered. Both operands of Less are String values,
  # so this is a string comparison, not a numeric one.
  (let keyBelow200 (lambda '(row) (Less (Member row 'key) (String '200))))
  (let alt0Filtered (Filter alt0Rows keyBelow200))

  # Write each stream to its own table in 'renew mode, then commit the sink.
  (let world (Write! world ytSink (Key '('table (String 'Output1))) alt0Filtered '('('mode 'renew))))
  (let world (Write! world ytSink (Key '('table (String 'Output2))) alt1Rows '('('mode 'renew))))
  (let world (Commit! world ytSink))
  (return world)
  )