filer_client_accessor.go 1.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243
  1. package filer_client
  2. import (
  3. "github.com/seaweedfs/seaweedfs/weed/glog"
  4. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  5. "github.com/seaweedfs/seaweedfs/weed/pb"
  6. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  8. "google.golang.org/grpc"
  9. )
  10. type FilerClientAccessor struct {
  11. GetFiler func() pb.ServerAddress
  12. GetGrpcDialOption func() grpc.DialOption
  13. }
  14. func (fca *FilerClientAccessor) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
  15. return pb.WithFilerClient(streamingMode, 0, fca.GetFiler(), fca.GetGrpcDialOption(), fn)
  16. }
  17. func (fca *FilerClientAccessor) SaveTopicConfToFiler(t topic.Topic, conf *mq_pb.ConfigureTopicResponse) error {
  18. glog.V(0).Infof("save conf for topic %v to filer", t)
  19. // save the topic configuration on filer
  20. return fca.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  21. return t.WriteConfFile(client, conf)
  22. })
  23. }
  24. func (fca *FilerClientAccessor) ReadTopicConfFromFiler(t topic.Topic) (conf *mq_pb.ConfigureTopicResponse, err error) {
  25. glog.V(1).Infof("load conf for topic %v from filer", t)
  26. if err = fca.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  27. conf, err = t.ReadConfFile(client)
  28. return err
  29. }); err != nil {
  30. return nil, err
  31. }
  32. return conf, nil
  33. }