volume_grpc_query.go 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. package weed_server
  2. import (
  3. "github.com/seaweedfs/seaweedfs/weed/glog"
  4. "github.com/seaweedfs/seaweedfs/weed/operation"
  5. "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
  6. "github.com/seaweedfs/seaweedfs/weed/query/json"
  7. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  8. "github.com/tidwall/gjson"
  9. )
  10. func (vs *VolumeServer) Query(req *volume_server_pb.QueryRequest, stream volume_server_pb.VolumeServer_QueryServer) error {
  11. for _, fid := range req.FromFileIds {
  12. vid, id_cookie, err := operation.ParseFileId(fid)
  13. if err != nil {
  14. glog.V(0).Infof("volume query failed to parse fid %s: %v", fid, err)
  15. return err
  16. }
  17. n := new(needle.Needle)
  18. volumeId, _ := needle.NewVolumeId(vid)
  19. n.ParsePath(id_cookie)
  20. cookie := n.Cookie
  21. if _, err := vs.store.ReadVolumeNeedle(volumeId, n, nil, nil); err != nil {
  22. glog.V(0).Infof("volume query failed to read fid %s: %v", fid, err)
  23. return err
  24. }
  25. if n.Cookie != cookie {
  26. glog.V(0).Infof("volume query failed to read fid cookie %s: %v", fid, err)
  27. return err
  28. }
  29. if req.InputSerialization.CsvInput != nil {
  30. }
  31. if req.InputSerialization.JsonInput != nil {
  32. stripe := &volume_server_pb.QueriedStripe{
  33. Records: nil,
  34. }
  35. filter := json.Query{
  36. Field: req.Filter.Field,
  37. Op: req.Filter.Operand,
  38. Value: req.Filter.Value,
  39. }
  40. gjson.ForEachLine(string(n.Data), func(line gjson.Result) bool {
  41. passedFilter, values := json.QueryJson(line.Raw, req.Selections, filter)
  42. if !passedFilter {
  43. return true
  44. }
  45. stripe.Records = json.ToJson(stripe.Records, req.Selections, values)
  46. return true
  47. })
  48. err = stream.Send(stripe)
  49. if err != nil {
  50. return err
  51. }
  52. }
  53. }
  54. return nil
  55. }