package shell import ( "context" "errors" "flag" "fmt" "io" "net/http" "sort" "strings" "github.com/seaweedfs/seaweedfs/weed/storage/needle" "github.com/seaweedfs/seaweedfs/weed/wdclient" "golang.org/x/exp/maps" "golang.org/x/exp/slices" "github.com/seaweedfs/seaweedfs/weed/operation" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/util" util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) func init() { Commands = append(Commands, &commandFsMergeVolumes{}) } type commandFsMergeVolumes struct { volumes map[needle.VolumeId]*master_pb.VolumeInformationMessage volumeSizeLimit uint64 } func (c *commandFsMergeVolumes) Name() string { return "fs.mergeVolumes" } func (c *commandFsMergeVolumes) Help() string { return `re-locate chunks into target volumes and try to clear lighter volumes. This would help clear half-full volumes and let vacuum system to delete them later. fs.mergeVolumes [-toVolumeId=y] [-fromVolumeId=x] [-collection="*"] [-dir=/] [-apply] ` } func (c *commandFsMergeVolumes) HasTag(CommandTag) bool { return false } func (c *commandFsMergeVolumes) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { fsMergeVolumesCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) dirArg := fsMergeVolumesCommand.String("dir", "/", "base directory to find and update files") fromVolumeArg := fsMergeVolumesCommand.Uint("fromVolumeId", 0, "move chunks with this volume id") toVolumeArg := fsMergeVolumesCommand.Uint("toVolumeId", 0, "change chunks to this volume id") collectionArg := fsMergeVolumesCommand.String("collection", "*", "Name of collection to merge") apply := fsMergeVolumesCommand.Bool("apply", false, "applying the metadata changes") if err = fsMergeVolumesCommand.Parse(args); err != nil { return err } dir := *dirArg if dir != "/" { dir = strings.TrimRight(dir, "/") } fromVolumeId := needle.VolumeId(*fromVolumeArg) toVolumeId := needle.VolumeId(*toVolumeArg) c.reloadVolumesInfo(commandEnv.MasterClient) if fromVolumeId != 0 && toVolumeId != 0 { if fromVolumeId == toVolumeId { return fmt.Errorf("no volume id changes, %d == %d", fromVolumeId, toVolumeId) } compatible, err := c.volumesAreCompatible(fromVolumeId, toVolumeId) if err != nil { return fmt.Errorf("cannot determine volumes are compatible: %d and %d", fromVolumeId, toVolumeId) } if !compatible { return fmt.Errorf("volume %d is not compatible with volume %d", fromVolumeId, toVolumeId) } fromSize := c.getVolumeSizeById(fromVolumeId) toSize := c.getVolumeSizeById(toVolumeId) if fromSize+toSize > c.volumeSizeLimit { return fmt.Errorf( "volume %d (%d MB) cannot merge into volume %d (%d MB_ due to volume size limit (%d MB)", fromVolumeId, fromSize/1024/1024, toVolumeId, toSize/1024/1024, c.volumeSizeLimit/1024/102, ) } } plan, err := c.createMergePlan(*collectionArg, toVolumeId, fromVolumeId) if err != nil { return err } c.printPlan(plan) if len(plan) == 0 { return nil } defer util_http.GetGlobalHttpClient().CloseIdleConnections() return commandEnv.WithFilerClient(false, func(filerClient filer_pb.SeaweedFilerClient) error { return filer_pb.TraverseBfs(commandEnv, util.FullPath(dir), func(parentPath util.FullPath, entry *filer_pb.Entry) { if entry.IsDirectory { return } for _, chunk := range entry.Chunks { if chunk.IsChunkManifest { fmt.Printf("Change volume id for large file is not implemented yet: %s/%s\n", parentPath, entry.Name) continue } chunkVolumeId := needle.VolumeId(chunk.Fid.VolumeId) toVolumeId, found := plan[chunkVolumeId] if !found { continue } path := parentPath.Child(entry.Name) fmt.Printf("move %s(%s)\n", path, chunk.GetFileIdString()) if !*apply { continue } if err = moveChunk(chunk, toVolumeId, commandEnv.MasterClient); err != nil { fmt.Printf("failed to move %s/%s: %v\n", path, chunk.GetFileIdString(), err) continue } if err = filer_pb.UpdateEntry(filerClient, &filer_pb.UpdateEntryRequest{ Directory: string(parentPath), Entry: entry, }); err != nil { fmt.Printf("failed to update %s: %v\n", path, err) } } }) }) } func (c *commandFsMergeVolumes) getVolumeInfoById(vid needle.VolumeId) (*master_pb.VolumeInformationMessage, error) { info := c.volumes[vid] var err error if info == nil { err = errors.New("cannot find volume") } return info, err } func (c *commandFsMergeVolumes) volumesAreCompatible(src needle.VolumeId, dest needle.VolumeId) (bool, error) { srcInfo, err := c.getVolumeInfoById(src) if err != nil { return false, err } destInfo, err := c.getVolumeInfoById(dest) if err != nil { return false, err } return (srcInfo.Collection == destInfo.Collection && srcInfo.Ttl == destInfo.Ttl && srcInfo.ReplicaPlacement == destInfo.ReplicaPlacement), nil } func (c *commandFsMergeVolumes) reloadVolumesInfo(masterClient *wdclient.MasterClient) error { c.volumes = make(map[needle.VolumeId]*master_pb.VolumeInformationMessage) return masterClient.WithClient(false, func(client master_pb.SeaweedClient) error { volumes, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{}) if err != nil { return err } c.volumeSizeLimit = volumes.GetVolumeSizeLimitMb() * 1024 * 1024 for _, dc := range volumes.TopologyInfo.DataCenterInfos { for _, rack := range dc.RackInfos { for _, node := range rack.DataNodeInfos { for _, disk := range node.DiskInfos { for _, volume := range disk.VolumeInfos { vid := needle.VolumeId(volume.Id) if found := c.volumes[vid]; found == nil { c.volumes[vid] = volume } } } } } } return nil }) } func (c *commandFsMergeVolumes) createMergePlan(collection string, toVolumeId needle.VolumeId, fromVolumeId needle.VolumeId) (map[needle.VolumeId]needle.VolumeId, error) { plan := make(map[needle.VolumeId]needle.VolumeId) volumes := maps.Keys(c.volumes) sort.Slice(volumes, func(a, b int) bool { return c.volumes[volumes[b]].Size < c.volumes[volumes[a]].Size }) l := len(volumes) for i := 0; i < l; i++ { volume := c.volumes[volumes[i]] if volume.GetReadOnly() || c.getVolumeSize(volume) == 0 || (collection != "*" && collection != volume.GetCollection()) { volumes = slices.Delete(volumes, i, i+1) i-- l-- } } for i := l - 1; i >= 0; i-- { src := volumes[i] if fromVolumeId != 0 && src != fromVolumeId { continue } for j := 0; j < i; j++ { condidate := volumes[j] if toVolumeId != 0 && condidate != toVolumeId { continue } if _, moving := plan[condidate]; moving { continue } compatible, err := c.volumesAreCompatible(src, condidate) if err != nil { return nil, err } if !compatible { continue } if c.getVolumeSizeBasedOnPlan(plan, condidate)+c.getVolumeSizeById(src) > c.volumeSizeLimit { continue } plan[src] = condidate break } } return plan, nil } func (c *commandFsMergeVolumes) getVolumeSizeBasedOnPlan(plan map[needle.VolumeId]needle.VolumeId, vid needle.VolumeId) uint64 { size := c.getVolumeSizeById(vid) for src, dest := range plan { if dest == vid { size += c.getVolumeSizeById(src) } } return size } func (c *commandFsMergeVolumes) getVolumeSize(volume *master_pb.VolumeInformationMessage) uint64 { return volume.Size - volume.DeletedByteCount } func (c *commandFsMergeVolumes) getVolumeSizeById(vid needle.VolumeId) uint64 { return c.getVolumeSize(c.volumes[vid]) } func (c *commandFsMergeVolumes) printPlan(plan map[needle.VolumeId]needle.VolumeId) { fmt.Printf("max volume size: %d MB\n", c.volumeSizeLimit/1024/1024) reversePlan := make(map[needle.VolumeId][]needle.VolumeId) for src, dest := range plan { reversePlan[dest] = append(reversePlan[dest], src) } for dest, srcs := range reversePlan { currentSize := c.getVolumeSizeById(dest) for _, src := range srcs { srcSize := c.getVolumeSizeById(src) newSize := currentSize + srcSize fmt.Printf( "volume %d (%d MB) merge into volume %d (%d MB => %d MB)\n", src, srcSize/1024/1024, dest, currentSize/1024/1024, newSize/1024/1024, ) currentSize = newSize } fmt.Println() } } func moveChunk(chunk *filer_pb.FileChunk, toVolumeId needle.VolumeId, masterClient *wdclient.MasterClient) error { fromFid := needle.NewFileId(needle.VolumeId(chunk.Fid.VolumeId), chunk.Fid.FileKey, chunk.Fid.Cookie) toFid := needle.NewFileId(toVolumeId, chunk.Fid.FileKey, chunk.Fid.Cookie) downloadURLs, err := masterClient.LookupVolumeServerUrl(fromFid.VolumeId.String()) if err != nil { return err } downloadURL := fmt.Sprintf("http://%s/%s?readDeleted=true", downloadURLs[0], fromFid.String()) uploadURLs, err := masterClient.LookupVolumeServerUrl(toVolumeId.String()) if err != nil { return err } uploadURL := fmt.Sprintf("http://%s/%s", uploadURLs[0], toFid.String()) resp, reader, err := readUrl(downloadURL) if err != nil { return err } defer util_http.CloseResponse(resp) defer reader.Close() var filename string contentDisposition := resp.Header.Get("Content-Disposition") if len(contentDisposition) > 0 { idx := strings.Index(contentDisposition, "filename=") if idx != -1 { filename = contentDisposition[idx+len("filename="):] filename = strings.Trim(filename, "\"") } } contentType := resp.Header.Get("Content-Type") isCompressed := resp.Header.Get("Content-Encoding") == "gzip" md5 := resp.Header.Get("Content-MD5") uploader, err := operation.NewUploader() if err != nil { return err } _, err, _ = uploader.Upload(reader, &operation.UploadOption{ UploadUrl: uploadURL, Filename: filename, IsInputCompressed: isCompressed, Cipher: false, MimeType: contentType, PairMap: nil, Md5: md5, }) if err != nil { return err } chunk.Fid.VolumeId = uint32(toVolumeId) chunk.FileId = "" return nil } func readUrl(fileUrl string) (*http.Response, io.ReadCloser, error) { req, err := http.NewRequest(http.MethodGet, fileUrl, nil) if err != nil { return nil, nil, err } req.Header.Add("Accept-Encoding", "gzip") r, err := util_http.GetGlobalHttpClient().Do(req) if err != nil { return nil, nil, err } if r.StatusCode >= 400 { util_http.CloseResponse(r) return nil, nil, fmt.Errorf("%s: %s", fileUrl, r.Status) } return r, r.Body, nil }