ZipInsideReduce.yql 1009 B

1234567891011121314151617181920212223242526
  1. (
  2. #comment
  3. (let mr_source (DataSource 'yt 'plato))
  4. (let x (Read! world mr_source (Key '('table (String 'Input))) '('key 'subkey 'value) '()))
  5. (let world (Left! x))
  6. (let table1 (Right! x))
  7. (let keySelector (lambda '(x) (Member x 'key)))
  8. (let listHandler (lambda '(stream) (FlatMap stream (lambda '(pair) (block '(
  9. (let key (Nth pair '0))
  10. (let list (Collect (Nth pair '1)))
  11. (let list (Zip list (Skip list (Uint64 '1))))
  12. (let r (FlatMap list (lambda '(x) (block '(
  13. (let s (Struct))
  14. (let s (AddMember s 'key key))
  15. (let s (AddMember s 'subkey (String '.)))
  16. (let s (AddMember s 'value (Concat (Member (Nth x '0) 'value) (ToString (Member (Nth x '1) 'value)))))
  17. (return (AsList s))
  18. )))))
  19. (return r)
  20. ))))))
  21. (let reducedTable (PartitionByKey table1 keySelector (Void) (Void) listHandler))
  22. (let mr_sink (DataSink 'yt (quote plato)))
  23. (let world (Write! world mr_sink (Key '('table (String 'Output))) reducedTable '('('mode 'append))))
  24. (let world (Commit! world mr_sink))
  25. (return world)
  26. )