command_volume_tier_move.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. package shell
  2. import (
  3. "flag"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  6. "github.com/chrislusf/seaweedfs/weed/storage/types"
  7. "github.com/chrislusf/seaweedfs/weed/wdclient"
  8. "io"
  9. "time"
  10. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  11. )
  12. func init() {
  13. Commands = append(Commands, &commandVolumeTierMove{})
  14. }
  15. type commandVolumeTierMove struct {
  16. }
  17. func (c *commandVolumeTierMove) Name() string {
  18. return "volume.tier.move"
  19. }
  20. func (c *commandVolumeTierMove) Help() string {
  21. return `change a volume from one disk type to another
  22. volume.tier.move -fromDiskType=hdd -toDiskType=ssd [-collection=""] [-fullPercent=95] [-quietFor=1h]
  23. Even if the volume is replicated, only one replica will be changed and the rest replicas will be dropped.
  24. So "volume.fix.replication" and "volume.balance" should be followed.
  25. `
  26. }
  27. func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  28. if err = commandEnv.confirmIsLocked(); err != nil {
  29. return
  30. }
  31. tierCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  32. collection := tierCommand.String("collection", "", "the collection name")
  33. fullPercentage := tierCommand.Float64("fullPercent", 95, "the volume reaches the percentage of max volume size")
  34. quietPeriod := tierCommand.Duration("quietFor", 24*time.Hour, "select volumes without no writes for this period")
  35. source := tierCommand.String("fromDiskType", "", "the source disk type")
  36. target := tierCommand.String("toDiskType", "", "the target disk type")
  37. applyChange := tierCommand.Bool("force", false, "actually apply the changes")
  38. if err = tierCommand.Parse(args); err != nil {
  39. return nil
  40. }
  41. fromDiskType := types.ToDiskType(*source)
  42. toDiskType := types.ToDiskType(*target)
  43. if fromDiskType == toDiskType {
  44. return fmt.Errorf("source tier %s is the same as target tier %s", fromDiskType, toDiskType)
  45. }
  46. // collect topology information
  47. topologyInfo, volumeSizeLimitMb, err := collectTopologyInfo(commandEnv)
  48. if err != nil {
  49. return err
  50. }
  51. // collect all volumes that should change
  52. volumeIds, err := collectVolumeIdsForTierChange(commandEnv, topologyInfo, volumeSizeLimitMb, fromDiskType, *collection, *fullPercentage, *quietPeriod)
  53. if err != nil {
  54. return err
  55. }
  56. fmt.Printf("tier move volumes: %v\n", volumeIds)
  57. _, allLocations := collectVolumeReplicaLocations(topologyInfo)
  58. for _, vid := range volumeIds {
  59. if err = doVolumeTierMove(commandEnv, writer, *collection, vid, toDiskType, allLocations, *applyChange); err != nil {
  60. fmt.Printf("tier move volume %d: %v\n", vid, err)
  61. }
  62. }
  63. return nil
  64. }
  65. func isOneOf(server string, locations []wdclient.Location) bool {
  66. for _, loc := range locations {
  67. if server == loc.Url {
  68. return true
  69. }
  70. }
  71. return false
  72. }
  73. func doVolumeTierMove(commandEnv *CommandEnv, writer io.Writer, collection string, vid needle.VolumeId, toDiskType types.DiskType, allLocations []location, applyChanges bool) (err error) {
  74. // find volume location
  75. locations, found := commandEnv.MasterClient.GetLocations(uint32(vid))
  76. if !found {
  77. return fmt.Errorf("volume %d not found", vid)
  78. }
  79. // find one server with the most empty volume slots with target disk type
  80. hasFoundTarget := false
  81. keepDataNodesSorted(allLocations, toDiskType)
  82. fn := capacityByFreeVolumeCount(toDiskType)
  83. for _, dst := range allLocations {
  84. if fn(dst.dataNode) > 0 && !hasFoundTarget {
  85. // ask the volume server to replicate the volume
  86. if isOneOf(dst.dataNode.Id, locations) {
  87. continue
  88. }
  89. sourceVolumeServer := ""
  90. for _, loc := range locations {
  91. if loc.Url != dst.dataNode.Id {
  92. sourceVolumeServer = loc.Url
  93. }
  94. }
  95. if sourceVolumeServer == "" {
  96. continue
  97. }
  98. fmt.Fprintf(writer, "moving volume %d from %s to %s with disk type %s ...\n", vid, sourceVolumeServer, dst.dataNode.Id, toDiskType.ReadableString())
  99. hasFoundTarget = true
  100. if !applyChanges {
  101. break
  102. }
  103. // mark all replicas as read only
  104. if err = markVolumeReadonly(commandEnv.option.GrpcDialOption, vid, locations); err != nil {
  105. return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err)
  106. }
  107. if err = LiveMoveVolume(commandEnv.option.GrpcDialOption, vid, sourceVolumeServer, dst.dataNode.Id, 5*time.Second, toDiskType.ReadableString()); err != nil {
  108. return fmt.Errorf("move volume %d %s => %s : %v", vid, locations[0].Url, dst.dataNode.Id, err)
  109. }
  110. // remove the remaining replicas
  111. for _, loc := range locations {
  112. if loc.Url != sourceVolumeServer {
  113. if err = deleteVolume(commandEnv.option.GrpcDialOption, vid, loc.Url); err != nil {
  114. fmt.Fprintf(writer, "failed to delete volume %d on %s\n", vid, loc.Url)
  115. }
  116. }
  117. }
  118. }
  119. }
  120. if !hasFoundTarget {
  121. fmt.Fprintf(writer, "can not find disk type %s for volume %d\n", toDiskType.ReadableString(), vid)
  122. }
  123. return nil
  124. }
  125. func collectVolumeIdsForTierChange(commandEnv *CommandEnv, topologyInfo *master_pb.TopologyInfo, volumeSizeLimitMb uint64, sourceTier types.DiskType, selectedCollection string, fullPercentage float64, quietPeriod time.Duration) (vids []needle.VolumeId, err error) {
  126. quietSeconds := int64(quietPeriod / time.Second)
  127. nowUnixSeconds := time.Now().Unix()
  128. fmt.Printf("collect %s volumes quiet for: %d seconds\n", sourceTier, quietSeconds)
  129. vidMap := make(map[uint32]bool)
  130. eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
  131. for _, diskInfo := range dn.DiskInfos {
  132. for _, v := range diskInfo.VolumeInfos {
  133. if v.Collection == selectedCollection && v.ModifiedAtSecond+quietSeconds < nowUnixSeconds && types.ToDiskType(v.DiskType) == sourceTier {
  134. if float64(v.Size) > fullPercentage/100*float64(volumeSizeLimitMb)*1024*1024 {
  135. vidMap[v.Id] = true
  136. }
  137. }
  138. }
  139. }
  140. })
  141. for vid := range vidMap {
  142. vids = append(vids, needle.VolumeId(vid))
  143. }
  144. return
  145. }