wfs_filer_client.go 1.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657
  1. package mount
  2. import (
  3. "sync/atomic"
  4. "google.golang.org/grpc"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/pb"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  8. "github.com/seaweedfs/seaweedfs/weed/util"
  9. )
  10. var _ = filer_pb.FilerClient(&WFS{})
  11. func (wfs *WFS) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) (err error) {
  12. return util.Retry("filer grpc", func() error {
  13. i := atomic.LoadInt32(&wfs.option.filerIndex)
  14. n := len(wfs.option.FilerAddresses)
  15. for x := 0; x < n; x++ {
  16. filerGrpcAddress := wfs.option.FilerAddresses[i].ToGrpcAddress()
  17. err = pb.WithGrpcClient(streamingMode, wfs.signature, func(grpcConnection *grpc.ClientConn) error {
  18. client := filer_pb.NewSeaweedFilerClient(grpcConnection)
  19. return fn(client)
  20. }, filerGrpcAddress, false, wfs.option.GrpcDialOption)
  21. if err != nil {
  22. glog.V(0).Infof("WithFilerClient %d %v: %v", x, filerGrpcAddress, err)
  23. } else {
  24. atomic.StoreInt32(&wfs.option.filerIndex, i)
  25. return nil
  26. }
  27. i++
  28. if i >= int32(n) {
  29. i = 0
  30. }
  31. }
  32. return err
  33. })
  34. }
  35. func (wfs *WFS) AdjustedUrl(location *filer_pb.Location) string {
  36. if wfs.option.VolumeServerAccess == "publicUrl" {
  37. return location.PublicUrl
  38. }
  39. return location.Url
  40. }
  41. func (wfs *WFS) GetDataCenter() string {
  42. return wfs.option.DataCenter
  43. }