command_volume_server_leave.go 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. package shell
  2. import (
  3. "context"
  4. "flag"
  5. "fmt"
  6. "github.com/seaweedfs/seaweedfs/weed/operation"
  7. "github.com/seaweedfs/seaweedfs/weed/pb"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
  9. "google.golang.org/grpc"
  10. "io"
  11. )
  12. func init() {
  13. Commands = append(Commands, &commandVolumeServerLeave{})
  14. }
  15. type commandVolumeServerLeave struct {
  16. }
  17. func (c *commandVolumeServerLeave) Name() string {
  18. return "volumeServer.leave"
  19. }
  20. func (c *commandVolumeServerLeave) Help() string {
  21. return `stop a volume server from sending heartbeats to the master
  22. volumeServer.leave -node <volume server host:port> -force
  23. This command enables gracefully shutting down the volume server.
  24. The volume server will stop sending heartbeats to the master.
  25. After draining the traffic for a few seconds, you can safely shut down the volume server.
  26. This operation is not revocable unless the volume server is restarted.
  27. `
  28. }
  29. func (c *commandVolumeServerLeave) HasTag(CommandTag) bool {
  30. return false
  31. }
  32. func (c *commandVolumeServerLeave) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  33. vsLeaveCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  34. volumeServer := vsLeaveCommand.String("node", "", "<host>:<port> of the volume server")
  35. if err = vsLeaveCommand.Parse(args); err != nil {
  36. return nil
  37. }
  38. if err = commandEnv.confirmIsLocked(args); err != nil {
  39. return
  40. }
  41. if *volumeServer == "" {
  42. return fmt.Errorf("need to specify volume server by -node=<host>:<port>")
  43. }
  44. return volumeServerLeave(commandEnv.option.GrpcDialOption, pb.ServerAddress(*volumeServer), writer)
  45. }
  46. func volumeServerLeave(grpcDialOption grpc.DialOption, volumeServer pb.ServerAddress, writer io.Writer) (err error) {
  47. return operation.WithVolumeServerClient(false, volumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  48. _, leaveErr := volumeServerClient.VolumeServerLeave(context.Background(), &volume_server_pb.VolumeServerLeaveRequest{})
  49. if leaveErr != nil {
  50. fmt.Fprintf(writer, "ask volume server %s to leave: %v\n", volumeServer, leaveErr)
  51. } else {
  52. fmt.Fprintf(writer, "stopped heartbeat in volume server %s. After a few seconds to drain traffic, it will be safe to stop the volume server.\n", volumeServer)
  53. }
  54. return leaveErr
  55. })
  56. }