12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273 |
# YQL s-expression program. Summary of what the expressions below do:
#   1. read columns key/subkey/value from table "Input" of the yt/plato data source;
#   2. add a computed column x = "Size" of key;
#   3. aggregate by key (min/max aggregates plus a TDigest-based percentile aggregate);
#   4. build the output columns, sort by key and write them to the result sink;
#   5. commit the result sink and then the yt/plato sink.
# NOTE(review): the shape (numbered aggregate names, quoted callable names) looks
# like translator output for a SQL GROUP BY query - confirm before hand-editing.
- (
# Library that provides min_traits_factory / max_traits_factory used in the Aggregate call.
- (import aggregate_module '"/lib/yql/aggregate.yql")
- (let world (block '(
# Read! result is bound to x; Left! x becomes the new world, Right! x the table data.
- (let x (Read! world (DataSource '"yt" '"plato") (Key '('table (String '"Input"))) '('"key" '"subkey" '"value") '()))
- (let world (Left! x))
- (let table0 (Right! x))
- (let output (block '(
- (let select (block '(
- (let core (block '(
# Inner select: copy key/subkey/value from each row and add x = "Size" of key.
- (let select (block '(
- (let core table0)
- (let core (FlatMap core (lambda '(row) (block '(
- (let res (Struct))
- (let res (AddMember res '"key" (Member row '"key")))
- (let res (AddMember res '"subkey" (Member row '"subkey")))
- (let res (AddMember res '"value" (Member row '"value")))
- (let res (AddMember res '"x" ("Size" (Member row '"key"))))
- (let res (AsList res))
- (return res)
- )))))
- (return core)
- )))
- (return select)
- )))
# Aggregation stage. The percentile_x_* lambdas below are the six callbacks passed
# to AggregationTraits in the Aggregate call; each one uses MatchType to pick an
# Optional-aware branch or a plain branch.
- (let core (block '(
# create: build a TDigest from column x of the row (wrapped in Just in the Optional branch).
- (let percentile_x_create (lambda '(row) (MatchType (Member row '"x") 'Optional (lambda '(optValue) (FlatMap optValue (lambda '(value) (Just (Apply (Udf 'Stat.TDigest_Create) value))))) (lambda '(value) (Apply (Udf 'Stat.TDigest_Create) value)))))
# update: Optional branch - state and value present: add the value; state present,
# value absent: keep the state; state absent: create a digest from the value if it
# is present. Plain branch: add the value to the state.
- (let percentile_x_update (lambda '(row state) (MatchType (Member row '"x") 'Optional (lambda '(optValue) (IfPresent state (lambda '(state) (IfPresent optValue (lambda '(value) (Just (Apply (Udf 'Stat.TDigest_AddValue) state value))) (Just state))) (FlatMap optValue (lambda '(value) (Just (Apply (Udf 'Stat.TDigest_Create) value)))))) (lambda '(value) (Apply (Udf 'Stat.TDigest_AddValue) state value)))))
# save / load: apply Stat.TDigest_Serialize / Stat.TDigest_Deserialize to the state.
- (let percentile_x_save (lambda '(state) (MatchType state 'Optional (lambda '(optState) (Map optState (lambda '(currState) (Apply (Udf 'Stat.TDigest_Serialize) currState)))) (lambda '(currState) (Apply (Udf 'Stat.TDigest_Serialize) currState)))))
- (let percentile_x_load (lambda '(item) (MatchType item 'Optional (lambda '(optData) (FlatMap optData (lambda '(data) (Just (Apply (Udf 'Stat.TDigest_Deserialize) data))))) (lambda '(data) (Apply (Udf 'Stat.TDigest_Deserialize) data)))))
# merge: combine two states with Stat.TDigest_Merge through OptionalReduce.
- (let percentile_x_merge (lambda '(a b) (OptionalReduce a b (lambda '(a b) (Apply (Udf 'Stat.TDigest_Merge) a b)))))
# finish: one shared state yields a struct with two members:
# Percentile9 = GetPercentile(0.5), Percentile10 = GetPercentile(0.9).
- (let percentile_x_finish (lambda '(state) (block '(
- (let res (Struct))
- (let res (AddMember res 'Percentile9 (Apply (lambda '(state) (MatchType state 'Optional (lambda '(optData) (Map optData (lambda '(data) (Apply (Udf 'Stat.TDigest_GetPercentile) data (Double '0.5))))) (lambda '(data) (Apply (Udf 'Stat.TDigest_GetPercentile) data (Double '0.5))))) state)))
- (let res (AddMember res 'Percentile10 (Apply (lambda '(state) (MatchType state 'Optional (lambda '(optData) (Map optData (lambda '(data) (Apply (Udf 'Stat.TDigest_GetPercentile) data (Double '0.9))))) (lambda '(data) (Apply (Udf 'Stat.TDigest_GetPercentile) data (Double '0.9))))) state)))
- (return res)
- ))))
# Aggregate by key. Aggregates, in order:
#   Min1, Min2 - min traits over Concat(subkey, "q");
#   Max3, Max4 - max traits over value;
#   Min5, Min6 - min traits over the subkey member type, identity lambda, extra '"subkey" argument;
#   Max7, Max8 - max traits over the value member type, identity lambda, extra '"value" argument;
#   percentile_x - AggregationTraits built from the percentile_x_* lambdas, last argument (Null).
# NOTE(review): the extra trailing column name on Min5..Max8 presumably marks a
# DISTINCT aggregation over that column - confirm against the Aggregate callable docs.
- (return (Aggregate core '('"key") '('('Min1 (Apply (bind aggregate_module '"min_traits_factory") (TypeOf core) (lambda '(row) ("Concat" (Member row '"subkey") (String '"q"))))) '('Min2 (Apply (bind aggregate_module '"min_traits_factory") (TypeOf core) (lambda '(row) ("Concat" (Member row '"subkey") (String '"q"))))) '('Max3 (Apply (bind aggregate_module '"max_traits_factory") (TypeOf core) (lambda '(row) (Member row '"value")))) '('Max4 (Apply (bind aggregate_module '"max_traits_factory") (TypeOf core) (lambda '(row) (Member row '"value")))) '('Min5 (Apply (bind aggregate_module '"min_traits_factory") (ListType (StructMemberType (ListItemType (TypeOf core)) '"subkey")) (lambda '(row) row)) '"subkey") '('Min6 (Apply (bind aggregate_module '"min_traits_factory") (ListType (StructMemberType (ListItemType (TypeOf core)) '"subkey")) (lambda '(row) row)) '"subkey") '('Max7 (Apply (bind aggregate_module '"max_traits_factory") (ListType (StructMemberType (ListItemType (TypeOf core)) '"value")) (lambda '(row) row)) '"value") '('Max8 (Apply (bind aggregate_module '"max_traits_factory") (ListType (StructMemberType (ListItemType (TypeOf core)) '"value")) (lambda '(row) row)) '"value") '('percentile_x (AggregationTraits (ListItemType (TypeOf core)) percentile_x_create percentile_x_update percentile_x_save percentile_x_load percentile_x_merge percentile_x_finish (Null))))))
- )))
# Final projection: key, x1..x4 ("a".."d" concatenated with Min1, Min2, Max3, Max4),
# y1..y4 ("1".."4" concatenated with Min5, Min6, Max7, Max8), and column9/column10
# taken from the Percentile9/Percentile10 members of the percentile_x struct.
- (let core (FlatMap core (lambda '(row) (block '(
- (let res (Struct))
- (let res (AddMember res '"key" (Member row '"key")))
- (let res (AddMember res '"x1" ("Concat" (String '"a") (Member row 'Min1))))
- (let res (AddMember res '"x2" ("Concat" (String '"b") (Member row 'Min2))))
- (let res (AddMember res '"x3" ("Concat" (String '"c") (Member row 'Max3))))
- (let res (AddMember res '"x4" ("Concat" (String '"d") (Member row 'Max4))))
- (let res (AddMember res '"y1" ("Concat" (String '"1") (Member row 'Min5))))
- (let res (AddMember res '"y2" ("Concat" (String '"2") (Member row 'Min6))))
- (let res (AddMember res '"y3" ("Concat" (String '"3") (Member row 'Max7))))
- (let res (AddMember res '"y4" ("Concat" (String '"4") (Member row 'Max8))))
- (let res (AddMember res '"column9" (Member (Member row '"percentile_x") 'Percentile9)))
- (let res (AddMember res '"column10" (Member (Member row '"percentile_x") 'Percentile10)))
- (let res (AsList res))
- (return res)
- )))))
- (return core)
- )))
# Sort the selected rows by key.
# NOTE(review): (Bool 'true) is presumably the ascending flag - confirm.
- (let select (Sort select (Bool 'true) (lambda '(row) (Member row '"key"))))
- (return select)
- )))
# Write the sorted output to the result sink with an explicit column list, then commit that sink.
- (let world (block '(
- (let result_sink (DataSink 'result))
- (let world (Write! world result_sink (Key) output '('('type) '('autoref) '('columns '('"key" '"x1" '"x2" '"x3" '"x4" '"y1" '"y2" '"y3" '"y4" '"column9" '"column10")))))
- (return (Commit! world result_sink))
- )))
- (return world)
- )))
# Commit the yt/plato sink and return the final world as the program result.
- (let world (block '(
- (let plato_sink (DataSink '"yt" '"plato"))
- (let world (Commit! world plato_sink))
- (return world)
- )))
- (return world)
- )
|