123456789101112131415161718192021222324252627282930313233343536373839 |
(
# Read columns key, subkey and value of table "Input" from the yt/plato data source.
# Read! yields a pair: the updated world (Left!) and the table contents (Right!).
(let readResult (Read! world (DataSource '"yt" '"plato") (Key '('table (String '"Input"))) '('"key" '"subkey" '"value") '()))
(let world (Left! readResult))
(let inputRows (Right! readResult))

# Callbacks handed to CombineByKey, in argument order:
#   wrapRow      - pre-map: passes each row through unchanged, wrapped in Just
#   rowKey       - key extractor: the row's 'key member (also reused by PartitionByKey)
#   firstValue   - initial state for a key: the row's 'value member
#   appendValue  - state update: Concat of the current state and the row's 'value
#   emitCombined - finisher: builds a {key, subkey = ".", value = state} struct
(let wrapRow (lambda '(row) (Just row)))
(let rowKey (lambda '(row) (Member row 'key)))
(let firstValue (lambda '(k row) (Member row 'value)))
(let appendValue (lambda '(k row acc) (Concat acc (Member row 'value))))
(let emitCombined (lambda '(k acc) (block '(
    (let withKey (AddMember (Struct) 'key k))
    (let withSubkey (AddMember withKey 'subkey (String '.)))
    (let combinedRow (AddMember withSubkey 'value acc))
    (return (Just combinedRow))
))))
(let combined (CombineByKey inputRows wrapRow rowKey firstValue appendValue emitCombined))

# Handles one (key, rows) pair produced by PartitionByKey: folds the 'value members
# of the rows together with Concat and emits a single-row list for that key.
(let mergeGroup (lambda '(group) (block '(
    (let groupKey (Nth group '0))
    (let groupRows (ForwardList (Nth group '1)))
    (let joined (Fold1 groupRows
        (lambda '(row) (Member row 'value))
        (lambda '(row acc) (Concat acc (Member row 'value)))
    ))
    (let withKey (AddMember (Struct) 'key groupKey))
    (let withSubkey (AddMember withKey 'subkey (String '.)))
    # Coalesce substitutes an empty string when the fold result is empty
    # (presumably Fold1 yields an optional for an empty list - confirm).
    (let mergedRow (AddMember withSubkey 'value (Coalesce joined (String '""))))
    (return (AsList mergedRow))
))))
(let reduceGroups (lambda '(groups) (FlatMap groups mergeGroup)))

# Partition the combined rows by key (both sort arguments are Void) and merge each group.
(let output (PartitionByKey combined rowKey (Void) (Void) reduceGroups))

# Write the result to table "Output" on yt/plato in append mode, then commit.
(let sink (DataSink 'yt 'plato))
(let world (Write! world sink (Key '('table (String 'Output))) output '('('mode 'append))))
(let world (Commit! world sink))
(return world)
)
|