fix.go 2.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. package command
  2. import (
  3. "os"
  4. "path"
  5. "strconv"
  6. "github.com/chrislusf/seaweedfs/weed/util/log"
  7. "github.com/chrislusf/seaweedfs/weed/storage"
  8. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  9. "github.com/chrislusf/seaweedfs/weed/storage/needle_map"
  10. "github.com/chrislusf/seaweedfs/weed/storage/super_block"
  11. "github.com/chrislusf/seaweedfs/weed/storage/types"
  12. "github.com/chrislusf/seaweedfs/weed/util"
  13. )
  14. func init() {
  15. cmdFix.Run = runFix // break init cycle
  16. }
  17. var cmdFix = &Command{
  18. UsageLine: "fix -dir=/tmp -volumeId=234",
  19. Short: "run weed tool fix on index file if corrupted",
  20. Long: `Fix runs the SeaweedFS fix command to re-create the index .idx file.
  21. `,
  22. }
  23. var (
  24. fixVolumePath = cmdFix.Flag.String("dir", ".", "data directory to store files")
  25. fixVolumeCollection = cmdFix.Flag.String("collection", "", "the volume collection name")
  26. fixVolumeId = cmdFix.Flag.Int("volumeId", -1, "a volume id. The volume should already exist in the dir. The volume index file should not exist.")
  27. )
  28. type VolumeFileScanner4Fix struct {
  29. version needle.Version
  30. nm *needle_map.MemDb
  31. }
  32. func (scanner *VolumeFileScanner4Fix) VisitSuperBlock(superBlock super_block.SuperBlock) error {
  33. scanner.version = superBlock.Version
  34. return nil
  35. }
  36. func (scanner *VolumeFileScanner4Fix) ReadNeedleBody() bool {
  37. return false
  38. }
  39. func (scanner *VolumeFileScanner4Fix) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error {
  40. log.Debugf("key %d offset %d size %d disk_size %d compressed %v", n.Id, offset, n.Size, n.DiskSize(scanner.version), n.IsCompressed())
  41. if n.Size.IsValid() {
  42. pe := scanner.nm.Set(n.Id, types.ToOffset(offset), n.Size)
  43. log.Debugf("saved %d with error %v", n.Size, pe)
  44. } else {
  45. log.Debugf("skipping deleted file ...")
  46. return scanner.nm.Delete(n.Id)
  47. }
  48. return nil
  49. }
  50. func runFix(cmd *Command, args []string) bool {
  51. if *fixVolumeId == -1 {
  52. return false
  53. }
  54. baseFileName := strconv.Itoa(*fixVolumeId)
  55. if *fixVolumeCollection != "" {
  56. baseFileName = *fixVolumeCollection + "_" + baseFileName
  57. }
  58. indexFileName := path.Join(util.ResolvePath(*fixVolumePath), baseFileName+".idx")
  59. nm := needle_map.NewMemDb()
  60. defer nm.Close()
  61. vid := needle.VolumeId(*fixVolumeId)
  62. scanner := &VolumeFileScanner4Fix{
  63. nm: nm,
  64. }
  65. if err := storage.ScanVolumeFile(util.ResolvePath(*fixVolumePath), *fixVolumeCollection, vid, storage.NeedleMapInMemory, scanner); err != nil {
  66. log.Fatalf("scan .dat File: %v", err)
  67. os.Remove(indexFileName)
  68. }
  69. if err := nm.SaveToIdx(indexFileName); err != nil {
  70. log.Fatalf("save to .idx File: %v", err)
  71. os.Remove(indexFileName)
  72. }
  73. return true
  74. }