1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950 |
- (
- (let config (DataSource 'config))
- (let timeExtractor (lambda '(item) (Just (Member item 'time))))
- (let init (lambda '(item)
- (AsStruct
- '('sum (Member item 'sum))
- '('max (Member item 'max))
- )))
- (let update (lambda '(item state)
- (AsStruct
- '('sum (AggrAdd (Member item 'sum) (Member state 'sum)))
- '('max (AggrMax (Member item 'max) (Member state 'max)))
- )))
- (let merge (lambda '(state1 state2)
- (AsStruct
- '('sum (AggrAdd (Member state1 'sum) (Member state2 'sum)))
- '('max (AggrMax (Member state1 'max) (Member state2 'max)))
- )))
- (let save (lambda '(state) state))
- (let load (lambda '(state) state))
- (let finish (lambda '(state time) (AddMember state '_yql_time time)))
- (let stream (Iterator (AsList
- (AsStruct '('time (Timestamp '1)) '('sum (Uint32 '2)) '('max (String 'f)))
- (AsStruct '('time (Timestamp '2)) '('sum (Uint32 '3)) '('max (String 'a)))
- (AsStruct '('time (Timestamp '15)) '('sum (Uint32 '4)) '('max (String 'e)))
- (AsStruct '('time (Timestamp '23)) '('sum (Uint32 '6)) '('max (String 'h)))
- (AsStruct '('time (Timestamp '24)) '('sum (Uint32 '5)) '('max (String 'd)))
- (AsStruct '('time (Timestamp '25)) '('sum (Uint32 '7)) '('max (String 's)))
- (AsStruct '('time (Timestamp '40)) '('sum (Uint32 '2)) '('max (String 'j)))
- (AsStruct '('time (Timestamp '47)) '('sum (Uint32 '1)) '('max (String 't)))
- (AsStruct '('time (Timestamp '51)) '('sum (Uint32 '6)) '('max (String 'b)))
- (AsStruct '('time (Timestamp '59)) '('sum (Uint32 '2)) '('max (String 'c)))
- (AsStruct '('time (Timestamp '85)) '('sum (Uint32 '8)) '('max (String 'g)))
- (AsStruct '('time (Timestamp '55)) '('sum (Uint32 '1000)) '('max (String 'z)))
- (AsStruct '('time (Timestamp '200)) '('sum (Uint32 '0)) '('max (String 'a)))
- )))
- # row with time 55 should be excluded from aggregation due to delay
- (let itemType (StructType '('time (DataType 'Timestamp)) '('sum (DataType 'Uint32)) '('max (DataType 'String))))
- (let res (HoppingCore stream timeExtractor (Interval '10) (Interval '30) (Interval '20) init update save load merge finish))
- (let res_sink (DataSink 'result))
- (let world (Write! world res_sink (Key) (Collect res) '('('type))))
- (let world (Commit! world res_sink))
- (return world)
- )
|