package shell

import (
	"context"
	"flag"
	"fmt"
	"io"

	"github.com/seaweedfs/seaweedfs/weed/cluster"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
)
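
// init registers the cluster.ps command with the shell's command registry.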
func init() {
	Commands = append(Commands, &commandClusterPs{})
}
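
// commandClusterPs implements the cluster.ps shell command.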
type commandClusterPs struct {
}

func (c *commandClusterPs) Name() string {
	return "cluster.ps"
}

func (c *commandClusterPs) Help() string {
	return `check current cluster process status

	cluster.ps

`
}
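
// Do asks the master for the registered filers and message queue brokers,
// probes each filer for its configuration, then walks the topology to list
// volume servers, writing a status report to writer.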
func (c *commandClusterPs) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {

	clusterPsCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
	if err = clusterPsCommand.Parse(args); err != nil {
		// with flag.ContinueOnError the flag package has already reported
		// the problem, so do not surface it again as a command error
		return nil
	}

	var filerNodes []*master_pb.ListClusterNodesResponse_ClusterNode
	var mqBrokerNodes []*master_pb.ListClusterNodesResponse_ClusterNode

	// get the list of filers
	err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
		resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
			ClientType: cluster.FilerType,
			FilerGroup: *commandEnv.option.FilerGroup,
		})
		if err != nil {
			return err
		}
		filerNodes = resp.ClusterNodes
		return nil
	})
	if err != nil {
		return
	}

	// get the list of message queue brokers
	err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
		resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
			ClientType: cluster.BrokerType,
			FilerGroup: *commandEnv.option.FilerGroup,
		})
		if err != nil {
			return err
		}
		mqBrokerNodes = resp.ClusterNodes
		return nil
	})
	if err != nil {
		return
	}
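
	// print message queue brokers, if any were found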
	if len(mqBrokerNodes) > 0 {
		fmt.Fprintf(writer, "* message queue brokers %d\n", len(mqBrokerNodes))
		for _, node := range mqBrokerNodes {
			fmt.Fprintf(writer, "  * %s (%v)\n", node.Address, node.Version)
			if node.DataCenter != "" {
				fmt.Fprintf(writer, "    DataCenter: %v\n", node.DataCenter)
			}
			if node.Rack != "" {
				fmt.Fprintf(writer, "    Rack: %v\n", node.Rack)
			}
			if node.IsLeader {
				fmt.Fprintf(writer, "    IsLeader: %v\n", true)
			}
		}
	}
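
	// print filers, probing each one directly for its group and signature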
	fmt.Fprintf(writer, "* filers %d\n", len(filerNodes))
	for _, node := range filerNodes {
		fmt.Fprintf(writer, "  * %s (%v)\n", node.Address, node.Version)
		if node.DataCenter != "" {
			fmt.Fprintf(writer, "    DataCenter: %v\n", node.DataCenter)
		}
		if node.Rack != "" {
			fmt.Fprintf(writer, "    Rack: %v\n", node.Rack)
		}
		pb.WithFilerClient(false, pb.ServerAddress(node.Address), commandEnv.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
			resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
			if err == nil {
				if resp.FilerGroup != "" {
					fmt.Fprintf(writer, "    filer group: %s\n", resp.FilerGroup)
				}
				fmt.Fprintf(writer, "    signature: %d\n", resp.Signature)
			} else {
				fmt.Fprintf(writer, "    failed to connect: %v\n", err)
			}
			return err
		})
	}

	// collect volume servers
	var volumeServers []pb.ServerAddress
	t, _, err := collectTopologyInfo(commandEnv, 0)
	if err != nil {
		return err
	}
	for _, dc := range t.DataCenterInfos {
		for _, r := range dc.RackInfos {
			for _, dn := range r.DataNodeInfos {
				volumeServers = append(volumeServers, pb.NewServerAddressFromDataNode(dn))
			}
		}
	}
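
	// print volume servers grouped by data center and rack, querying
	// each one for its running version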
	fmt.Fprintf(writer, "* volume servers %d\n", len(volumeServers))
	for _, dc := range t.DataCenterInfos {
		fmt.Fprintf(writer, "  * data center: %s\n", dc.Id)
		for _, r := range dc.RackInfos {
			fmt.Fprintf(writer, "    * rack: %s\n", r.Id)
			for _, dn := range r.DataNodeInfos {
				pb.WithVolumeServerClient(false, pb.NewServerAddressFromDataNode(dn), commandEnv.option.GrpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
					resp, err := client.VolumeServerStatus(context.Background(), &volume_server_pb.VolumeServerStatusRequest{})
					if err == nil {
						fmt.Fprintf(writer, "      * %s (%v)\n", dn.Id, resp.Version)
					}
					return err
				})
			}
		}
	}

	return nil
}
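
// Illustrative output sketch (addresses, versions, and the signature below
// are hypothetical; actual values depend on the cluster being inspected):
//
//	* message queue brokers 1
//	  * localhost:17777 (3.59)
//	* filers 1
//	  * localhost:8888 (3.59)
//	    signature: 1234567890
//	* volume servers 1
//	  * data center: DefaultDataCenter
//	    * rack: DefaultRack
//	      * localhost:8080 (3.59)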