master_follower.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. package command
  2. import (
  3. "context"
  4. "fmt"
  5. "net/http"
  6. "time"
  7. "github.com/aws/aws-sdk-go/aws"
  8. "github.com/gorilla/mux"
  9. "github.com/seaweedfs/seaweedfs/weed/glog"
  10. "github.com/seaweedfs/seaweedfs/weed/pb"
  11. "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
  12. "github.com/seaweedfs/seaweedfs/weed/security"
  13. weed_server "github.com/seaweedfs/seaweedfs/weed/server"
  14. "github.com/seaweedfs/seaweedfs/weed/util"
  15. "google.golang.org/grpc/reflection"
  16. )
  17. var (
  18. mf MasterOptions
  19. )
  20. func init() {
  21. cmdMasterFollower.Run = runMasterFollower // break init cycle
  22. mf.port = cmdMasterFollower.Flag.Int("port", 9334, "http listen port")
  23. mf.portGrpc = cmdMasterFollower.Flag.Int("port.grpc", 0, "grpc listen port")
  24. mf.ipBind = cmdMasterFollower.Flag.String("ip.bind", "", "ip address to bind to. Default to localhost.")
  25. mf.peers = cmdMasterFollower.Flag.String("masters", "localhost:9333", "all master nodes in comma separated ip:port list, example: 127.0.0.1:9093,127.0.0.1:9094,127.0.0.1:9095")
  26. mf.ip = aws.String(util.DetectedHostAddress())
  27. mf.metaFolder = aws.String("")
  28. mf.volumeSizeLimitMB = nil
  29. mf.volumePreallocate = nil
  30. mf.defaultReplication = nil
  31. mf.garbageThreshold = aws.Float64(0.1)
  32. mf.whiteList = nil
  33. mf.disableHttp = aws.Bool(false)
  34. mf.metricsAddress = aws.String("")
  35. mf.metricsIntervalSec = aws.Int(0)
  36. mf.raftResumeState = aws.Bool(false)
  37. }
  38. var cmdMasterFollower = &Command{
  39. UsageLine: "master.follower -port=9333 -masters=<master1Host>:<master1Port>",
  40. Short: "start a master follower",
  41. Long: `start a master follower to provide volume=>location mapping service
  42. The master follower does not participate in master election.
  43. It just follow the existing masters, and listen for any volume location changes.
  44. In most cases, the master follower is not needed. In big data centers with thousands of volume
  45. servers. In theory, the master may have trouble to keep up with the write requests and read requests.
  46. The master follower can relieve the master from read requests, which only needs to
  47. lookup a fileId or volumeId.
  48. The master follower currently can handle fileId lookup requests:
  49. /dir/lookup?volumeId=4
  50. /dir/lookup?fileId=4,49c50924569199
  51. And gRPC API
  52. rpc LookupVolume (LookupVolumeRequest) returns (LookupVolumeResponse) {}
  53. This master follower is stateless and can run from any place.
  54. `,
  55. }
  56. func runMasterFollower(cmd *Command, args []string) bool {
  57. util.LoadConfiguration("security", false)
  58. util.LoadConfiguration("master", false)
  59. if *mf.portGrpc == 0 {
  60. *mf.portGrpc = 10000 + *mf.port
  61. }
  62. startMasterFollower(mf)
  63. return true
  64. }
  65. func startMasterFollower(masterOptions MasterOptions) {
  66. // collect settings from main masters
  67. masters := pb.ServerAddresses(*mf.peers).ToAddressMap()
  68. var err error
  69. grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.master")
  70. for i := 0; i < 10; i++ {
  71. err = pb.WithOneOfGrpcMasterClients(false, masters, grpcDialOption, func(client master_pb.SeaweedClient) error {
  72. resp, err := client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
  73. if err != nil {
  74. return fmt.Errorf("get master grpc address %v configuration: %v", masters, err)
  75. }
  76. masterOptions.defaultReplication = &resp.DefaultReplication
  77. masterOptions.volumeSizeLimitMB = aws.Uint(uint(resp.VolumeSizeLimitMB))
  78. masterOptions.volumePreallocate = &resp.VolumePreallocate
  79. return nil
  80. })
  81. if err != nil {
  82. glog.V(0).Infof("failed to talk to filer %v: %v", masters, err)
  83. glog.V(0).Infof("wait for %d seconds ...", i+1)
  84. time.Sleep(time.Duration(i+1) * time.Second)
  85. }
  86. }
  87. if err != nil {
  88. glog.Errorf("failed to talk to filer %v: %v", masters, err)
  89. return
  90. }
  91. option := masterOptions.toMasterOption(nil)
  92. option.IsFollower = true
  93. if *masterOptions.ipBind == "" {
  94. *masterOptions.ipBind = *masterOptions.ip
  95. }
  96. r := mux.NewRouter()
  97. ms := weed_server.NewMasterServer(r, option, masters)
  98. listeningAddress := util.JoinHostPort(*masterOptions.ipBind, *masterOptions.port)
  99. glog.V(0).Infof("Start Seaweed Master %s at %s", util.Version(), listeningAddress)
  100. masterListener, masterLocalListener, e := util.NewIpAndLocalListeners(*masterOptions.ipBind, *masterOptions.port, 0)
  101. if e != nil {
  102. glog.Fatalf("Master startup error: %v", e)
  103. }
  104. // starting grpc server
  105. grpcPort := *masterOptions.portGrpc
  106. grpcL, grpcLocalL, err := util.NewIpAndLocalListeners(*masterOptions.ipBind, grpcPort, 0)
  107. if err != nil {
  108. glog.Fatalf("master failed to listen on grpc port %d: %v", grpcPort, err)
  109. }
  110. grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.master"))
  111. master_pb.RegisterSeaweedServer(grpcS, ms)
  112. reflection.Register(grpcS)
  113. glog.V(0).Infof("Start Seaweed Master %s grpc server at %s:%d", util.Version(), *masterOptions.ip, grpcPort)
  114. if grpcLocalL != nil {
  115. go grpcS.Serve(grpcLocalL)
  116. }
  117. go grpcS.Serve(grpcL)
  118. go ms.MasterClient.KeepConnectedToMaster(context.Background())
  119. // start http server
  120. httpS := &http.Server{Handler: r}
  121. if masterLocalListener != nil {
  122. go httpS.Serve(masterLocalListener)
  123. }
  124. go httpS.Serve(masterListener)
  125. select {}
  126. }