// fix_dat.go
package main

import (
	"flag"
	"fmt"
	"io"
	"os"
	"path"
	"strconv"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/storage/backend"
	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
	"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
	"github.com/seaweedfs/seaweedfs/weed/storage/types"
	"github.com/seaweedfs/seaweedfs/weed/util"
	util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
// Command-line flags selecting the volume to repair.
var (
	fixVolumePath       = flag.String("dir", "/tmp", "data directory to store files")
	fixVolumeCollection = flag.String("collection", "", "the volume collection name")
	fixVolumeId         = flag.Int("volumeId", -1, "a volume id. The volume should already exist in the dir. The volume index file should not exist.")
)

/*
This is to resolve a one-time issue that caused inconsistency with .dat and .idx files.
In this case, the .dat file contains all data, but some deletion caused incorrect offsets.
The .idx file has all correct offsets.

1. fix the .dat file; a new .dat_fixed file will be generated.
	go run fix_dat.go -volumeId=9 -dir=/Users/chrislu/Downloads
2. move the original .dat and .idx files to some backup folder, and rename .dat_fixed to .dat
	mv 9.dat_fixed 9.dat
3. fix the .idx file with "weed fix"
	weed fix -volumeId=9 -dir=/Users/chrislu/Downloads
*/
  33. func main() {
  34. flag.Parse()
  35. util_http.InitGlobalHttpClient()
  36. fileName := strconv.Itoa(*fixVolumeId)
  37. if *fixVolumeCollection != "" {
  38. fileName = *fixVolumeCollection + "_" + fileName
  39. }
  40. indexFile, err := os.OpenFile(path.Join(*fixVolumePath, fileName+".idx"), os.O_RDONLY, 0644)
  41. if err != nil {
  42. glog.Fatalf("Read Volume Index %v", err)
  43. }
  44. defer indexFile.Close()
  45. datFileName := path.Join(*fixVolumePath, fileName+".dat")
  46. datFile, err := os.OpenFile(datFileName, os.O_RDONLY, 0644)
  47. if err != nil {
  48. glog.Fatalf("Read Volume Data %v", err)
  49. }
  50. datBackend := backend.NewDiskFile(datFile)
  51. defer datBackend.Close()
  52. newDatFile, err := os.Create(path.Join(*fixVolumePath, fileName+".dat_fixed"))
  53. if err != nil {
  54. glog.Fatalf("Write New Volume Data %v", err)
  55. }
  56. defer newDatFile.Close()
  57. superBlock, err := super_block.ReadSuperBlock(datBackend)
  58. if err != nil {
  59. glog.Fatalf("Read Volume Data superblock %v", err)
  60. }
  61. newDatFile.Write(superBlock.Bytes())
  62. iterateEntries(datBackend, indexFile, func(n *needle.Needle, offset int64) {
  63. fmt.Printf("needle id=%v name=%s size=%d dataSize=%d\n", n.Id, string(n.Name), n.Size, n.DataSize)
  64. _, s, _, e := n.Append(datBackend, superBlock.Version)
  65. fmt.Printf("size %d error %v\n", s, e)
  66. })
  67. }
// iterateEntries walks the volume's .idx and .dat files in lockstep: for each
// 16-byte index entry (key, offset, size) it reads the corresponding needle
// header and body from the .dat file and invokes visitNeedle. When the .dat
// stream offset disagrees with the index, the index offset wins — the .idx
// offsets are the trusted source of truth for this repair tool.
func iterateEntries(datBackend backend.BackendStorageFile, idxFile *os.File, visitNeedle func(n *needle.Needle, offset int64)) {
	// start to read index file
	var readerOffset int64
	bytes := make([]byte, 16) // one index entry: 8B key + 4B offset + 4B size
	count, _ := idxFile.ReadAt(bytes, readerOffset)
	readerOffset += int64(count)

	// start to read dat file: the needle stream begins right after the superblock.
	superBlock, err := super_block.ReadSuperBlock(datBackend)
	if err != nil {
		fmt.Printf("cannot read dat file super block: %v", err)
		return
	}
	offset := int64(superBlock.BlockSize())
	version := superBlock.Version
	n, _, rest, err := needle.ReadNeedleHeader(datBackend, version, offset)
	if err != nil {
		fmt.Printf("cannot read needle header: %v", err)
		return
	}
	fmt.Printf("Needle %+v, rest %d\n", n, rest)
	// count > 0 means the previous index read returned data; once the index
	// is exhausted the loop stops even if the .dat file has more needles.
	for n != nil && count > 0 {
		// parse index file entry
		key := util.BytesToUint64(bytes[0:8])
		offsetFromIndex := util.BytesToUint32(bytes[8:12])
		sizeFromIndex := types.BytesToSize(bytes[12:16])
		// Pre-fetch the next index entry for the following iteration.
		count, _ = idxFile.ReadAt(bytes, readerOffset)
		readerOffset += int64(count)
		// Index offsets are stored in 8-byte units (presumably
		// types.NeedlePaddingSize — TODO confirm); a non-zero index offset
		// overrides the offset tracked from the .dat stream.
		if offsetFromIndex != 0 && offset != int64(offsetFromIndex)*8 {
			//t := offset
			offset = int64(offsetFromIndex) * 8
			//fmt.Printf("Offset change %d => %d\n", t, offset)
		}
		fmt.Printf("key: %d offsetFromIndex %d n.Size %d sizeFromIndex:%d\n", key, offsetFromIndex, n.Size, sizeFromIndex)
		// Trust the size recorded in the index, not the .dat header.
		rest = needle.NeedleBodyLength(sizeFromIndex, version)
		// Reading a body at a bad offset/size may panic; recover so one bad
		// entry does not abort the whole scan.
		func() {
			defer func() {
				if r := recover(); r != nil {
					fmt.Println("Recovered in f", r)
				}
			}()
			if _, err = n.ReadNeedleBody(datBackend, version, offset+int64(types.NeedleHeaderSize), rest); err != nil {
				fmt.Printf("cannot read needle body: offset %d body %d %v\n", offset, rest, err)
			}
		}()
		// NOTE(review): this skips needles whose Size <= DataSize without
		// advancing `offset` or re-reading the header; the next iteration
		// relies on the index offset to resynchronize — confirm intended.
		if n.Size <= types.Size(n.DataSize) {
			continue
		}
		visitNeedle(n, offset)
		offset += types.NeedleHeaderSize + rest
		//fmt.Printf("==> new entry offset %d\n", offset)
		if n, _, rest, err = needle.ReadNeedleHeader(datBackend, version, offset); err != nil {
			if err == io.EOF {
				return
			}
			fmt.Printf("cannot read needle header: %v\n", err)
			return
		}
		//fmt.Printf("new entry needle size:%d rest:%d\n", n.Size, rest)
	}
}