1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556 |
(
# Reads table Input1 via the 'yt provider on 'plato, groups its rows by 'key,
# emits one summary row per group, and routes each summary row into one of
# two outputs (Output1 / Output2) through a two-alternative variant.
(let source (DataSource 'yt 'plato))
(let sink (DataSink 'yt 'plato))

(let readResult (Read! world source (Key '('table (String 'Input1))) (Void) '()))
(let world (Left! readResult))
(let inputRows (Right! readResult))

# Both variant alternatives carry the same row shape.
(let rowType (StructType
    '('key (DataType 'String))
    '('subkey (DataType 'String))
    '('value (DataType 'String))
))
(let outVariant (VariantType (TupleType rowType rowType)))

# Partitioning key: the 'key member of each input row.
(let byKey (lambda '(row) (Member row 'key)))

# Turns one (key, rows) pair into a single tagged summary row:
#   key    - the group key
#   subkey - the constant string "."
#   value  - the number of rows in the group, rendered as a string
(let perGroup (lambda '(pair) (block '(
    (let groupKey (Nth pair '0))
    (let groupRows (Nth pair '1))
    (let rowCount (ToString (Length (ForwardList groupRows))))
    (let outRow (AddMember (AddMember (AddMember (Struct) 'key groupKey) 'subkey (String '.)) 'value rowCount))
    # The key is converted with FromString to Int32 and tested for evenness;
    # Coalesce supplies 'false when that comparison yields no value
    # (presumably when the key is not a valid Int32 - confirm).
    (let keyIsEven (Coalesce (Equal (% (FromString groupKey 'Int32) (Int32 '2)) (Int32 '0)) (Bool 'false)))
    # Even keys go to alternative 0, everything else to alternative 1.
    (return (If keyIsEven (Variant outRow '0 outVariant) (Variant outRow '1 outVariant)))
))))

# Handler given to PartitionByKey: maps every group to its tagged summary row.
(let groupsHandler (lambda '(groups) (Map groups perGroup)))

(let tagged (PartitionByKey inputRows byKey (Void) (Void) groupsHandler))

# Split the variant-tagged rows into a tuple with one element per alternative.
(let parts (Demux tagged))

(let world (Write! world sink (Key '('table (String 'Output1))) (Nth parts '0) '('('mode 'renew))))
(let world (Write! world sink (Key '('table (String 'Output2))) (Nth parts '1) '('('mode 'renew))))
(let world (Commit! world sink))
(return world)
)
|