BlockMapJoinCore.yqls 2.5 KB

1234567891011121314151617181920212223242526272829303132
  1. (
  2. (let config (DataSource 'config))
  3. (let res_sink (DataSink 'result))
  4. (let row1 (AsStruct '('"key" (Int32 '1)) '('"subkey" (Int32 '"1001")) '('"value" (String '"AAA"))))
  5. (let row2 (AsStruct '('"key" (Int32 '2)) '('"subkey" (Int32 '"1002")) '('"value" (String '"AAB"))))
  6. (let row3 (AsStruct '('"key" (Int32 '3)) '('"subkey" (Int32 '"1003")) '('"value" (String '"AAC"))))
  7. (let row4 (AsStruct '('"key" (Int32 '4)) '('"subkey" (Int32 '"1004")) '('"value" (String '"AAD"))))
  8. (let row5 (AsStruct '('"key" (Int32 '5)) '('"subkey" (Int32 '"1005")) '('"value" (String '"AAE"))))
  9. (let table (AsList row1 row2 row3 row4 row5))
  10. (let expandLambda (lambda '(item) (Member item '"key") (Member item '"subkey") (Member item '"value")))
  11. (let narrowLambdaInner (lambda '(item1 item2 item3 item4 item5 item6) (AsStruct '('"akey" item1) '('"asubkey" item2) '('"avalue" item3) '('"bkey" item4) '('"bsubkey" item5) '('"bvalue" item6))))
  12. (let narrowLambdaLeft (lambda '(item1 item2 item3 item4) (AsStruct '('"asubkey" item1) '('"avalue" item2) '('"bsubkey" item3) '('"bvalue" item4))))
  13. (let narrowLambdaLeftSemi (lambda '(item1 item2) (AsStruct '('"asubkey" item1) '('"avalue" item2))))
  14. (let doJoin (lambda '(left right narrowMapLambda joinKind leftKeyColumns leftKeyDrops rightKeyColumns rightKeyDrops) (block '(
  15. (let leftStream (WideToBlocks (FromFlow (ExpandMap left expandLambda))))
  16. (let rightStream (WideToBlocks (FromFlow (ExpandMap right expandLambda))))
  17. (let rightStreamItemType (StreamItemType (TypeOf rightStream)))
  18. (let rightBlockIndex (BlockMapJoinIndex (BlockStorage rightStream) rightStreamItemType rightKeyColumns '()))
  19. (return (Collect (NarrowMap (ToFlow (WideFromBlocks (BlockMapJoinCore leftStream rightBlockIndex rightStreamItemType joinKind leftKeyColumns leftKeyDrops rightKeyColumns rightKeyDrops))) narrowMapLambda)))
  20. ))))
  21. (let innerJoin (Apply doJoin (ToFlow table (DependsOn (String '0))) (ToFlow table (DependsOn (String '1))) narrowLambdaInner 'Inner '('0) '() '('0) '()))
  22. (let leftJoin (Apply doJoin (ToFlow table (DependsOn (String '2))) (ToFlow table (DependsOn (String '3))) narrowLambdaLeft 'Left '('0) '('0) '('0) '('0)))
  23. (let leftSemiJoin (Apply doJoin (ToFlow table (DependsOn (String '4))) (ToFlow table (DependsOn (String '5))) narrowLambdaLeftSemi 'LeftSemi '('0) '('0) '('0) '()))
  24. (let world (Write! world res_sink (Key) (AsStruct '('"inner" innerJoin) '('"left" leftJoin) '('"leftSemi" leftSemiJoin)) '('('type))))
  25. (let world (Commit! world res_sink))
  26. (return world)
  27. )