udaf_distinct.sql 1.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253
/* postgres can not */
/* syntax version 1 */
  3. $script = @@
  4. import heapq
  5. import json
  6. N_SMALLEST = 3
  7. def create(item):
  8. return [item]
  9. def add(state, item):
  10. heapq.heappush(state, item)
  11. return heapq.nsmallest(N_SMALLEST, state)
  12. def merge(state_a, state_b):
  13. merged = heapq.merge(state_a, state_b)
  14. return heapq.nsmallest(N_SMALLEST, merged)
  15. def get_result(state):
  16. result = heapq.nsmallest(N_SMALLEST, state)
  17. return '%d smallest items: %s' % (
  18. N_SMALLEST,
  19. ', '.join(map(str, result))
  20. )
  21. def serialize(state):
  22. return json.dumps(state)
  23. def deserialize(serialized):
  24. return json.loads(serialized)
  25. @@;
  26. $create = Python3::create(Callable<(Int64)->Resource<Python3>>, $script);
  27. $add = Python3::add(Callable<(Resource<Python3>,Int64)->Resource<Python3>>, $script);
  28. $merge = Python3::merge(Callable<(Resource<Python3>,Resource<Python3>)->Resource<Python3>>, $script);
  29. $get_result = Python3::get_result(Callable<(Resource<Python3>)->String>, $script);
  30. $serialize = Python3::serialize(Callable<(Resource<Python3>)->String>, $script);
  31. $deserialize = Python3::deserialize(Callable<(String)->Resource<Python3>>, $script);
  32. SELECT UDAF(
  33. DISTINCT item,
  34. $create,
  35. $add,
  36. $merge,
  37. $get_result,
  38. $serialize,
  39. $deserialize
  40. ) FROM (
  41. SELECT
  42. CAST(LENGTH(value) AS Int64) AS item
  43. FROM plato.Input
  44. );