master_grpc_server_raft.go 2.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/hashicorp/raft"
  6. "github.com/seaweedfs/seaweedfs/weed/cluster"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
  8. )
  9. func (ms *MasterServer) RaftListClusterServers(ctx context.Context, req *master_pb.RaftListClusterServersRequest) (*master_pb.RaftListClusterServersResponse, error) {
  10. resp := &master_pb.RaftListClusterServersResponse{}
  11. ms.Topo.RaftServerAccessLock.RLock()
  12. if ms.Topo.HashicorpRaft == nil {
  13. ms.Topo.RaftServerAccessLock.RUnlock()
  14. return resp, nil
  15. }
  16. servers := ms.Topo.HashicorpRaft.GetConfiguration().Configuration().Servers
  17. _, leaderId := ms.Topo.HashicorpRaft.LeaderWithID()
  18. ms.Topo.RaftServerAccessLock.RUnlock()
  19. for _, server := range servers {
  20. resp.ClusterServers = append(resp.ClusterServers, &master_pb.RaftListClusterServersResponse_ClusterServers{
  21. Id: string(server.ID),
  22. Address: string(server.Address),
  23. Suffrage: server.Suffrage.String(),
  24. IsLeader: server.ID == leaderId,
  25. })
  26. }
  27. return resp, nil
  28. }
  29. func (ms *MasterServer) RaftAddServer(ctx context.Context, req *master_pb.RaftAddServerRequest) (*master_pb.RaftAddServerResponse, error) {
  30. resp := &master_pb.RaftAddServerResponse{}
  31. ms.Topo.RaftServerAccessLock.RLock()
  32. defer ms.Topo.RaftServerAccessLock.RUnlock()
  33. if ms.Topo.HashicorpRaft == nil {
  34. return resp, nil
  35. }
  36. if ms.Topo.HashicorpRaft.State() != raft.Leader {
  37. return nil, fmt.Errorf("raft add server %s failed: %s is no current leader", req.Id, ms.Topo.HashicorpRaft.String())
  38. }
  39. var idxFuture raft.IndexFuture
  40. if req.Voter {
  41. idxFuture = ms.Topo.HashicorpRaft.AddVoter(raft.ServerID(req.Id), raft.ServerAddress(req.Address), 0, 0)
  42. } else {
  43. idxFuture = ms.Topo.HashicorpRaft.AddNonvoter(raft.ServerID(req.Id), raft.ServerAddress(req.Address), 0, 0)
  44. }
  45. if err := idxFuture.Error(); err != nil {
  46. return nil, err
  47. }
  48. return resp, nil
  49. }
  50. func (ms *MasterServer) RaftRemoveServer(ctx context.Context, req *master_pb.RaftRemoveServerRequest) (*master_pb.RaftRemoveServerResponse, error) {
  51. resp := &master_pb.RaftRemoveServerResponse{}
  52. ms.Topo.RaftServerAccessLock.RLock()
  53. defer ms.Topo.RaftServerAccessLock.RUnlock()
  54. if ms.Topo.HashicorpRaft == nil {
  55. return resp, nil
  56. }
  57. if ms.Topo.HashicorpRaft.State() != raft.Leader {
  58. return nil, fmt.Errorf("raft remove server %s failed: %s is no current leader", req.Id, ms.Topo.HashicorpRaft.String())
  59. }
  60. if !req.Force {
  61. ms.clientChansLock.RLock()
  62. _, ok := ms.clientChans[fmt.Sprintf("%s@%s", cluster.MasterType, req.Id)]
  63. ms.clientChansLock.RUnlock()
  64. if ok {
  65. return resp, fmt.Errorf("raft remove server %s failed: client connection to master exists", req.Id)
  66. }
  67. }
  68. idxFuture := ms.Topo.HashicorpRaft.RemoveServer(raft.ServerID(req.Id), 0, 0)
  69. if err := idxFuture.Error(); err != nil {
  70. return nil, err
  71. }
  72. return resp, nil
  73. }