123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372 |
- package shell
- import (
- "context"
- "flag"
- "fmt"
- "io"
- "io/ioutil"
- "math"
- "os"
- "path/filepath"
- "sync"
- "github.com/chrislusf/seaweedfs/weed/filer"
- "github.com/chrislusf/seaweedfs/weed/operation"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
- "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
- "github.com/chrislusf/seaweedfs/weed/storage/needle_map"
- "github.com/chrislusf/seaweedfs/weed/storage/types"
- "github.com/chrislusf/seaweedfs/weed/util"
- )
- func init() {
- Commands = append(Commands, &commandVolumeFsck{})
- }
- type commandVolumeFsck struct {
- env *CommandEnv
- }
- func (c *commandVolumeFsck) Name() string {
- return "volume.fsck"
- }
- func (c *commandVolumeFsck) Help() string {
- return `check all volumes to find entries not used by the filer
- Important assumption!!!
- the system is all used by one filer.
- This command works this way:
- 1. collect all file ids from all volumes, as set A
- 2. collect all file ids from the filer, as set B
- 3. find out the set A subtract B
- `
- }
- func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
- if err = commandEnv.confirmIsLocked(); err != nil {
- return
- }
- fsckCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
- verbose := fsckCommand.Bool("v", false, "verbose mode")
- applyPurging := fsckCommand.Bool("reallyDeleteFromVolume", false, "<expert only> delete data not referenced by the filer")
- if err = fsckCommand.Parse(args); err != nil {
- return nil
- }
- c.env = commandEnv
- // create a temp folder
- tempFolder, err := ioutil.TempDir("", "sw_fsck")
- if err != nil {
- return fmt.Errorf("failed to create temp folder: %v", err)
- }
- if *verbose {
- fmt.Fprintf(writer, "working directory: %s\n", tempFolder)
- }
- defer os.RemoveAll(tempFolder)
- // collect all volume id locations
- volumeIdToVInfo, err := c.collectVolumeIds(commandEnv, *verbose, writer)
- if err != nil {
- return fmt.Errorf("failed to collect all volume locations: %v", err)
- }
- // collect each volume file ids
- for volumeId, vinfo := range volumeIdToVInfo {
- err = c.collectOneVolumeFileIds(tempFolder, volumeId, vinfo, *verbose, writer)
- if err != nil {
- return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, err)
- }
- }
- // collect all filer file ids
- if err = c.collectFilerFileIds(tempFolder, volumeIdToVInfo, *verbose, writer); err != nil {
- return fmt.Errorf("failed to collect file ids from filer: %v", err)
- }
- // volume file ids substract filer file ids
- var totalInUseCount, totalOrphanChunkCount, totalOrphanDataSize uint64
- for volumeId, vinfo := range volumeIdToVInfo {
- inUseCount, orphanFileIds, orphanDataSize, checkErr := c.oneVolumeFileIdsSubtractFilerFileIds(tempFolder, volumeId, writer, *verbose)
- if checkErr != nil {
- return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, checkErr)
- }
- totalInUseCount += inUseCount
- totalOrphanChunkCount += uint64(len(orphanFileIds))
- totalOrphanDataSize += orphanDataSize
- if *verbose {
- for _, fid := range orphanFileIds {
- fmt.Fprintf(writer, "%sxxxxxxxx\n", fid)
- }
- }
- if *applyPurging && len(orphanFileIds) > 0 {
- if vinfo.isEcVolume {
- fmt.Fprintf(writer, "Skip purging for Erasure Coded volumes.\n")
- }
- if err = c.purgeFileIdsForOneVolume(volumeId, orphanFileIds, writer); err != nil {
- return fmt.Errorf("purge for volume %d: %v\n", volumeId, err)
- }
- }
- }
- if totalOrphanChunkCount == 0 {
- fmt.Fprintf(writer, "no orphan data\n")
- return nil
- }
- if !*applyPurging {
- pct := float64(totalOrphanChunkCount*100) / (float64(totalOrphanChunkCount + totalInUseCount))
- fmt.Fprintf(writer, "\nTotal\t\tentries:%d\torphan:%d\t%.2f%%\t%dB\n",
- totalOrphanChunkCount+totalInUseCount, totalOrphanChunkCount, pct, totalOrphanDataSize)
- fmt.Fprintf(writer, "This could be normal if multiple filers or no filers are used.\n")
- }
- return nil
- }
- func (c *commandVolumeFsck) collectOneVolumeFileIds(tempFolder string, volumeId uint32, vinfo VInfo, verbose bool, writer io.Writer) error {
- if verbose {
- fmt.Fprintf(writer, "collecting volume %d file ids from %s ...\n", volumeId, vinfo.server)
- }
- return operation.WithVolumeServerClient(vinfo.server, c.env.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
- ext := ".idx"
- if vinfo.isEcVolume {
- ext = ".ecx"
- }
- copyFileClient, err := volumeServerClient.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{
- VolumeId: volumeId,
- Ext: ext,
- CompactionRevision: math.MaxUint32,
- StopOffset: math.MaxInt64,
- Collection: vinfo.collection,
- IsEcVolume: vinfo.isEcVolume,
- IgnoreSourceFileNotFound: false,
- })
- if err != nil {
- return fmt.Errorf("failed to start copying volume %d%s: %v", volumeId, ext, err)
- }
- err = writeToFile(copyFileClient, getVolumeFileIdFile(tempFolder, volumeId))
- if err != nil {
- return fmt.Errorf("failed to copy %d%s from %s: %v", volumeId, ext, vinfo.server, err)
- }
- return nil
- })
- }
- func (c *commandVolumeFsck) collectFilerFileIds(tempFolder string, volumeIdToServer map[uint32]VInfo, verbose bool, writer io.Writer) error {
- if verbose {
- fmt.Fprintf(writer, "collecting file ids from filer ...\n")
- }
- files := make(map[uint32]*os.File)
- for vid := range volumeIdToServer {
- dst, openErr := os.OpenFile(getFilerFileIdFile(tempFolder, vid), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
- if openErr != nil {
- return fmt.Errorf("failed to create file %s: %v", getFilerFileIdFile(tempFolder, vid), openErr)
- }
- files[vid] = dst
- }
- defer func() {
- for _, f := range files {
- f.Close()
- }
- }()
- type Item struct {
- vid uint32
- fileKey uint64
- }
- return doTraverseBfsAndSaving(c.env, nil, "/", false, func(outputChan chan interface{}) {
- buffer := make([]byte, 8)
- for item := range outputChan {
- i := item.(*Item)
- util.Uint64toBytes(buffer, i.fileKey)
- files[i.vid].Write(buffer)
- }
- }, func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) {
- dChunks, mChunks, resolveErr := filer.ResolveChunkManifest(filer.LookupFn(c.env), entry.Entry.Chunks)
- if resolveErr != nil {
- return nil
- }
- dChunks = append(dChunks, mChunks...)
- for _, chunk := range dChunks {
- outputChan <- &Item{
- vid: chunk.Fid.VolumeId,
- fileKey: chunk.Fid.FileKey,
- }
- }
- return nil
- })
- }
- func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(tempFolder string, volumeId uint32, writer io.Writer, verbose bool) (inUseCount uint64, orphanFileIds []string, orphanDataSize uint64, err error) {
- db := needle_map.NewMemDb()
- defer db.Close()
- if err = db.LoadFromIdx(getVolumeFileIdFile(tempFolder, volumeId)); err != nil {
- return
- }
- filerFileIdsData, err := ioutil.ReadFile(getFilerFileIdFile(tempFolder, volumeId))
- if err != nil {
- return
- }
- dataLen := len(filerFileIdsData)
- if dataLen%8 != 0 {
- return 0, nil, 0, fmt.Errorf("filer data is corrupted")
- }
- for i := 0; i < len(filerFileIdsData); i += 8 {
- fileKey := util.BytesToUint64(filerFileIdsData[i : i+8])
- db.Delete(types.NeedleId(fileKey))
- inUseCount++
- }
- var orphanFileCount uint64
- db.AscendingVisit(func(n needle_map.NeedleValue) error {
- // fmt.Printf("%d,%x\n", volumeId, n.Key)
- orphanFileIds = append(orphanFileIds, fmt.Sprintf("%d,%s", volumeId, n.Key.String()))
- orphanFileCount++
- orphanDataSize += uint64(n.Size)
- return nil
- })
- if orphanFileCount > 0 {
- pct := float64(orphanFileCount*100) / (float64(orphanFileCount + inUseCount))
- fmt.Fprintf(writer, "volume:%d\tentries:%d\torphan:%d\t%.2f%%\t%dB\n",
- volumeId, orphanFileCount+inUseCount, orphanFileCount, pct, orphanDataSize)
- }
- return
- }
// VInfo records what the fsck command needs to know about one volume.
type VInfo struct {
	// server is the id of the data node the volume was found on; it is used
	// as the volume server address when copying the volume's index file.
	server string
	// collection is the collection name reported for the volume.
	collection string
	// isEcVolume is true when the entry came from erasure coding shard info
	// rather than from regular volume info.
	isEcVolume bool
}
- func (c *commandVolumeFsck) collectVolumeIds(commandEnv *CommandEnv, verbose bool, writer io.Writer) (volumeIdToServer map[uint32]VInfo, err error) {
- if verbose {
- fmt.Fprintf(writer, "collecting volume id and locations from master ...\n")
- }
- volumeIdToServer = make(map[uint32]VInfo)
- // collect topology information
- topologyInfo, _, err := collectTopologyInfo(commandEnv)
- if err != nil {
- return
- }
- eachDataNode(topologyInfo, func(dc string, rack RackId, t *master_pb.DataNodeInfo) {
- for _, diskInfo := range t.DiskInfos {
- for _, vi := range diskInfo.VolumeInfos {
- volumeIdToServer[vi.Id] = VInfo{
- server: t.Id,
- collection: vi.Collection,
- isEcVolume: false,
- }
- }
- for _, ecShardInfo := range diskInfo.EcShardInfos {
- volumeIdToServer[ecShardInfo.Id] = VInfo{
- server: t.Id,
- collection: ecShardInfo.Collection,
- isEcVolume: true,
- }
- }
- }
- })
- if verbose {
- fmt.Fprintf(writer, "collected %d volumes and locations.\n", len(volumeIdToServer))
- }
- return
- }
- func (c *commandVolumeFsck) purgeFileIdsForOneVolume(volumeId uint32, fileIds []string, writer io.Writer) (err error) {
- fmt.Fprintf(writer, "purging orphan data for volume %d...\n", volumeId)
- locations, found := c.env.MasterClient.GetLocations(volumeId)
- if !found {
- return fmt.Errorf("failed to find volume %d locations", volumeId)
- }
- resultChan := make(chan []*volume_server_pb.DeleteResult, len(locations))
- var wg sync.WaitGroup
- for _, location := range locations {
- wg.Add(1)
- go func(server string, fidList []string) {
- defer wg.Done()
- if deleteResults, deleteErr := operation.DeleteFilesAtOneVolumeServer(server, c.env.option.GrpcDialOption, fidList, false); deleteErr != nil {
- err = deleteErr
- } else if deleteResults != nil {
- resultChan <- deleteResults
- }
- }(location.Url, fileIds)
- }
- wg.Wait()
- close(resultChan)
- for results := range resultChan {
- for _, result := range results {
- if result.Error != "" {
- fmt.Fprintf(writer, "purge error: %s\n", result.Error)
- }
- }
- }
- return
- }
// getVolumeFileIdFile returns the path inside tempFolder of the local copy of
// the index file of volume vid: "<tempFolder>/<vid>.idx".
func getVolumeFileIdFile(tempFolder string, vid uint32) string {
	fileName := fmt.Sprintf("%d.idx", vid)
	return filepath.Join(tempFolder, fileName)
}
// getFilerFileIdFile returns the path inside tempFolder of the file holding
// the file keys the filer references on volume vid: "<tempFolder>/<vid>.fid".
func getFilerFileIdFile(tempFolder string, vid uint32) string {
	fileName := fmt.Sprintf("%d.fid", vid)
	return filepath.Join(tempFolder, fileName)
}
- func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName string) error {
- flags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC
- dst, err := os.OpenFile(fileName, flags, 0644)
- if err != nil {
- return nil
- }
- defer dst.Close()
- for {
- resp, receiveErr := client.Recv()
- if receiveErr == io.EOF {
- break
- }
- if receiveErr != nil {
- return fmt.Errorf("receiving %s: %v", fileName, receiveErr)
- }
- dst.Write(resp.FileContent)
- }
- return nil
- }
|