udaf.sql 1.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849
  1. /* postgres can not */
  2. /* syntax version 1 */
  3. $script = @@
  4. import heapq
  5. import json
  6. N_SMALLEST = 3
  7. def create(item):
  8. return [item]
  9. def add(state, item):
  10. heapq.heappush(state, item)
  11. return heapq.nsmallest(N_SMALLEST, state)
  12. def merge(state_a, state_b):
  13. merged = heapq.merge(state_a, state_b)
  14. return heapq.nsmallest(N_SMALLEST, merged)
  15. def get_result(state):
  16. result = heapq.nsmallest(N_SMALLEST, state)
  17. return '%d smallest items: %s' % (
  18. N_SMALLEST,
  19. ', '.join(map(str, result))
  20. )
  21. def serialize(state):
  22. return json.dumps(state)
  23. def deserialize(serialized):
  24. return json.loads(serialized)
  25. @@;
  26. $create = Python3::create(Callable<(Double)->Resource<Python3>>, $script);
  27. $add = Python3::add(Callable<(Resource<Python3>,Double)->Resource<Python3>>, $script);
  28. $merge = Python3::merge(Callable<(Resource<Python3>,Resource<Python3>)->Resource<Python3>>, $script);
  29. $get_result = Python3::get_result(Callable<(Resource<Python3>)->String>, $script);
  30. $serialize = Python3::serialize(Callable<(Resource<Python3>)->String>, $script);
  31. $deserialize = Python3::deserialize(Callable<(String)->Resource<Python3>>, $script);
  32. SELECT UDAF(
  33. CAST(key AS Double),
  34. $create,
  35. $add,
  36. $merge,
  37. $get_result,
  38. $serialize,
  39. $deserialize
  40. ) FROM plato.Input;