123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268 |
- package shell
- import (
- "flag"
- "fmt"
- "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
- "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
- "github.com/seaweedfs/seaweedfs/weed/storage/needle"
- "github.com/seaweedfs/seaweedfs/weed/storage/super_block"
- "github.com/seaweedfs/seaweedfs/weed/storage/types"
- "golang.org/x/exp/slices"
- "io"
- "os"
- )
- func init() {
- Commands = append(Commands, &commandVolumeServerEvacuate{})
- }
- type commandVolumeServerEvacuate struct {
- topologyInfo *master_pb.TopologyInfo
- targetServer *string
- volumeRack *string
- }
- func (c *commandVolumeServerEvacuate) Name() string {
- return "volumeServer.evacuate"
- }
- func (c *commandVolumeServerEvacuate) Help() string {
- return `move out all data on a volume server
- volumeServer.evacuate -node <host:port>
- This command moves all data away from the volume server.
- The volumes on the volume servers will be redistributed.
- Usually this is used to prepare to shutdown or upgrade the volume server.
- Sometimes a volume can not be moved because there are no
- good destination to meet the replication requirement.
- E.g. a volume replication 001 in a cluster with 2 volume servers can not be moved.
- You can use "-skipNonMoveable" to move the rest volumes.
- `
- }
- func (c *commandVolumeServerEvacuate) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
- vsEvacuateCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
- volumeServer := vsEvacuateCommand.String("node", "", "<host>:<port> of the volume server")
- c.volumeRack = vsEvacuateCommand.String("rack", "", "source rack for the volume servers")
- c.targetServer = vsEvacuateCommand.String("target", "", "<host>:<port> of target volume")
- skipNonMoveable := vsEvacuateCommand.Bool("skipNonMoveable", false, "skip volumes that can not be moved")
- applyChange := vsEvacuateCommand.Bool("force", false, "actually apply the changes")
- retryCount := vsEvacuateCommand.Int("retry", 0, "how many times to retry")
- if err = vsEvacuateCommand.Parse(args); err != nil {
- return nil
- }
- infoAboutSimulationMode(writer, *applyChange, "-force")
- if err = commandEnv.confirmIsLocked(args); err != nil && *applyChange {
- return
- }
- if *volumeServer == "" && *c.volumeRack == "" {
- return fmt.Errorf("need to specify volume server by -node=<host>:<port> or source rack")
- }
- for i := 0; i < *retryCount+1; i++ {
- if err = c.volumeServerEvacuate(commandEnv, *volumeServer, *skipNonMoveable, *applyChange, writer); err == nil {
- return nil
- }
- }
- return
- }
- func (c *commandVolumeServerEvacuate) volumeServerEvacuate(commandEnv *CommandEnv, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) (err error) {
- // 1. confirm the volume server is part of the cluster
- // 2. collect all other volume servers, sort by empty slots
- // 3. move to any other volume server as long as it satisfy the replication requirements
- // list all the volumes
- // collect topology information
- c.topologyInfo, _, err = collectTopologyInfo(commandEnv, 0)
- if err != nil {
- return err
- }
- defer func() {
- c.topologyInfo = nil
- }()
- if err := c.evacuateNormalVolumes(commandEnv, volumeServer, skipNonMoveable, applyChange, writer); err != nil {
- return err
- }
- if err := c.evacuateEcVolumes(commandEnv, volumeServer, skipNonMoveable, applyChange, writer); err != nil {
- return err
- }
- return nil
- }
- func (c *commandVolumeServerEvacuate) evacuateNormalVolumes(commandEnv *CommandEnv, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error {
- // find this volume server
- volumeServers := collectVolumeServersByDc(c.topologyInfo, "")
- thisNodes, otherNodes := c.nodesOtherThan(volumeServers, volumeServer)
- if len(thisNodes) == 0 {
- return fmt.Errorf("%s is not found in this cluster", volumeServer)
- }
- // move away normal volumes
- for _, thisNode := range thisNodes {
- for _, diskInfo := range thisNode.info.DiskInfos {
- if applyChange {
- if topologyInfo, _, err := collectTopologyInfo(commandEnv, 0); err != nil {
- fmt.Fprintf(writer, "update topologyInfo %v", err)
- } else {
- _, otherNodesNew := c.nodesOtherThan(
- collectVolumeServersByDc(topologyInfo, ""), volumeServer)
- if len(otherNodesNew) > 0 {
- otherNodes = otherNodesNew
- c.topologyInfo = topologyInfo
- fmt.Fprintf(writer, "topologyInfo updated %v\n", len(otherNodes))
- }
- }
- }
- volumeReplicas, _ := collectVolumeReplicaLocations(c.topologyInfo)
- for _, vol := range diskInfo.VolumeInfos {
- hasMoved, err := moveAwayOneNormalVolume(commandEnv, volumeReplicas, vol, thisNode, otherNodes, applyChange)
- if err != nil {
- fmt.Fprintf(writer, "move away volume %d from %s: %v\n", vol.Id, volumeServer, err)
- }
- if !hasMoved {
- if skipNonMoveable {
- replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(vol.ReplicaPlacement))
- fmt.Fprintf(writer, "skipping non moveable volume %d replication:%s\n", vol.Id, replicaPlacement.String())
- } else {
- return fmt.Errorf("failed to move volume %d from %s", vol.Id, volumeServer)
- }
- }
- }
- }
- }
- return nil
- }
- func (c *commandVolumeServerEvacuate) evacuateEcVolumes(commandEnv *CommandEnv, volumeServer string, skipNonMoveable, applyChange bool, writer io.Writer) error {
- // find this ec volume server
- ecNodes, _ := collectEcVolumeServersByDc(c.topologyInfo, "")
- thisNodes, otherNodes := c.ecNodesOtherThan(ecNodes, volumeServer)
- if len(thisNodes) == 0 {
- return fmt.Errorf("%s is not found in this cluster\n", volumeServer)
- }
- // move away ec volumes
- for _, thisNode := range thisNodes {
- for _, diskInfo := range thisNode.info.DiskInfos {
- for _, ecShardInfo := range diskInfo.EcShardInfos {
- hasMoved, err := c.moveAwayOneEcVolume(commandEnv, ecShardInfo, thisNode, otherNodes, applyChange)
- if err != nil {
- fmt.Fprintf(writer, "move away volume %d from %s: %v", ecShardInfo.Id, volumeServer, err)
- }
- if !hasMoved {
- if skipNonMoveable {
- fmt.Fprintf(writer, "failed to move away ec volume %d from %s\n", ecShardInfo.Id, volumeServer)
- } else {
- return fmt.Errorf("failed to move away ec volume %d from %s", ecShardInfo.Id, volumeServer)
- }
- }
- }
- }
- }
- return nil
- }
- func (c *commandVolumeServerEvacuate) moveAwayOneEcVolume(commandEnv *CommandEnv, ecShardInfo *master_pb.VolumeEcShardInformationMessage, thisNode *EcNode, otherNodes []*EcNode, applyChange bool) (hasMoved bool, err error) {
- for _, shardId := range erasure_coding.ShardBits(ecShardInfo.EcIndexBits).ShardIds() {
- slices.SortFunc(otherNodes, func(a, b *EcNode) bool {
- return a.localShardIdCount(ecShardInfo.Id) < b.localShardIdCount(ecShardInfo.Id)
- })
- for i := 0; i < len(otherNodes); i++ {
- emptyNode := otherNodes[i]
- collectionPrefix := ""
- if ecShardInfo.Collection != "" {
- collectionPrefix = ecShardInfo.Collection + "_"
- }
- fmt.Fprintf(os.Stdout, "moving ec volume %s%d.%d %s => %s\n", collectionPrefix, ecShardInfo.Id, shardId, thisNode.info.Id, emptyNode.info.Id)
- err = moveMountedShardToEcNode(commandEnv, thisNode, ecShardInfo.Collection, needle.VolumeId(ecShardInfo.Id), shardId, emptyNode, applyChange)
- if err != nil {
- return
- } else {
- hasMoved = true
- break
- }
- }
- if !hasMoved {
- return
- }
- }
- return
- }
- func moveAwayOneNormalVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][]*VolumeReplica, vol *master_pb.VolumeInformationMessage, thisNode *Node, otherNodes []*Node, applyChange bool) (hasMoved bool, err error) {
- freeVolumeCountfn := capacityByFreeVolumeCount(types.ToDiskType(vol.DiskType))
- maxVolumeCountFn := capacityByMaxVolumeCount(types.ToDiskType(vol.DiskType))
- for _, n := range otherNodes {
- n.selectVolumes(func(v *master_pb.VolumeInformationMessage) bool {
- return v.DiskType == vol.DiskType
- })
- }
- // most empty one is in the front
- slices.SortFunc(otherNodes, func(a, b *Node) bool {
- return a.localVolumeRatio(maxVolumeCountFn) < b.localVolumeRatio(maxVolumeCountFn)
- })
- for i := 0; i < len(otherNodes); i++ {
- emptyNode := otherNodes[i]
- if freeVolumeCountfn(emptyNode.info) < 0 {
- continue
- }
- hasMoved, err = maybeMoveOneVolume(commandEnv, volumeReplicas, thisNode, vol, emptyNode, applyChange)
- if err != nil {
- return
- }
- if hasMoved {
- break
- }
- }
- return
- }
- func (c *commandVolumeServerEvacuate) nodesOtherThan(volumeServers []*Node, thisServer string) (thisNodes []*Node, otherNodes []*Node) {
- for _, node := range volumeServers {
- if node.info.Id == thisServer || (*c.volumeRack != "" && node.rack == *c.volumeRack) {
- thisNodes = append(thisNodes, node)
- continue
- }
- if *c.volumeRack != "" && *c.volumeRack == node.rack {
- continue
- }
- if *c.targetServer != "" && *c.targetServer != node.info.Id {
- continue
- }
- otherNodes = append(otherNodes, node)
- }
- return
- }
- func (c *commandVolumeServerEvacuate) ecNodesOtherThan(volumeServers []*EcNode, thisServer string) (thisNodes []*EcNode, otherNodes []*EcNode) {
- for _, node := range volumeServers {
- if node.info.Id == thisServer || (*c.volumeRack != "" && string(node.rack) == *c.volumeRack) {
- thisNodes = append(thisNodes, node)
- continue
- }
- if *c.volumeRack != "" && *c.volumeRack == string(node.rack) {
- continue
- }
- if *c.targetServer != "" && *c.targetServer != node.info.Id {
- continue
- }
- otherNodes = append(otherNodes, node)
- }
- return
- }
|