fix_dat.go

package main

import (
	"flag"
	"fmt"
	"io"
	"os"
	"path"
	"strconv"

	"github.com/chrislusf/seaweedfs/weed/glog"
	"github.com/chrislusf/seaweedfs/weed/storage"
	"github.com/chrislusf/seaweedfs/weed/util"
)

var (
	fixVolumePath       = flag.String("dir", "/tmp", "data directory to store files")
	fixVolumeCollection = flag.String("collection", "", "the volume collection name")
	fixVolumeId         = flag.Int("volumeId", -1, "a volume id. The volume should already exist in the dir. The volume index file should not exist.")
)
/*
This resolves a one-time issue that caused inconsistency between the .dat and .idx files.
In this case, the .dat file contains all the data, but some deletions left incorrect offsets.
The .idx file has all the correct offsets.

1. Fix the .dat file; a new .dat_fixed file will be generated.
	go run fix_dat.go -volumeId=9 -dir=/Users/chrislu/Downloads
2. Move the original .dat and .idx files to a backup folder, and rename .dat_fixed to .dat.
	mv 9.dat_fixed 9.dat
3. Fix the .idx file with "weed fix".
	weed fix -volumeId=9 -dir=/Users/chrislu/Downloads
*/
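
// Rough layout notes, sketched from the code below rather than from a spec:
//   .dat: a super block of storage.SuperBlockSize bytes, followed by needle
//         records; each record is a needle header (storage.NeedleHeaderSize
//         bytes), the data, a checksum (storage.NeedleChecksumSize bytes),
//         and padding that keeps records aligned to storage.NeedlePaddingSize.
//   .idx: fixed 16-byte records; see iterateEntries below for how they are parsed.
// Re-appending each needle into the new .dat file with n.Append is what
// regenerates consistent offsets.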
func main() {
	flag.Parse()
	fileName := strconv.Itoa(*fixVolumeId)
	if *fixVolumeCollection != "" {
		fileName = *fixVolumeCollection + "_" + fileName
	}
	indexFile, err := os.OpenFile(path.Join(*fixVolumePath, fileName+".idx"), os.O_RDONLY, 0644)
	if err != nil {
		glog.Fatalf("Read Volume Index [ERROR] %s\n", err)
	}
	defer indexFile.Close()
	datFile, err := os.OpenFile(path.Join(*fixVolumePath, fileName+".dat"), os.O_RDONLY, 0644)
	if err != nil {
		glog.Fatalf("Read Volume Data [ERROR] %s\n", err)
	}
	defer datFile.Close()
	newDatFile, err := os.Create(path.Join(*fixVolumePath, fileName+".dat_fixed"))
	if err != nil {
		glog.Fatalf("Write New Volume Data [ERROR] %s\n", err)
	}
	defer newDatFile.Close()

	// copy the super block from the old .dat file to the new one
	header := make([]byte, storage.SuperBlockSize)
	datFile.Read(header)
	newDatFile.Write(header)

	// walk the needles as directed by the .idx file, re-appending each one
	// so it lands at a fresh, consistent offset in the new .dat file
	iterateEntries(datFile, indexFile, func(n *storage.Needle, offset int64) {
		fmt.Printf("file id=%d name=%s size=%d dataSize=%d\n", n.Id, string(n.Name), n.Size, n.DataSize)
		s, _, e := n.Append(newDatFile, storage.Version2)
		fmt.Printf("size %d error %v\n", s, e)
	})
}
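
// paddedBodyLength is an illustrative sketch (it is not called by this tool)
// that restates the body-length arithmetic used in iterateEntries: the data
// size recorded in the index, plus the checksum, plus padding so the record
// ends on a storage.NeedlePaddingSize boundary.
func paddedBodyLength(sizeFromIndex uint32) uint32 {
	padding := storage.NeedlePaddingSize - ((sizeFromIndex + storage.NeedleHeaderSize + storage.NeedleChecksumSize) % storage.NeedlePaddingSize)
	return sizeFromIndex + storage.NeedleChecksumSize + padding
}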
func iterateEntries(datFile, idxFile *os.File, visitNeedle func(n *storage.Needle, offset int64)) {
	// start to read the index file: each record is 16 bytes
	// (8-byte key, 4-byte offset in units of 8 bytes, 4-byte size)
	var readerOffset int64
	bytes := make([]byte, 16)
	count, _ := idxFile.ReadAt(bytes, readerOffset)
	readerOffset += int64(count)

	// start to read the .dat file, skipping its super block
	offset := int64(storage.SuperBlockSize)
	version := storage.Version2
	n, rest, err := storage.ReadNeedleHeader(datFile, version, offset)
	if err != nil {
		fmt.Printf("cannot read needle header: %v\n", err)
		return
	}
	fmt.Printf("Needle %+v, rest %d\n", n, rest)
	for n != nil && count > 0 {
		// parse the current index file entry, then prefetch the next one
		key := util.BytesToUint64(bytes[0:8])
		offsetFromIndex := util.BytesToUint32(bytes[8:12])
		sizeFromIndex := util.BytesToUint32(bytes[12:16])
		count, _ = idxFile.ReadAt(bytes, readerOffset)
		readerOffset += int64(count)

		// trust the index: jump to the offset it records (stored in 8-byte units)
		if offsetFromIndex != 0 && offset != int64(offsetFromIndex)*8 {
			//t := offset
			offset = int64(offsetFromIndex) * 8
			//fmt.Printf("Offset change %d => %d\n", t, offset)
		}
		fmt.Printf("key: %d offsetFromIndex %d n.Size %d sizeFromIndex:%d\n", key, offsetFromIndex, n.Size, sizeFromIndex)

		// derive the body length from the size recorded in the index,
		// including the checksum and the padding to the next alignment boundary
		padding := storage.NeedlePaddingSize - ((sizeFromIndex + storage.NeedleHeaderSize + storage.NeedleChecksumSize) % storage.NeedlePaddingSize)
		rest = sizeFromIndex + storage.NeedleChecksumSize + padding

		func() {
			defer func() {
				if r := recover(); r != nil {
					fmt.Println("Recovered in f", r)
				}
			}()
			if err = n.ReadNeedleBody(datFile, version, offset+int64(storage.NeedleHeaderSize), rest); err != nil {
				fmt.Printf("cannot read needle body: offset %d body %d %v\n", offset, rest, err)
			}
		}()

		if n.Size <= n.DataSize {
			continue
		}
		visitNeedle(n, offset)
		offset += int64(storage.NeedleHeaderSize) + int64(rest)
		//fmt.Printf("==> new entry offset %d\n", offset)
		if n, rest, err = storage.ReadNeedleHeader(datFile, version, offset); err != nil {
			if err == io.EOF {
				return
			}
			fmt.Printf("cannot read needle header: %v\n", err)
			return
		}
		//fmt.Printf("new entry needle size:%d rest:%d\n", n.Size, rest)
	}
}
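
// parseIdxEntry is an illustrative sketch (not called above) of decoding one
// 16-byte .idx record, mirroring the parsing inside iterateEntries: the needle
// key, the offset stored in units of 8 bytes, and the data size.
func parseIdxEntry(record []byte) (key uint64, datByteOffset int64, size uint32) {
	key = util.BytesToUint64(record[0:8])
	datByteOffset = int64(util.BytesToUint32(record[8:12])) * 8
	size = util.BytesToUint32(record[12:16])
	return
}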