raft_server.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. package weed_server
  2. import (
  3. "encoding/json"
  4. "math/rand"
  5. "os"
  6. "path"
  7. "sort"
  8. "strings"
  9. "time"
  10. "google.golang.org/grpc"
  11. "github.com/chrislusf/seaweedfs/weed/pb"
  12. "github.com/chrislusf/raft"
  13. "github.com/chrislusf/seaweedfs/weed/glog"
  14. "github.com/chrislusf/seaweedfs/weed/topology"
  15. )
  16. type RaftServerOption struct {
  17. GrpcDialOption grpc.DialOption
  18. Peers []pb.ServerAddress
  19. ServerAddr pb.ServerAddress
  20. DataDir string
  21. Topo *topology.Topology
  22. RaftResumeState bool
  23. HeartbeatInterval time.Duration
  24. ElectionTimeout time.Duration
  25. }
  26. type RaftServer struct {
  27. peers []pb.ServerAddress // initial peers to join with
  28. raftServer raft.Server
  29. dataDir string
  30. serverAddr pb.ServerAddress
  31. topo *topology.Topology
  32. *raft.GrpcServer
  33. }
  34. type StateMachine struct {
  35. raft.StateMachine
  36. topo *topology.Topology
  37. }
  38. func (s StateMachine) Save() ([]byte, error) {
  39. state := topology.MaxVolumeIdCommand{
  40. MaxVolumeId: s.topo.GetMaxVolumeId(),
  41. }
  42. glog.V(1).Infof("Save raft state %+v", state)
  43. return json.Marshal(state)
  44. }
  45. func (s StateMachine) Recovery(data []byte) error {
  46. state := topology.MaxVolumeIdCommand{}
  47. err := json.Unmarshal(data, &state)
  48. if err != nil {
  49. return err
  50. }
  51. glog.V(1).Infof("Recovery raft state %+v", state)
  52. s.topo.UpAdjustMaxVolumeId(state.MaxVolumeId)
  53. return nil
  54. }
  55. func NewRaftServer(option *RaftServerOption) (*RaftServer, error) {
  56. s := &RaftServer{
  57. peers: option.Peers,
  58. serverAddr: option.ServerAddr,
  59. dataDir: option.DataDir,
  60. topo: option.Topo,
  61. }
  62. if glog.V(4) {
  63. raft.SetLogLevel(2)
  64. }
  65. raft.RegisterCommand(&topology.MaxVolumeIdCommand{})
  66. var err error
  67. transporter := raft.NewGrpcTransporter(option.GrpcDialOption)
  68. glog.V(0).Infof("Starting RaftServer with %v", option.ServerAddr)
  69. if !option.RaftResumeState {
  70. // always clear previous metadata
  71. os.RemoveAll(path.Join(s.dataDir, "conf"))
  72. os.RemoveAll(path.Join(s.dataDir, "log"))
  73. os.RemoveAll(path.Join(s.dataDir, "snapshot"))
  74. }
  75. if err := os.MkdirAll(path.Join(s.dataDir, "snapshot"), 0600); err != nil {
  76. return nil, err
  77. }
  78. stateMachine := StateMachine{topo: option.Topo}
  79. s.raftServer, err = raft.NewServer(string(s.serverAddr), s.dataDir, transporter, stateMachine, option.Topo, "")
  80. if err != nil {
  81. glog.V(0).Infoln(err)
  82. return nil, err
  83. }
  84. heartbeatInterval := time.Duration(float64(option.HeartbeatInterval) * (rand.Float64()*0.25 + 1))
  85. s.raftServer.SetHeartbeatInterval(heartbeatInterval)
  86. s.raftServer.SetElectionTimeout(option.ElectionTimeout)
  87. if err := s.raftServer.LoadSnapshot(); err != nil {
  88. return nil, err
  89. }
  90. if err := s.raftServer.Start(); err != nil {
  91. return nil, err
  92. }
  93. for _, peer := range s.peers {
  94. if err := s.raftServer.AddPeer(string(peer), peer.ToGrpcAddress()); err != nil {
  95. return nil, err
  96. }
  97. }
  98. // Remove deleted peers
  99. for existsPeerName := range s.raftServer.Peers() {
  100. exists := false
  101. var existingPeer pb.ServerAddress
  102. for _, peer := range s.peers {
  103. if peer.ToGrpcAddress() == existsPeerName {
  104. exists, existingPeer = true, peer
  105. break
  106. }
  107. }
  108. if exists {
  109. if err := s.raftServer.RemovePeer(existsPeerName); err != nil {
  110. glog.V(0).Infoln(err)
  111. return nil, err
  112. } else {
  113. glog.V(0).Infof("removing old peer %s", existingPeer)
  114. }
  115. }
  116. }
  117. s.GrpcServer = raft.NewGrpcServer(s.raftServer)
  118. if s.raftServer.IsLogEmpty() && isTheFirstOne(option.ServerAddr, s.peers) {
  119. // Initialize the server by joining itself.
  120. // s.DoJoinCommand()
  121. }
  122. glog.V(0).Infof("current cluster leader: %v", s.raftServer.Leader())
  123. return s, nil
  124. }
  125. func (s *RaftServer) Peers() (members []string) {
  126. peers := s.raftServer.Peers()
  127. for _, p := range peers {
  128. members = append(members, p.Name)
  129. }
  130. return
  131. }
  132. func isTheFirstOne(self pb.ServerAddress, peers []pb.ServerAddress) bool {
  133. sort.Slice(peers, func(i, j int) bool {
  134. return strings.Compare(string(peers[i]), string(peers[j])) < 0
  135. })
  136. if len(peers) <= 0 {
  137. return true
  138. }
  139. return self == peers[0]
  140. }
  141. func (s *RaftServer) DoJoinCommand() {
  142. glog.V(0).Infoln("Initializing new cluster")
  143. if _, err := s.raftServer.Do(&raft.DefaultJoinCommand{
  144. Name: s.raftServer.Name(),
  145. ConnectionString: s.serverAddr.ToGrpcAddress(),
  146. }); err != nil {
  147. glog.Errorf("fail to send join command: %v", err)
  148. }
  149. }