command_fs_verify.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. package shell
  2. import (
  3. "context"
  4. "flag"
  5. "fmt"
  6. "github.com/seaweedfs/seaweedfs/weed/filer"
  7. "github.com/seaweedfs/seaweedfs/weed/operation"
  8. "github.com/seaweedfs/seaweedfs/weed/pb"
  9. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  10. "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
  11. "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
  12. "github.com/seaweedfs/seaweedfs/weed/storage"
  13. "github.com/seaweedfs/seaweedfs/weed/util"
  14. "io"
  15. "math"
  16. "strings"
  17. "time"
  18. )
  19. func init() {
  20. Commands = append(Commands, &commandFsVerify{})
  21. }
  22. type commandFsVerify struct {
  23. env *CommandEnv
  24. volumeIds map[uint32][]pb.ServerAddress
  25. verbose *bool
  26. modifyTimeAgoAtSec int64
  27. writer io.Writer
  28. }
  29. func (c *commandFsVerify) Name() string {
  30. return "fs.verify"
  31. }
  32. func (c *commandFsVerify) Help() string {
  33. return `recursively verify all files under a directory
  34. fs.verify [-v] [-modifyTimeAgo 1h] /buckets/dir
  35. `
  36. }
  37. func (c *commandFsVerify) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  38. c.env = commandEnv
  39. c.writer = writer
  40. fsVerifyCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  41. c.verbose = fsVerifyCommand.Bool("v", false, "print out each processed files")
  42. modifyTimeAgo := fsVerifyCommand.Duration("modifyTimeAgo", 0, "only include files after this modify time to verify")
  43. if err = fsVerifyCommand.Parse(args); err != nil {
  44. return err
  45. }
  46. path, parseErr := commandEnv.parseUrl(findInputDirectory(fsVerifyCommand.Args()))
  47. if parseErr != nil {
  48. return parseErr
  49. }
  50. c.modifyTimeAgoAtSec = int64(modifyTimeAgo.Seconds())
  51. if err := c.collectVolumeIds(); err != nil {
  52. return parseErr
  53. }
  54. fCount, eConut, terr := c.verifyTraverseBfs(path)
  55. if terr == nil {
  56. fmt.Fprintf(writer, "verified %d files, error %d files \n", fCount, eConut)
  57. }
  58. return terr
  59. }
  60. func (c *commandFsVerify) collectVolumeIds() error {
  61. c.volumeIds = make(map[uint32][]pb.ServerAddress)
  62. topologyInfo, _, err := collectTopologyInfo(c.env, 0)
  63. if err != nil {
  64. return err
  65. }
  66. eachDataNode(topologyInfo, func(dc string, rack RackId, nodeInfo *master_pb.DataNodeInfo) {
  67. for _, diskInfo := range nodeInfo.DiskInfos {
  68. for _, vi := range diskInfo.VolumeInfos {
  69. c.volumeIds[vi.Id] = append(c.volumeIds[vi.Id], pb.NewServerAddressFromDataNode(nodeInfo))
  70. }
  71. }
  72. })
  73. return nil
  74. }
  75. func (c *commandFsVerify) verifyEntry(fileId *filer_pb.FileId, volumeServer *pb.ServerAddress) error {
  76. err := operation.WithVolumeServerClient(false, *volumeServer, c.env.option.GrpcDialOption,
  77. func(client volume_server_pb.VolumeServerClient) error {
  78. _, err := client.VolumeNeedleStatus(context.Background(),
  79. &volume_server_pb.VolumeNeedleStatusRequest{
  80. VolumeId: fileId.VolumeId,
  81. NeedleId: fileId.FileKey})
  82. return err
  83. },
  84. )
  85. if err != nil && !strings.Contains(err.Error(), storage.ErrorDeleted.Error()) {
  86. return err
  87. }
  88. if *c.verbose {
  89. fmt.Fprintf(c.writer, ".")
  90. }
  91. return nil
  92. }
  93. type ItemEntry struct {
  94. chunks []*filer_pb.FileChunk
  95. path util.FullPath
  96. }
  97. func (c *commandFsVerify) verifyTraverseBfs(path string) (fileCount int64, errCount int64, err error) {
  98. timeNowAtSec := time.Now().Unix()
  99. return fileCount, errCount, doTraverseBfsAndSaving(c.env, c.writer, path, false,
  100. func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) {
  101. if c.modifyTimeAgoAtSec > 0 {
  102. if entry.Entry.Attributes != nil && c.modifyTimeAgoAtSec < timeNowAtSec-entry.Entry.Attributes.Mtime {
  103. return nil
  104. }
  105. }
  106. dataChunks, manifestChunks, resolveErr := filer.ResolveChunkManifest(filer.LookupFn(c.env), entry.Entry.GetChunks(), 0, math.MaxInt64)
  107. if resolveErr != nil {
  108. return fmt.Errorf("failed to ResolveChunkManifest: %+v", resolveErr)
  109. }
  110. dataChunks = append(dataChunks, manifestChunks...)
  111. if len(dataChunks) > 0 {
  112. outputChan <- &ItemEntry{
  113. chunks: dataChunks,
  114. path: util.NewFullPath(entry.Dir, entry.Entry.Name),
  115. }
  116. }
  117. return nil
  118. },
  119. func(outputChan chan interface{}) {
  120. for itemEntry := range outputChan {
  121. i := itemEntry.(*ItemEntry)
  122. fileMsg := fmt.Sprintf("file:%s needle status ", i.path)
  123. if *c.verbose {
  124. fmt.Fprintf(c.writer, fileMsg)
  125. fileMsg = ""
  126. }
  127. for _, chunk := range i.chunks {
  128. if volumeIds, ok := c.volumeIds[chunk.Fid.VolumeId]; ok {
  129. for _, volumeServer := range volumeIds {
  130. if err = c.verifyEntry(chunk.Fid, &volumeServer); err != nil {
  131. fmt.Fprintf(c.writer, "%sfailed verify %d:%d: %+v\n",
  132. fileMsg, chunk.Fid.VolumeId, chunk.Fid.FileKey, err)
  133. break
  134. }
  135. }
  136. } else {
  137. err = fmt.Errorf("volumeId %d not found", chunk.Fid.VolumeId)
  138. fmt.Fprintf(c.writer, "%sfailed verify chunk %d:%d: %+v\n",
  139. fileMsg, chunk.Fid.VolumeId, chunk.Fid.FileKey, err)
  140. break
  141. }
  142. }
  143. if err != nil {
  144. errCount++
  145. continue
  146. }
  147. if *c.verbose {
  148. fmt.Fprintf(c.writer, " verifed\n")
  149. }
  150. fileCount++
  151. }
  152. })
  153. }