master_grpc_server_cluster.go 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. package weed_server
  2. import (
  3. "context"
  4. "github.com/seaweedfs/seaweedfs/weed/cluster"
  5. "github.com/seaweedfs/seaweedfs/weed/pb"
  6. "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
  7. "math/rand"
  8. )
  9. func (ms *MasterServer) ListClusterNodes(ctx context.Context, req *master_pb.ListClusterNodesRequest) (*master_pb.ListClusterNodesResponse, error) {
  10. resp := &master_pb.ListClusterNodesResponse{}
  11. filerGroup := cluster.FilerGroupName(req.FilerGroup)
  12. if req.IsLeaderOnly {
  13. leaders := ms.Cluster.ListClusterNodeLeaders(filerGroup, req.ClientType)
  14. for _, node := range leaders {
  15. resp.ClusterNodes = append(resp.ClusterNodes, &master_pb.ListClusterNodesResponse_ClusterNode{
  16. Address: string(node),
  17. IsLeader: true,
  18. })
  19. }
  20. } else {
  21. clusterNodes := ms.Cluster.ListClusterNode(filerGroup, req.ClientType)
  22. clusterNodes = limitTo(clusterNodes, req.Limit)
  23. for _, node := range clusterNodes {
  24. resp.ClusterNodes = append(resp.ClusterNodes, &master_pb.ListClusterNodesResponse_ClusterNode{
  25. Address: string(node.Address),
  26. Version: node.Version,
  27. IsLeader: ms.Cluster.IsOneLeader(filerGroup, req.ClientType, node.Address),
  28. CreatedAtNs: node.CreatedTs.UnixNano(),
  29. DataCenter: string(node.DataCenter),
  30. Rack: string(node.Rack),
  31. })
  32. }
  33. }
  34. return resp, nil
  35. }
  36. func (ms *MasterServer) GetOneFiler(filerGroup cluster.FilerGroupName) pb.ServerAddress {
  37. clusterNodes := ms.Cluster.ListClusterNode(filerGroup, cluster.FilerType)
  38. var filers []pb.ServerAddress
  39. for _, node := range clusterNodes {
  40. if ms.Cluster.IsOneLeader(filerGroup, cluster.FilerType, node.Address) {
  41. filers = append(filers, node.Address)
  42. }
  43. }
  44. if len(filers) > 0 {
  45. return filers[rand.Intn(len(filers))]
  46. }
  47. return "localhost:8888"
  48. }
  49. func limitTo(nodes []*cluster.ClusterNode, limit int32) (selected []*cluster.ClusterNode) {
  50. if limit <= 0 || len(nodes) < int(limit) {
  51. return nodes
  52. }
  53. selectedSet := make(map[pb.ServerAddress]*cluster.ClusterNode)
  54. for i := 0; i < int(limit)*3; i++ {
  55. x := rand.Intn(len(nodes))
  56. if _, found := selectedSet[nodes[x].Address]; found {
  57. continue
  58. }
  59. selectedSet[nodes[x].Address] = nodes[x]
  60. }
  61. for _, node := range selectedSet {
  62. selected = append(selected, node)
  63. }
  64. return
  65. }