123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366 |
- package shell
- import (
- "context"
- "errors"
- "flag"
- "fmt"
- "io"
- "net/http"
- "sort"
- "strings"
- "github.com/seaweedfs/seaweedfs/weed/storage/needle"
- "github.com/seaweedfs/seaweedfs/weed/wdclient"
- "golang.org/x/exp/maps"
- "golang.org/x/exp/slices"
- "github.com/seaweedfs/seaweedfs/weed/operation"
- "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
- "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
- "github.com/seaweedfs/seaweedfs/weed/util"
- util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
- )
- func init() {
- Commands = append(Commands, &commandFsMergeVolumes{})
- }
- type commandFsMergeVolumes struct {
- volumes map[needle.VolumeId]*master_pb.VolumeInformationMessage
- volumeSizeLimit uint64
- }
- func (c *commandFsMergeVolumes) Name() string {
- return "fs.mergeVolumes"
- }
- func (c *commandFsMergeVolumes) Help() string {
- return `re-locate chunks into target volumes and try to clear lighter volumes.
-
- This would help clear half-full volumes and let vacuum system to delete them later.
- fs.mergeVolumes [-toVolumeId=y] [-fromVolumeId=x] [-collection="*"] [-dir=/] [-apply]
- `
- }
- func (c *commandFsMergeVolumes) HasTag(CommandTag) bool {
- return false
- }
- func (c *commandFsMergeVolumes) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
- fsMergeVolumesCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
- dirArg := fsMergeVolumesCommand.String("dir", "/", "base directory to find and update files")
- fromVolumeArg := fsMergeVolumesCommand.Uint("fromVolumeId", 0, "move chunks with this volume id")
- toVolumeArg := fsMergeVolumesCommand.Uint("toVolumeId", 0, "change chunks to this volume id")
- collectionArg := fsMergeVolumesCommand.String("collection", "*", "Name of collection to merge")
- apply := fsMergeVolumesCommand.Bool("apply", false, "applying the metadata changes")
- if err = fsMergeVolumesCommand.Parse(args); err != nil {
- return err
- }
- dir := *dirArg
- if dir != "/" {
- dir = strings.TrimRight(dir, "/")
- }
- fromVolumeId := needle.VolumeId(*fromVolumeArg)
- toVolumeId := needle.VolumeId(*toVolumeArg)
- c.reloadVolumesInfo(commandEnv.MasterClient)
- if fromVolumeId != 0 && toVolumeId != 0 {
- if fromVolumeId == toVolumeId {
- return fmt.Errorf("no volume id changes, %d == %d", fromVolumeId, toVolumeId)
- }
- compatible, err := c.volumesAreCompatible(fromVolumeId, toVolumeId)
- if err != nil {
- return fmt.Errorf("cannot determine volumes are compatible: %d and %d", fromVolumeId, toVolumeId)
- }
- if !compatible {
- return fmt.Errorf("volume %d is not compatible with volume %d", fromVolumeId, toVolumeId)
- }
- fromSize := c.getVolumeSizeById(fromVolumeId)
- toSize := c.getVolumeSizeById(toVolumeId)
- if fromSize+toSize > c.volumeSizeLimit {
- return fmt.Errorf(
- "volume %d (%d MB) cannot merge into volume %d (%d MB_ due to volume size limit (%d MB)",
- fromVolumeId, fromSize/1024/1024,
- toVolumeId, toSize/1024/1024,
- c.volumeSizeLimit/1024/102,
- )
- }
- }
- plan, err := c.createMergePlan(*collectionArg, toVolumeId, fromVolumeId)
- if err != nil {
- return err
- }
- c.printPlan(plan)
- if len(plan) == 0 {
- return nil
- }
- defer util_http.GetGlobalHttpClient().CloseIdleConnections()
- return commandEnv.WithFilerClient(false, func(filerClient filer_pb.SeaweedFilerClient) error {
- return filer_pb.TraverseBfs(commandEnv, util.FullPath(dir), func(parentPath util.FullPath, entry *filer_pb.Entry) {
- if entry.IsDirectory {
- return
- }
- for _, chunk := range entry.Chunks {
- if chunk.IsChunkManifest {
- fmt.Printf("Change volume id for large file is not implemented yet: %s/%s\n", parentPath, entry.Name)
- continue
- }
- chunkVolumeId := needle.VolumeId(chunk.Fid.VolumeId)
- toVolumeId, found := plan[chunkVolumeId]
- if !found {
- continue
- }
- path := parentPath.Child(entry.Name)
- fmt.Printf("move %s(%s)\n", path, chunk.GetFileIdString())
- if !*apply {
- continue
- }
- if err = moveChunk(chunk, toVolumeId, commandEnv.MasterClient); err != nil {
- fmt.Printf("failed to move %s/%s: %v\n", path, chunk.GetFileIdString(), err)
- continue
- }
- if err = filer_pb.UpdateEntry(filerClient, &filer_pb.UpdateEntryRequest{
- Directory: string(parentPath),
- Entry: entry,
- }); err != nil {
- fmt.Printf("failed to update %s: %v\n", path, err)
- }
- }
- })
- })
- }
- func (c *commandFsMergeVolumes) getVolumeInfoById(vid needle.VolumeId) (*master_pb.VolumeInformationMessage, error) {
- info := c.volumes[vid]
- var err error
- if info == nil {
- err = errors.New("cannot find volume")
- }
- return info, err
- }
- func (c *commandFsMergeVolumes) volumesAreCompatible(src needle.VolumeId, dest needle.VolumeId) (bool, error) {
- srcInfo, err := c.getVolumeInfoById(src)
- if err != nil {
- return false, err
- }
- destInfo, err := c.getVolumeInfoById(dest)
- if err != nil {
- return false, err
- }
- return (srcInfo.Collection == destInfo.Collection &&
- srcInfo.Ttl == destInfo.Ttl &&
- srcInfo.ReplicaPlacement == destInfo.ReplicaPlacement), nil
- }
- func (c *commandFsMergeVolumes) reloadVolumesInfo(masterClient *wdclient.MasterClient) error {
- c.volumes = make(map[needle.VolumeId]*master_pb.VolumeInformationMessage)
- return masterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
- volumes, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
- if err != nil {
- return err
- }
- c.volumeSizeLimit = volumes.GetVolumeSizeLimitMb() * 1024 * 1024
- for _, dc := range volumes.TopologyInfo.DataCenterInfos {
- for _, rack := range dc.RackInfos {
- for _, node := range rack.DataNodeInfos {
- for _, disk := range node.DiskInfos {
- for _, volume := range disk.VolumeInfos {
- vid := needle.VolumeId(volume.Id)
- if found := c.volumes[vid]; found == nil {
- c.volumes[vid] = volume
- }
- }
- }
- }
- }
- }
- return nil
- })
- }
- func (c *commandFsMergeVolumes) createMergePlan(collection string, toVolumeId needle.VolumeId, fromVolumeId needle.VolumeId) (map[needle.VolumeId]needle.VolumeId, error) {
- plan := make(map[needle.VolumeId]needle.VolumeId)
- volumes := maps.Keys(c.volumes)
- sort.Slice(volumes, func(a, b int) bool {
- return c.volumes[volumes[b]].Size < c.volumes[volumes[a]].Size
- })
- l := len(volumes)
- for i := 0; i < l; i++ {
- volume := c.volumes[volumes[i]]
- if volume.GetReadOnly() || c.getVolumeSize(volume) == 0 || (collection != "*" && collection != volume.GetCollection()) {
- volumes = slices.Delete(volumes, i, i+1)
- i--
- l--
- }
- }
- for i := l - 1; i >= 0; i-- {
- src := volumes[i]
- if fromVolumeId != 0 && src != fromVolumeId {
- continue
- }
- for j := 0; j < i; j++ {
- condidate := volumes[j]
- if toVolumeId != 0 && condidate != toVolumeId {
- continue
- }
- if _, moving := plan[condidate]; moving {
- continue
- }
- compatible, err := c.volumesAreCompatible(src, condidate)
- if err != nil {
- return nil, err
- }
- if !compatible {
- continue
- }
- if c.getVolumeSizeBasedOnPlan(plan, condidate)+c.getVolumeSizeById(src) > c.volumeSizeLimit {
- continue
- }
- plan[src] = condidate
- break
- }
- }
- return plan, nil
- }
- func (c *commandFsMergeVolumes) getVolumeSizeBasedOnPlan(plan map[needle.VolumeId]needle.VolumeId, vid needle.VolumeId) uint64 {
- size := c.getVolumeSizeById(vid)
- for src, dest := range plan {
- if dest == vid {
- size += c.getVolumeSizeById(src)
- }
- }
- return size
- }
- func (c *commandFsMergeVolumes) getVolumeSize(volume *master_pb.VolumeInformationMessage) uint64 {
- return volume.Size - volume.DeletedByteCount
- }
- func (c *commandFsMergeVolumes) getVolumeSizeById(vid needle.VolumeId) uint64 {
- return c.getVolumeSize(c.volumes[vid])
- }
- func (c *commandFsMergeVolumes) printPlan(plan map[needle.VolumeId]needle.VolumeId) {
- fmt.Printf("max volume size: %d MB\n", c.volumeSizeLimit/1024/1024)
- reversePlan := make(map[needle.VolumeId][]needle.VolumeId)
- for src, dest := range plan {
- reversePlan[dest] = append(reversePlan[dest], src)
- }
- for dest, srcs := range reversePlan {
- currentSize := c.getVolumeSizeById(dest)
- for _, src := range srcs {
- srcSize := c.getVolumeSizeById(src)
- newSize := currentSize + srcSize
- fmt.Printf(
- "volume %d (%d MB) merge into volume %d (%d MB => %d MB)\n",
- src, srcSize/1024/1024,
- dest, currentSize/1024/1024, newSize/1024/1024,
- )
- currentSize = newSize
- }
- fmt.Println()
- }
- }
- func moveChunk(chunk *filer_pb.FileChunk, toVolumeId needle.VolumeId, masterClient *wdclient.MasterClient) error {
- fromFid := needle.NewFileId(needle.VolumeId(chunk.Fid.VolumeId), chunk.Fid.FileKey, chunk.Fid.Cookie)
- toFid := needle.NewFileId(toVolumeId, chunk.Fid.FileKey, chunk.Fid.Cookie)
- downloadURLs, err := masterClient.LookupVolumeServerUrl(fromFid.VolumeId.String())
- if err != nil {
- return err
- }
- downloadURL := fmt.Sprintf("http://%s/%s?readDeleted=true", downloadURLs[0], fromFid.String())
- uploadURLs, err := masterClient.LookupVolumeServerUrl(toVolumeId.String())
- if err != nil {
- return err
- }
- uploadURL := fmt.Sprintf("http://%s/%s", uploadURLs[0], toFid.String())
- resp, reader, err := readUrl(downloadURL)
- if err != nil {
- return err
- }
- defer util_http.CloseResponse(resp)
- defer reader.Close()
- var filename string
- contentDisposition := resp.Header.Get("Content-Disposition")
- if len(contentDisposition) > 0 {
- idx := strings.Index(contentDisposition, "filename=")
- if idx != -1 {
- filename = contentDisposition[idx+len("filename="):]
- filename = strings.Trim(filename, "\"")
- }
- }
- contentType := resp.Header.Get("Content-Type")
- isCompressed := resp.Header.Get("Content-Encoding") == "gzip"
- md5 := resp.Header.Get("Content-MD5")
- uploader, err := operation.NewUploader()
- if err != nil {
- return err
- }
- _, err, _ = uploader.Upload(reader, &operation.UploadOption{
- UploadUrl: uploadURL,
- Filename: filename,
- IsInputCompressed: isCompressed,
- Cipher: false,
- MimeType: contentType,
- PairMap: nil,
- Md5: md5,
- })
- if err != nil {
- return err
- }
- chunk.Fid.VolumeId = uint32(toVolumeId)
- chunk.FileId = ""
- return nil
- }
- func readUrl(fileUrl string) (*http.Response, io.ReadCloser, error) {
- req, err := http.NewRequest(http.MethodGet, fileUrl, nil)
- if err != nil {
- return nil, nil, err
- }
- req.Header.Add("Accept-Encoding", "gzip")
- r, err := util_http.GetGlobalHttpClient().Do(req)
- if err != nil {
- return nil, nil, err
- }
- if r.StatusCode >= 400 {
- util_http.CloseResponse(r)
- return nil, nil, fmt.Errorf("%s: %s", fileUrl, r.Status)
- }
- return r, r.Body, nil
- }
|