program29.yql 1006 B

12345678910111213141516171819202122232425
  1. (
  2. #comment
  3. (let mr_source (DataSource 'yt 'plato))
  4. (let x (Read! world mr_source (Key '('table (String 'Input))) '('key 'subkey 'value) '()))
  5. (let world (Left! x))
  6. (let table1 (Right! x))
  7. (let keySelector (lambda '(x) (Member x 'key)))
  8. (let listHandler (lambda '(stream) (FlatMap stream (lambda '(pair) (block '(
  9. (let key (Nth pair '0))
  10. (let list (ForwardList (Nth pair '1)))
  11. (let s (Struct))
  12. (let s (AddMember s 'key key))
  13. (let s (AddMember s 'subkey (String '.)))
  14. (let ilist (FlatMap list (lambda '(item) (FromString (Member item 'value) 'Int32))))
  15. (let sum (Fold ilist (Int32 '0) (lambda '(item state) (+ item state))))
  16. (let s (AddMember s 'value (ToString sum)))
  17. (let ret (AsList s))
  18. (return ret)
  19. ))))))
  20. (let reducedTable (PartitionByKey table1 keySelector (Void) (Void) listHandler))
  21. (let mr_sink (DataSink 'yt (quote plato)))
  22. (let world (Write! world mr_sink (Key '('table (String 'Output))) reducedTable '('('mode 'append))))
  23. (let world (Commit! world mr_sink))
  24. (return world)
  25. )