ReduceThenMap.yql 979 B

1234567891011121314151617181920212223242526
  1. (
  2. #comment
  3. (let mr_source (DataSource 'yt 'plato))
  4. (let x (Read! world mr_source (Key '('table (String 'Input))) '('key 'subkey 'value) '()))
  5. (let world (Left! x))
  6. (let table1 (Right! x))
  7. (let keySelector (lambda '(x) (Member x 'key)))
  8. (let listHandler (lambda '(stream) (FlatMap stream (lambda '(pair) (block '(
  9. (let key (Nth pair '0))
  10. (let list (ForwardList (Nth pair '1)))
  11. (let s (Struct))
  12. (let s (AddMember s 'key key))
  13. (let s (AddMember s 'subkey (String '.)))
  14. (let s (AddMember s 'value (ToString (Length list))))
  15. (let ret (AsList s))
  16. (return ret)
  17. ))))))
  18. (let reducedTable (PartitionByKey table1 keySelector (Void) (Void) listHandler))
  19. (let filterAfterReduce (Filter reducedTable (lambda '(x) (block '(
  20. (return (> (Member x 'value) (String '1)))
  21. )))))
  22. (let mr_sink (DataSink 'yt (quote plato)))
  23. (let world (Write! world mr_sink (Key '('table (String 'Output))) filterAfterReduce '('('mode 'append))))
  24. (let world (Commit! world mr_sink))
  25. (return world)
  26. )