MultiHoppingCore.yqls 1.3 KB

1234567891011121314151617181920212223
  # Builds a MultiHoppingCore node over three in-memory rows and writes the
  # formatted *type* of that node (not its values) to the 'result sink.
  (
    # Fixture rows: a String "time", an Int32 "user" and a Null "data" field.
    (let events (AsList
      (AsStruct '('"time" (String '"2024-01-01T00:00:01Z")) '('"user" (Int32 '"1")) '('"data" (Null)))
      (AsStruct '('"time" (String '"2024-01-01T00:00:02Z")) '('"user" (Int32 '"1")) '('"data" (Null)))
      (AsStruct '('"time" (String '"2024-01-01T00:00:03Z")) '('"user" (Int32 '"1")) '('"data" (Null)))
    ))

    # The same Interval value is passed three times to MultiHoppingCore below
    # (presumably hop, interval and delay -- confirm against the callable's signature).
    (let step (Interval '1000000))

    # State lambdas: both ignore their arguments and yield an empty struct.
    (let emptyOfOne (lambda '(it) (AsStruct)))
    (let emptyOfTwo (lambda '(a b) (AsStruct)))

    # Re-shape every row, adding group0 = a one-element list holding "user".
    (let shaped (FlatMap events (lambda '(rec) (Just (AsStruct
      '('"data" (Member rec '"data"))
      '('group0 (AsList (Member rec '"user")))
      '('"time" (Member rec '"time"))
      '('"user" (Member rec '"user")))))))

    # Grouping key: tuple of the pickled "data" and pickled group0 members.
    (let groupKey (lambda '(rec) '(
      (StablePickle (Member rec '"data"))
      (StablePickle (Member rec 'group0)))))

    # Time key: the "time" string safely cast to an optional Timestamp.
    (let eventTime (lambda '(rec)
      (SafeCast (Member rec '"time") (OptionalType (DataType 'Timestamp)))))

    # The final lambda turns (key, state, time) into the output row: the time goes
    # to _yql_time, and the two key tuple elements go back to "data" and group0.
    (let windows (MultiHoppingCore
      (Iterator shaped)
      groupKey
      eventTime
      step step step
      'true
      emptyOfOne emptyOfTwo emptyOfOne emptyOfOne emptyOfTwo
      (lambda '(k st ts) (AsStruct
        '('_yql_time ts)
        '('"data" (Nth k '"0"))
        '('group0 (Nth k '"1"))))
      '0))

    # Only the type of the node is rendered (original note: no typical comp node).
    (let typeText (FormatType (TypeOf windows)))

    (let sink (DataSink 'result))
    (let world (Write! world sink (Key) typeText '('('type))))
    (return (Commit! world sink))
  )