session_extended.sql 899 B

1234567891011121314151617181920212223242526272829
  1. /* syntax version 1 */
  2. /* postgres can not */
  3. $init = ($row) -> (AsStruct($row.ts ?? 0 as value, 1 as count));
  4. $calculate = ($_row, $state) -> ($state.value);
  5. -- split partition into two-element grooups, make session key to be cumulative sum of ts from partition start
  6. $update = ($row, $state) -> {
  7. $state = AsStruct($state.count + 1 as count, $state.value as value);
  8. $state = AsStruct($state.count as count, $state.value + ($row.ts ?? 0) as value);
  9. return AsTuple(Unwrap($state.count % 2) == 1, $state);
  10. };
  11. SELECT
  12. user,
  13. ts,
  14. payload,
  15. AGGREGATE_LIST(cast(ts as string) ?? "null") over w as ts_session,
  16. COUNT(1) over w as session_len,
  17. SessionStart() over w as session_start,
  18. SessionState() over w as session_state,
  19. FROM plato.Input
  20. WINDOW w AS (
  21. PARTITION BY user, SessionWindow(ts + 1, $init, $update, $calculate)
  22. ORDER BY ts
  23. )
  24. ORDER BY user, payload;