test_lmap_opts.sql 1019 B

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647
  1. /* syntax version 1 */
  2. -- not supported on windows
  3. /* postgres can not */
  -- Python source of the processRows UDF. Everything between the @@ markers is
  -- Python: it starts at column 0 and its block structure is defined by
  -- indentation, so the whitespace inside the literal is significant.
  $udfScript = @@
import collections


def processRows(prefix, rowList, separator):
    """Build one single-field output row per input row.

    prefix, separator -- values concatenated around each row's fields.
    rowList           -- iterable of rows exposing Name and Value attributes.

    Returns a list of ResultRow tuples, in input order, whose Result field
    is prefix + Name + separator + Value.
    """
    ResultRow = collections.namedtuple("ResultRow", ["Result"])
    return [
        ResultRow(Result=prefix + row.Name + separator + row.Value)
        for row in rowList
    ]
@@;
  -- Bind processRows from $udfScript as a typed callable:
  --   arguments: String, List<Struct<Name, Value>>, String
  --   result:    List<Struct<Result>>
  $udf = Python::processRows(
      Callable<
          (String, List<Struct<Name:String, Value:String>>, String)
          -> List<Struct<Result:String>>
      >,
      $udfScript
  );
  -- Source rows: key/value from plato.Input0, exposed as Name/Value.
  $inputRows = (
      SELECT
          key AS Name,
          value AS Value
      FROM plato.Input0
  );

  $linePrefix = ">>";

  -- Stage 1: feed the rows to $udf with prefix ">>" and separator "=";
  -- the PROCESS is restricted by WHERE Name != "foo".
  $formatted = (
      PROCESS $inputRows
      USING $udf($linePrefix, TableRows(), "=")
      WHERE Name != "foo"
  );

  -- Stage 2: rename the UDF output column Result to Data.
  $asData = (
      SELECT
          Result AS Data
      FROM $formatted
  );

  -- Stage 3: hand the rows to Streaming::Process with command "grep" and
  -- argument list ("180").
  -- NOTE(review): presumably this keeps only rows matching 180 via an external
  -- grep, which would explain the "not supported on windows" header — confirm.
  $grepped = (
      PROCESS $asData
      USING Streaming::Process(TableRows(), "grep", AsList("180"))
  );

  -- Stage 4: rename Data to FinalResult.
  $finalRows = (
      SELECT
          Data AS FinalResult
      FROM $grepped
  );

  -- Result: average length of the FinalResult values.
  SELECT
      Avg(Length(FinalResult)) AS AvgResultLength
  FROM $finalRows;