command_cluster_check.go 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266
  1. package shell
  2. import (
  3. "context"
  4. "flag"
  5. "fmt"
  6. "github.com/seaweedfs/seaweedfs/weed/storage/types"
  7. "io"
  8. "github.com/seaweedfs/seaweedfs/weed/cluster"
  9. "github.com/seaweedfs/seaweedfs/weed/pb"
  10. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  11. "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
  12. "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
  13. )
  14. func init() {
  15. Commands = append(Commands, &commandClusterCheck{})
  16. }
  // commandClusterCheck implements the "cluster.check" shell command, which
// probes network connectivity between the servers of the cluster.
type commandClusterCheck struct{}

// Name returns the command name, "cluster.check".
func (c *commandClusterCheck) Name() string {
	const commandName = "cluster.check"
	return commandName
}

// Help returns the usage text for cluster.check.
func (c *commandClusterCheck) Help() string {
	return "check current cluster network connectivity\n" +
		"cluster.check\n"
}
  27. func (c *commandClusterCheck) HasTag(CommandTag) bool {
  28. return false
  29. }
  30. func (c *commandClusterCheck) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  31. clusterPsCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  32. if err = clusterPsCommand.Parse(args); err != nil {
  33. return nil
  34. }
  35. // collect topology information
  36. topologyInfo, volumeSizeLimitMb, err := collectTopologyInfo(commandEnv, 0)
  37. if err != nil {
  38. return err
  39. }
  40. fmt.Fprintf(writer, "Topology volumeSizeLimit:%d MB%s\n", volumeSizeLimitMb, diskInfosToString(topologyInfo.DiskInfos))
  41. if len(topologyInfo.DiskInfos) == 0 {
  42. return fmt.Errorf("no disk type defined")
  43. }
  44. for diskType, diskInfo := range topologyInfo.DiskInfos {
  45. if diskInfo.MaxVolumeCount == 0 {
  46. return fmt.Errorf("no volume available for \"%s\" disk type", diskType)
  47. }
  48. }
  49. // collect filers
  50. var filers []pb.ServerAddress
  51. err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
  52. resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
  53. ClientType: cluster.FilerType,
  54. FilerGroup: *commandEnv.option.FilerGroup,
  55. })
  56. for _, node := range resp.ClusterNodes {
  57. filers = append(filers, pb.ServerAddress(node.Address))
  58. }
  59. return err
  60. })
  61. if err != nil {
  62. return
  63. }
  64. fmt.Fprintf(writer, "the cluster has %d filers: %+v\n", len(filers), filers)
  65. if len(filers) > 0 {
  66. genericDiskInfo, genericDiskInfoOk := topologyInfo.DiskInfos[""]
  67. hddDiskInfo, hddDiskInfoOk := topologyInfo.DiskInfos[types.HddType]
  68. if !genericDiskInfoOk && !hddDiskInfoOk {
  69. return fmt.Errorf("filer metadata logs need generic or hdd disk type to be defined")
  70. }
  71. if (genericDiskInfoOk && genericDiskInfo.MaxVolumeCount == 0) || (hddDiskInfoOk && hddDiskInfo.MaxVolumeCount == 0) {
  72. return fmt.Errorf("filer metadata logs need generic or hdd volumes to be available")
  73. }
  74. }
  75. // collect volume servers
  76. var volumeServers []pb.ServerAddress
  77. t, _, err := collectTopologyInfo(commandEnv, 0)
  78. if err != nil {
  79. return err
  80. }
  81. for _, dc := range t.DataCenterInfos {
  82. for _, r := range dc.RackInfos {
  83. for _, dn := range r.DataNodeInfos {
  84. volumeServers = append(volumeServers, pb.NewServerAddressFromDataNode(dn))
  85. }
  86. }
  87. }
  88. fmt.Fprintf(writer, "the cluster has %d volume servers: %+v\n", len(volumeServers), volumeServers)
  89. // collect all masters
  90. var masters []pb.ServerAddress
  91. masters = append(masters, commandEnv.MasterClient.GetMasters(context.Background())...)
  92. // check from master to volume servers
  93. for _, master := range masters {
  94. for _, volumeServer := range volumeServers {
  95. fmt.Fprintf(writer, "checking master %s to volume server %s ... ", string(master), string(volumeServer))
  96. err := pb.WithMasterClient(false, master, commandEnv.option.GrpcDialOption, false, func(client master_pb.SeaweedClient) error {
  97. pong, err := client.Ping(context.Background(), &master_pb.PingRequest{
  98. Target: string(volumeServer),
  99. TargetType: cluster.VolumeServerType,
  100. })
  101. if err == nil {
  102. printTiming(writer, pong.StartTimeNs, pong.RemoteTimeNs, pong.StopTimeNs)
  103. }
  104. return err
  105. })
  106. if err != nil {
  107. fmt.Fprintf(writer, "%v\n", err)
  108. }
  109. }
  110. }
  111. // check between masters
  112. for _, sourceMaster := range masters {
  113. for _, targetMaster := range masters {
  114. if sourceMaster == targetMaster {
  115. continue
  116. }
  117. fmt.Fprintf(writer, "checking master %s to %s ... ", string(sourceMaster), string(targetMaster))
  118. err := pb.WithMasterClient(false, sourceMaster, commandEnv.option.GrpcDialOption, false, func(client master_pb.SeaweedClient) error {
  119. pong, err := client.Ping(context.Background(), &master_pb.PingRequest{
  120. Target: string(targetMaster),
  121. TargetType: cluster.MasterType,
  122. })
  123. if err == nil {
  124. printTiming(writer, pong.StartTimeNs, pong.RemoteTimeNs, pong.StopTimeNs)
  125. }
  126. return err
  127. })
  128. if err != nil {
  129. fmt.Fprintf(writer, "%v\n", err)
  130. }
  131. }
  132. }
  133. // check from volume servers to masters
  134. for _, volumeServer := range volumeServers {
  135. for _, master := range masters {
  136. fmt.Fprintf(writer, "checking volume server %s to master %s ... ", string(volumeServer), string(master))
  137. err := pb.WithVolumeServerClient(false, volumeServer, commandEnv.option.GrpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
  138. pong, err := client.Ping(context.Background(), &volume_server_pb.PingRequest{
  139. Target: string(master),
  140. TargetType: cluster.MasterType,
  141. })
  142. if err == nil {
  143. printTiming(writer, pong.StartTimeNs, pong.RemoteTimeNs, pong.StopTimeNs)
  144. }
  145. return err
  146. })
  147. if err != nil {
  148. fmt.Fprintf(writer, "%v\n", err)
  149. }
  150. }
  151. }
  152. // check from filers to masters
  153. for _, filer := range filers {
  154. for _, master := range masters {
  155. fmt.Fprintf(writer, "checking filer %s to master %s ... ", string(filer), string(master))
  156. err := pb.WithFilerClient(false, 0, filer, commandEnv.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  157. pong, err := client.Ping(context.Background(), &filer_pb.PingRequest{
  158. Target: string(master),
  159. TargetType: cluster.MasterType,
  160. })
  161. if err == nil {
  162. printTiming(writer, pong.StartTimeNs, pong.RemoteTimeNs, pong.StopTimeNs)
  163. }
  164. return err
  165. })
  166. if err != nil {
  167. fmt.Fprintf(writer, "%v\n", err)
  168. }
  169. }
  170. }
  171. // check from filers to volume servers
  172. for _, filer := range filers {
  173. for _, volumeServer := range volumeServers {
  174. fmt.Fprintf(writer, "checking filer %s to volume server %s ... ", string(filer), string(volumeServer))
  175. err := pb.WithFilerClient(false, 0, filer, commandEnv.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  176. pong, err := client.Ping(context.Background(), &filer_pb.PingRequest{
  177. Target: string(volumeServer),
  178. TargetType: cluster.VolumeServerType,
  179. })
  180. if err == nil {
  181. printTiming(writer, pong.StartTimeNs, pong.RemoteTimeNs, pong.StopTimeNs)
  182. }
  183. return err
  184. })
  185. if err != nil {
  186. fmt.Fprintf(writer, "%v\n", err)
  187. }
  188. }
  189. }
  190. // check between volume servers
  191. for _, sourceVolumeServer := range volumeServers {
  192. for _, targetVolumeServer := range volumeServers {
  193. if sourceVolumeServer == targetVolumeServer {
  194. continue
  195. }
  196. fmt.Fprintf(writer, "checking volume server %s to %s ... ", string(sourceVolumeServer), string(targetVolumeServer))
  197. err := pb.WithVolumeServerClient(false, sourceVolumeServer, commandEnv.option.GrpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
  198. pong, err := client.Ping(context.Background(), &volume_server_pb.PingRequest{
  199. Target: string(targetVolumeServer),
  200. TargetType: cluster.VolumeServerType,
  201. })
  202. if err == nil {
  203. printTiming(writer, pong.StartTimeNs, pong.RemoteTimeNs, pong.StopTimeNs)
  204. }
  205. return err
  206. })
  207. if err != nil {
  208. fmt.Fprintf(writer, "%v\n", err)
  209. }
  210. }
  211. }
  212. // check between filers, and need to connect to itself
  213. for _, sourceFiler := range filers {
  214. for _, targetFiler := range filers {
  215. fmt.Fprintf(writer, "checking filer %s to %s ... ", string(sourceFiler), string(targetFiler))
  216. err := pb.WithFilerClient(false, 0, sourceFiler, commandEnv.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  217. pong, err := client.Ping(context.Background(), &filer_pb.PingRequest{
  218. Target: string(targetFiler),
  219. TargetType: cluster.FilerType,
  220. })
  221. if err == nil {
  222. printTiming(writer, pong.StartTimeNs, pong.RemoteTimeNs, pong.StopTimeNs)
  223. }
  224. return err
  225. })
  226. if err != nil {
  227. fmt.Fprintf(writer, "%v\n", err)
  228. }
  229. }
  230. }
  231. return nil
  232. }
  // printTiming writes the result of a successful ping to writer: the
// round-trip time and the estimated clock offset of the remote side, both in
// milliseconds with three decimals.
//
// All three timestamps are in nanoseconds. startNs and stopNs presumably
// bracket the call on the local side and remoteNs is the remote side's
// timestamp (TODO confirm against the Ping handlers). The clock delta
// assumes the remote timestamp was taken half-way through the round trip,
// so a positive value means the remote clock reads later than the local one.
func printTiming(writer io.Writer, startNs, remoteNs, stopNs int64) {
	const nsPerMs = 1e6
	// start + (stop-start)/2 cannot overflow int64, unlike (start+stop)/2.
	midpointNs := startNs + (stopNs-startNs)/2
	// float64 keeps sub-millisecond precision even for large deltas.
	roundTripTimeMs := float64(stopNs-startNs) / nsPerMs
	deltaTimeMs := float64(remoteNs-midpointNs) / nsPerMs
	fmt.Fprintf(writer, "ok round trip %.3fms clock delta %.3fms\n", roundTripTimeMs, deltaTimeMs)
}