master_grpc_server_raft.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/hashicorp/raft"
  6. "github.com/seaweedfs/seaweedfs/weed/cluster"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
  8. )
  9. func (ms *MasterServer) RaftListClusterServers(ctx context.Context, req *master_pb.RaftListClusterServersRequest) (*master_pb.RaftListClusterServersResponse, error) {
  10. resp := &master_pb.RaftListClusterServersResponse{}
  11. ms.Topo.RaftServerAccessLock.RLock()
  12. if ms.Topo.HashicorpRaft == nil {
  13. ms.Topo.RaftServerAccessLock.RUnlock()
  14. return resp, nil
  15. }
  16. servers := ms.Topo.HashicorpRaft.GetConfiguration().Configuration().Servers
  17. ms.Topo.RaftServerAccessLock.RUnlock()
  18. for _, server := range servers {
  19. resp.ClusterServers = append(resp.ClusterServers, &master_pb.RaftListClusterServersResponse_ClusterServers{
  20. Id: string(server.ID),
  21. Address: string(server.Address),
  22. Suffrage: server.Suffrage.String(),
  23. })
  24. }
  25. return resp, nil
  26. }
  27. func (ms *MasterServer) RaftAddServer(ctx context.Context, req *master_pb.RaftAddServerRequest) (*master_pb.RaftAddServerResponse, error) {
  28. resp := &master_pb.RaftAddServerResponse{}
  29. ms.Topo.RaftServerAccessLock.RLock()
  30. defer ms.Topo.RaftServerAccessLock.RUnlock()
  31. if ms.Topo.HashicorpRaft == nil {
  32. return resp, nil
  33. }
  34. if ms.Topo.HashicorpRaft.State() != raft.Leader {
  35. return nil, fmt.Errorf("raft add server %s failed: %s is no current leader", req.Id, ms.Topo.HashicorpRaft.String())
  36. }
  37. var idxFuture raft.IndexFuture
  38. if req.Voter {
  39. idxFuture = ms.Topo.HashicorpRaft.AddVoter(raft.ServerID(req.Id), raft.ServerAddress(req.Address), 0, 0)
  40. } else {
  41. idxFuture = ms.Topo.HashicorpRaft.AddNonvoter(raft.ServerID(req.Id), raft.ServerAddress(req.Address), 0, 0)
  42. }
  43. if err := idxFuture.Error(); err != nil {
  44. return nil, err
  45. }
  46. return resp, nil
  47. }
  48. func (ms *MasterServer) RaftRemoveServer(ctx context.Context, req *master_pb.RaftRemoveServerRequest) (*master_pb.RaftRemoveServerResponse, error) {
  49. resp := &master_pb.RaftRemoveServerResponse{}
  50. ms.Topo.RaftServerAccessLock.RLock()
  51. defer ms.Topo.RaftServerAccessLock.RUnlock()
  52. if ms.Topo.HashicorpRaft == nil {
  53. return resp, nil
  54. }
  55. if ms.Topo.HashicorpRaft.State() != raft.Leader {
  56. return nil, fmt.Errorf("raft remove server %s failed: %s is no current leader", req.Id, ms.Topo.HashicorpRaft.String())
  57. }
  58. if !req.Force {
  59. ms.clientChansLock.RLock()
  60. _, ok := ms.clientChans[fmt.Sprintf("%s@%s", cluster.MasterType, req.Id)]
  61. ms.clientChansLock.RUnlock()
  62. if ok {
  63. return resp, fmt.Errorf("raft remove server %s failed: client connection to master exists", req.Id)
  64. }
  65. }
  66. idxFuture := ms.Topo.HashicorpRaft.RemoveServer(raft.ServerID(req.Id), 0, 0)
  67. if err := idxFuture.Error(); err != nil {
  68. return nil, err
  69. }
  70. return resp, nil
  71. }