command_volume_move.go 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230
  1. package shell
  2. import (
  3. "context"
  4. "flag"
  5. "fmt"
  6. "io"
  7. "log"
  8. "time"
  9. "github.com/seaweedfs/seaweedfs/weed/pb"
  10. "github.com/seaweedfs/seaweedfs/weed/wdclient"
  11. "github.com/seaweedfs/seaweedfs/weed/operation"
  12. "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
  13. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  14. "google.golang.org/grpc"
  15. )
  16. func init() {
  17. Commands = append(Commands, &commandVolumeMove{})
  18. }
  19. type commandVolumeMove struct {
  20. }
  21. func (c *commandVolumeMove) Name() string {
  22. return "volume.move"
  23. }
  24. func (c *commandVolumeMove) Help() string {
  25. return `move a live volume from one volume server to another volume server
  26. volume.move -source <source volume server host:port> -target <target volume server host:port> -volumeId <volume id>
  27. volume.move -source <source volume server host:port> -target <target volume server host:port> -volumeId <volume id> -disk [hdd|ssd|<tag>]
  28. This command move a live volume from one volume server to another volume server. Here are the steps:
  29. 1. This command asks the target volume server to copy the source volume from source volume server, remember the last entry's timestamp.
  30. 2. This command asks the target volume server to mount the new volume
  31. Now the master will mark this volume id as readonly.
  32. 3. This command asks the target volume server to tail the source volume for updates after the timestamp, for 1 minutes to drain the requests.
  33. 4. This command asks the source volume server to unmount the source volume
  34. Now the master will mark this volume id as writable.
  35. 5. This command asks the source volume server to delete the source volume
  36. The option "-disk [hdd|ssd|<tag>]" can be used to change the volume disk type.
  37. `
  38. }
  39. func (c *commandVolumeMove) HasTag(CommandTag) bool {
  40. return false
  41. }
  42. func (c *commandVolumeMove) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  43. volMoveCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  44. volumeIdInt := volMoveCommand.Int("volumeId", 0, "the volume id")
  45. sourceNodeStr := volMoveCommand.String("source", "", "the source volume server <host>:<port>")
  46. targetNodeStr := volMoveCommand.String("target", "", "the target volume server <host>:<port>")
  47. diskTypeStr := volMoveCommand.String("disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag")
  48. ioBytePerSecond := volMoveCommand.Int64("ioBytePerSecond", 0, "limit the speed of move")
  49. if err = volMoveCommand.Parse(args); err != nil {
  50. return nil
  51. }
  52. if err = commandEnv.confirmIsLocked(args); err != nil {
  53. return
  54. }
  55. sourceVolumeServer, targetVolumeServer := pb.ServerAddress(*sourceNodeStr), pb.ServerAddress(*targetNodeStr)
  56. volumeId := needle.VolumeId(*volumeIdInt)
  57. if sourceVolumeServer == targetVolumeServer {
  58. return fmt.Errorf("source and target volume servers are the same!")
  59. }
  60. return LiveMoveVolume(commandEnv.option.GrpcDialOption, writer, volumeId, sourceVolumeServer, targetVolumeServer, 5*time.Second, *diskTypeStr, *ioBytePerSecond, false)
  61. }
  62. // LiveMoveVolume moves one volume from one source volume server to one target volume server, with idleTimeout to drain the incoming requests.
  63. func LiveMoveVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer pb.ServerAddress, idleTimeout time.Duration, diskType string, ioBytePerSecond int64, skipTailError bool) (err error) {
  64. log.Printf("copying volume %d from %s to %s", volumeId, sourceVolumeServer, targetVolumeServer)
  65. lastAppendAtNs, err := copyVolume(grpcDialOption, writer, volumeId, sourceVolumeServer, targetVolumeServer, diskType, ioBytePerSecond)
  66. if err != nil {
  67. return fmt.Errorf("copy volume %d from %s to %s: %v", volumeId, sourceVolumeServer, targetVolumeServer, err)
  68. }
  69. log.Printf("tailing volume %d from %s to %s", volumeId, sourceVolumeServer, targetVolumeServer)
  70. if err = tailVolume(grpcDialOption, volumeId, sourceVolumeServer, targetVolumeServer, lastAppendAtNs, idleTimeout); err != nil {
  71. if skipTailError {
  72. fmt.Fprintf(writer, "tail volume %d from %s to %s: %v\n", volumeId, sourceVolumeServer, targetVolumeServer, err)
  73. } else {
  74. return fmt.Errorf("tail volume %d from %s to %s: %v", volumeId, sourceVolumeServer, targetVolumeServer, err)
  75. }
  76. }
  77. log.Printf("deleting volume %d from %s", volumeId, sourceVolumeServer)
  78. if err = deleteVolume(grpcDialOption, volumeId, sourceVolumeServer, false); err != nil {
  79. return fmt.Errorf("delete volume %d from %s: %v", volumeId, sourceVolumeServer, err)
  80. }
  81. log.Printf("moved volume %d from %s to %s", volumeId, sourceVolumeServer, targetVolumeServer)
  82. return nil
  83. }
  84. func copyVolume(grpcDialOption grpc.DialOption, writer io.Writer, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer pb.ServerAddress, diskType string, ioBytePerSecond int64) (lastAppendAtNs uint64, err error) {
  85. // check to see if the volume is already read-only and if its not then we need
  86. // to mark it as read-only and then before we return we need to undo what we
  87. // did
  88. var shouldMarkWritable bool
  89. defer func() {
  90. if !shouldMarkWritable {
  91. return
  92. }
  93. clientErr := operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  94. _, writableErr := volumeServerClient.VolumeMarkWritable(context.Background(), &volume_server_pb.VolumeMarkWritableRequest{
  95. VolumeId: uint32(volumeId),
  96. })
  97. return writableErr
  98. })
  99. if clientErr != nil {
  100. log.Printf("failed to mark volume %d as writable after copy from %s: %v", volumeId, sourceVolumeServer, clientErr)
  101. }
  102. }()
  103. err = operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  104. resp, statusErr := volumeServerClient.VolumeStatus(context.Background(), &volume_server_pb.VolumeStatusRequest{
  105. VolumeId: uint32(volumeId),
  106. })
  107. if statusErr == nil && !resp.IsReadOnly {
  108. shouldMarkWritable = true
  109. _, readonlyErr := volumeServerClient.VolumeMarkReadonly(context.Background(), &volume_server_pb.VolumeMarkReadonlyRequest{
  110. VolumeId: uint32(volumeId),
  111. Persist: false,
  112. })
  113. return readonlyErr
  114. }
  115. return statusErr
  116. })
  117. if err != nil {
  118. return
  119. }
  120. err = operation.WithVolumeServerClient(true, targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  121. stream, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
  122. VolumeId: uint32(volumeId),
  123. SourceDataNode: string(sourceVolumeServer),
  124. DiskType: diskType,
  125. IoBytePerSecond: ioBytePerSecond,
  126. })
  127. if replicateErr != nil {
  128. return replicateErr
  129. }
  130. for {
  131. resp, recvErr := stream.Recv()
  132. if recvErr != nil {
  133. if recvErr == io.EOF {
  134. break
  135. } else {
  136. return recvErr
  137. }
  138. }
  139. if resp.LastAppendAtNs != 0 {
  140. lastAppendAtNs = resp.LastAppendAtNs
  141. } else {
  142. fmt.Fprintf(writer, "volume %d processed %d bytes\n", volumeId, resp.ProcessedBytes)
  143. }
  144. }
  145. return nil
  146. })
  147. return
  148. }
  149. func tailVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer, targetVolumeServer pb.ServerAddress, lastAppendAtNs uint64, idleTimeout time.Duration) (err error) {
  150. return operation.WithVolumeServerClient(true, targetVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  151. _, replicateErr := volumeServerClient.VolumeTailReceiver(context.Background(), &volume_server_pb.VolumeTailReceiverRequest{
  152. VolumeId: uint32(volumeId),
  153. SinceNs: lastAppendAtNs,
  154. IdleTimeoutSeconds: uint32(idleTimeout.Seconds()),
  155. SourceVolumeServer: string(sourceVolumeServer),
  156. })
  157. return replicateErr
  158. })
  159. }
  160. func deleteVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer pb.ServerAddress, onlyEmpty bool) (err error) {
  161. return operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  162. _, deleteErr := volumeServerClient.VolumeDelete(context.Background(), &volume_server_pb.VolumeDeleteRequest{
  163. VolumeId: uint32(volumeId),
  164. OnlyEmpty: onlyEmpty,
  165. })
  166. return deleteErr
  167. })
  168. }
  169. func markVolumeWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer pb.ServerAddress, writable, persist bool) (err error) {
  170. return operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  171. if writable {
  172. _, err = volumeServerClient.VolumeMarkWritable(context.Background(), &volume_server_pb.VolumeMarkWritableRequest{
  173. VolumeId: uint32(volumeId),
  174. })
  175. } else {
  176. _, err = volumeServerClient.VolumeMarkReadonly(context.Background(), &volume_server_pb.VolumeMarkReadonlyRequest{
  177. VolumeId: uint32(volumeId),
  178. Persist: persist,
  179. })
  180. }
  181. return err
  182. })
  183. }
  184. func markVolumeReplicasWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, locations []wdclient.Location, writable, persist bool) error {
  185. for _, location := range locations {
  186. fmt.Printf("markVolumeReadonly %d on %s ...\n", volumeId, location.Url)
  187. if err := markVolumeWritable(grpcDialOption, volumeId, location.ServerAddress(), writable, persist); err != nil {
  188. return err
  189. }
  190. }
  191. return nil
  192. }