123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869 |
- package weed_server
- import (
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/operation"
- "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
- "github.com/seaweedfs/seaweedfs/weed/query/json"
- "github.com/seaweedfs/seaweedfs/weed/storage/needle"
- "github.com/tidwall/gjson"
- )
- func (vs *VolumeServer) Query(req *volume_server_pb.QueryRequest, stream volume_server_pb.VolumeServer_QueryServer) error {
- for _, fid := range req.FromFileIds {
- vid, id_cookie, err := operation.ParseFileId(fid)
- if err != nil {
- glog.V(0).Infof("volume query failed to parse fid %s: %v", fid, err)
- return err
- }
- n := new(needle.Needle)
- volumeId, _ := needle.NewVolumeId(vid)
- n.ParsePath(id_cookie)
- cookie := n.Cookie
- if _, err := vs.store.ReadVolumeNeedle(volumeId, n, nil, nil); err != nil {
- glog.V(0).Infof("volume query failed to read fid %s: %v", fid, err)
- return err
- }
- if n.Cookie != cookie {
- glog.V(0).Infof("volume query failed to read fid cookie %s: %v", fid, err)
- return err
- }
- if req.InputSerialization.CsvInput != nil {
- }
- if req.InputSerialization.JsonInput != nil {
- stripe := &volume_server_pb.QueriedStripe{
- Records: nil,
- }
- filter := json.Query{
- Field: req.Filter.Field,
- Op: req.Filter.Operand,
- Value: req.Filter.Value,
- }
- gjson.ForEachLine(string(n.Data), func(line gjson.Result) bool {
- passedFilter, values := json.QueryJson(line.Raw, req.Selections, filter)
- if !passedFilter {
- return true
- }
- stripe.Records = json.ToJson(stripe.Records, req.Selections, values)
- return true
- })
- err = stream.Send(stripe)
- if err != nil {
- return err
- }
- }
- }
- return nil
- }
|