proto_traversals.py 3.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
  1. import copy
  2. from google.protobuf.descriptor import FieldDescriptor as fdescriptor
"""Recursive tree traversals for protobuf.

Each message is a node and each field is a leaf. Every function walks
through a proto and does some work in each node.
"""
  6. def search(proto, fname=None, ftype=None):
  7. for desc, obj in proto.ListFields():
  8. if desc.name == fname and (ftype is None or ftype == desc.type):
  9. return (obj, desc, proto)
  10. if desc.type == fdescriptor.TYPE_MESSAGE:
  11. objs = obj if desc.label == fdescriptor.LABEL_REPEATED else [obj]
  12. for one_obj in objs:
  13. return search(one_obj, fname, ftype)
  14. return None
  15. def search_and_process(proto, return_func=lambda params, child_values=None: params,
  16. recalc_params_func=lambda proto, obj, desc, params: params,
  17. params=None):
  18. """Search and process each node. Recalc params on each step. Pass it down
  19. the tree. On each leaf calcs return value from param, and pass it up. Nodes
  20. calc return value with current param and childs return values.
  21. Args:
  22. * proto -- current node. to run through some proto, put its object here
  23. * return_func -- function that return value. takes current (recalced for current
  24. * node) param and list of return values for current node children.
  25. * for leafs second parametr is None
  26. * recalc_params_func -- function to recalc params in node. takes root proto,
  27. * current object (or objects for repeated fields), current
  28. * proto descriptor and param. return new param value
  29. * params -- initial values for params"""
  30. if proto is None:
  31. return None
  32. return_values = []
  33. for desc, obj in proto.ListFields():
  34. params = copy.deepcopy(params)
  35. if desc.type == fdescriptor.TYPE_MESSAGE:
  36. objs = obj if desc.label == fdescriptor.LABEL_REPEATED else [obj]
  37. params = recalc_params_func(proto, obj, desc, params)
  38. for one_obj in objs:
  39. return_values.append(search_and_process(one_obj, return_func,
  40. recalc_params_func, params))
  41. else:
  42. return_values.append(return_func(recalc_params_func(proto, obj, desc, params), None))
  43. return return_func(params, return_values)
  44. def search_and_process_descriptors(proto_desc,
  45. return_func=lambda params, child_values=None: params,
  46. recalc_params_func=lambda desc, params: params,
  47. params=None):
  48. """Same as search and process(except we run recalc_params in root_proto too),
  49. but process each node from PROTOBUF DESCRIPTIO, instead of each node from
  50. protobuf message."""
  51. params = copy.deepcopy(params)
  52. params = recalc_params_func(proto_desc, params)
  53. if proto_desc is None:
  54. return None
  55. elif hasattr(proto_desc, "type") and proto_desc.type != fdescriptor.TYPE_MESSAGE:
  56. return return_func(params, None)
  57. return_values = []
  58. for field_desc in proto_desc.fields:
  59. desc = field_desc if field_desc.message_type is None else field_desc.message_type
  60. return_values.append(search_and_process_descriptors(desc, return_func,
  61. recalc_params_func, params))
  62. return return_func(params, return_values)