command_fs_verify.go 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222
  1. package shell
  2. import (
  3. "context"
  4. "flag"
  5. "fmt"
  6. "github.com/seaweedfs/seaweedfs/weed/filer"
  7. "github.com/seaweedfs/seaweedfs/weed/operation"
  8. "github.com/seaweedfs/seaweedfs/weed/pb"
  9. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  10. "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
  11. "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
  12. "github.com/seaweedfs/seaweedfs/weed/storage"
  13. "github.com/seaweedfs/seaweedfs/weed/util"
  14. "golang.org/x/exp/slices"
  15. "io"
  16. "math"
  17. "strings"
  18. "sync"
  19. "time"
  20. )
  21. func init() {
  22. Commands = append(Commands, &commandFsVerify{})
  23. }
  24. type commandFsVerify struct {
  25. env *CommandEnv
  26. volumeServers []pb.ServerAddress
  27. volumeIds map[uint32][]pb.ServerAddress
  28. verbose *bool
  29. concurrency *int
  30. modifyTimeAgoAtSec int64
  31. writer io.Writer
  32. waitChan map[string]chan struct{}
  33. waitChanLock sync.RWMutex
  34. }
  35. func (c *commandFsVerify) Name() string {
  36. return "fs.verify"
  37. }
  38. func (c *commandFsVerify) Help() string {
  39. return `recursively verify all files under a directory
  40. fs.verify [-v] [-modifyTimeAgo 1h] /buckets/dir
  41. `
  42. }
  43. func (c *commandFsVerify) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  44. c.env = commandEnv
  45. c.writer = writer
  46. fsVerifyCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  47. c.verbose = fsVerifyCommand.Bool("v", false, "print out each processed files")
  48. modifyTimeAgo := fsVerifyCommand.Duration("modifyTimeAgo", 0, "only include files after this modify time to verify")
  49. c.concurrency = fsVerifyCommand.Int("concurrency", 0, "number of parallel verification per volume server")
  50. if err = fsVerifyCommand.Parse(args); err != nil {
  51. return err
  52. }
  53. path, parseErr := commandEnv.parseUrl(findInputDirectory(fsVerifyCommand.Args()))
  54. if parseErr != nil {
  55. return parseErr
  56. }
  57. c.modifyTimeAgoAtSec = int64(modifyTimeAgo.Seconds())
  58. c.volumeIds = make(map[uint32][]pb.ServerAddress)
  59. c.waitChan = make(map[string]chan struct{})
  60. c.volumeServers = []pb.ServerAddress{}
  61. defer func() {
  62. c.modifyTimeAgoAtSec = 0
  63. c.volumeIds = nil
  64. c.waitChan = nil
  65. c.volumeServers = nil
  66. }()
  67. if err := c.collectVolumeIds(); err != nil {
  68. return parseErr
  69. }
  70. if *c.concurrency > 0 {
  71. for _, volumeServer := range c.volumeServers {
  72. volumeServerStr := string(volumeServer)
  73. c.waitChan[volumeServerStr] = make(chan struct{}, *c.concurrency)
  74. defer close(c.waitChan[volumeServerStr])
  75. }
  76. }
  77. fCount, eConut, terr := c.verifyTraverseBfs(path)
  78. if terr == nil {
  79. fmt.Fprintf(writer, "verified %d files, error %d files \n", fCount, eConut)
  80. }
  81. return terr
  82. }
  83. func (c *commandFsVerify) collectVolumeIds() error {
  84. topologyInfo, _, err := collectTopologyInfo(c.env, 0)
  85. if err != nil {
  86. return err
  87. }
  88. eachDataNode(topologyInfo, func(dc string, rack RackId, nodeInfo *master_pb.DataNodeInfo) {
  89. for _, diskInfo := range nodeInfo.DiskInfos {
  90. for _, vi := range diskInfo.VolumeInfos {
  91. volumeServer := pb.NewServerAddressFromDataNode(nodeInfo)
  92. c.volumeIds[vi.Id] = append(c.volumeIds[vi.Id], volumeServer)
  93. if !slices.Contains(c.volumeServers, volumeServer) {
  94. c.volumeServers = append(c.volumeServers, volumeServer)
  95. }
  96. }
  97. }
  98. })
  99. return nil
  100. }
  101. func (c *commandFsVerify) verifyEntry(volumeServer pb.ServerAddress, fileId *filer_pb.FileId) error {
  102. err := operation.WithVolumeServerClient(false, volumeServer, c.env.option.GrpcDialOption,
  103. func(client volume_server_pb.VolumeServerClient) error {
  104. _, err := client.VolumeNeedleStatus(context.Background(),
  105. &volume_server_pb.VolumeNeedleStatusRequest{
  106. VolumeId: fileId.VolumeId,
  107. NeedleId: fileId.FileKey})
  108. return err
  109. },
  110. )
  111. if err != nil && !strings.Contains(err.Error(), storage.ErrorDeleted.Error()) {
  112. return err
  113. }
  114. return nil
  115. }
  116. type ItemEntry struct {
  117. chunks []*filer_pb.FileChunk
  118. path util.FullPath
  119. }
  120. func (c *commandFsVerify) verifyTraverseBfs(path string) (fileCount int64, errCount int64, err error) {
  121. timeNowAtSec := time.Now().Unix()
  122. return fileCount, errCount, doTraverseBfsAndSaving(c.env, c.writer, path, false,
  123. func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) {
  124. if c.modifyTimeAgoAtSec > 0 {
  125. if entry.Entry.Attributes != nil && c.modifyTimeAgoAtSec < timeNowAtSec-entry.Entry.Attributes.Mtime {
  126. return nil
  127. }
  128. }
  129. dataChunks, manifestChunks, resolveErr := filer.ResolveChunkManifest(filer.LookupFn(c.env), entry.Entry.GetChunks(), 0, math.MaxInt64)
  130. if resolveErr != nil {
  131. return fmt.Errorf("failed to ResolveChunkManifest: %+v", resolveErr)
  132. }
  133. dataChunks = append(dataChunks, manifestChunks...)
  134. if len(dataChunks) > 0 {
  135. outputChan <- &ItemEntry{
  136. chunks: dataChunks,
  137. path: util.NewFullPath(entry.Dir, entry.Entry.Name),
  138. }
  139. }
  140. return nil
  141. },
  142. func(outputChan chan interface{}) {
  143. for itemEntry := range outputChan {
  144. i := itemEntry.(*ItemEntry)
  145. itemPath := string(i.path)
  146. fileMsg := fmt.Sprintf("file:%s", itemPath)
  147. errItem := make(map[string]error)
  148. errItemLock := sync.RWMutex{}
  149. for _, chunk := range i.chunks {
  150. if volumeIds, ok := c.volumeIds[chunk.Fid.VolumeId]; ok {
  151. for _, volumeServer := range volumeIds {
  152. if *c.concurrency == 0 {
  153. if err = c.verifyEntry(volumeServer, chunk.Fid); err != nil {
  154. fmt.Fprintf(c.writer, "%s failed verify needle %d:%d: %+v\n",
  155. fileMsg, chunk.Fid.VolumeId, chunk.Fid.FileKey, err)
  156. }
  157. continue
  158. }
  159. c.waitChanLock.RLock()
  160. waitChan, ok := c.waitChan[string(volumeServer)]
  161. c.waitChanLock.RUnlock()
  162. if !ok {
  163. fmt.Fprintf(c.writer, "%s failed to get channel for %s chunk: %d:%d: %+v\n",
  164. string(volumeServer), fileMsg, chunk.Fid.VolumeId, chunk.Fid.FileKey, err)
  165. continue
  166. }
  167. waitChan <- struct{}{}
  168. go func(fId *filer_pb.FileId, path string, volumeServer pb.ServerAddress, msg string) {
  169. if err = c.verifyEntry(volumeServer, fId); err != nil {
  170. errItemLock.Lock()
  171. errItem[path] = err
  172. fmt.Fprintf(c.writer, "%s failed verify needle %d:%d: %+v\n",
  173. msg, fId.VolumeId, fId.FileKey, err)
  174. errItemLock.Unlock()
  175. }
  176. <-waitChan
  177. }(chunk.Fid, itemPath, volumeServer, fileMsg)
  178. }
  179. } else {
  180. err = fmt.Errorf("volumeId %d not found", chunk.Fid.VolumeId)
  181. fmt.Fprintf(c.writer, "%s %d:%d: %+v\n",
  182. fileMsg, chunk.Fid.VolumeId, chunk.Fid.FileKey, err)
  183. break
  184. }
  185. }
  186. errItemLock.RLock()
  187. err, _ = errItem[itemPath]
  188. errItemLock.RUnlock()
  189. if err != nil {
  190. errCount++
  191. continue
  192. }
  193. if *c.verbose {
  194. fmt.Fprintf(c.writer, "%s needles:%d verifed\n", fileMsg, len(i.chunks))
  195. }
  196. fileCount++
  197. }
  198. })
  199. }