remove_duplicate_fids.go 2.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. package main
  2. import (
  3. "flag"
  4. "fmt"
  5. "os"
  6. "path/filepath"
  7. "github.com/seaweedfs/seaweedfs/weed/glog"
  8. "github.com/seaweedfs/seaweedfs/weed/storage"
  9. "github.com/seaweedfs/seaweedfs/weed/storage/backend"
  10. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  11. "github.com/seaweedfs/seaweedfs/weed/storage/super_block"
  12. )
  13. var (
  14. volumePath = flag.String("dir", "/tmp", "data directory to store files")
  15. volumeCollection = flag.String("collection", "", "the volume collection name")
  16. volumeId = flag.Int("volumeId", -1, "a volume id. The volume should already exist in the dir. The volume index file should not exist.")
  17. )
  18. func Checksum(n *needle.Needle) string {
  19. return fmt.Sprintf("%s%x", n.Id, n.Cookie)
  20. }
  21. type VolumeFileScanner4SeeDat struct {
  22. version needle.Version
  23. block super_block.SuperBlock
  24. dir string
  25. hashes map[string]bool
  26. dat *os.File
  27. datBackend backend.BackendStorageFile
  28. }
  29. func (scanner *VolumeFileScanner4SeeDat) VisitSuperBlock(superBlock super_block.SuperBlock) error {
  30. scanner.version = superBlock.Version
  31. scanner.block = superBlock
  32. return nil
  33. }
  34. func (scanner *VolumeFileScanner4SeeDat) ReadNeedleBody() bool {
  35. return true
  36. }
  37. func (scanner *VolumeFileScanner4SeeDat) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error {
  38. if scanner.datBackend == nil {
  39. newFileName := filepath.Join(*volumePath, "dat_fixed")
  40. newDatFile, err := os.Create(newFileName)
  41. if err != nil {
  42. glog.Fatalf("Write New Volume Data %v", err)
  43. }
  44. scanner.datBackend = backend.NewDiskFile(newDatFile)
  45. scanner.datBackend.WriteAt(scanner.block.Bytes(), 0)
  46. }
  47. checksum := Checksum(n)
  48. if scanner.hashes[checksum] {
  49. glog.V(0).Infof("duplicate checksum:%s fid:%d,%s%x @ offset:%d", checksum, *volumeId, n.Id, n.Cookie, offset)
  50. return nil
  51. }
  52. scanner.hashes[checksum] = true
  53. _, s, _, e := n.Append(scanner.datBackend, scanner.version)
  54. fmt.Printf("size %d error %v\n", s, e)
  55. return nil
  56. }
  57. func main() {
  58. flag.Parse()
  59. vid := needle.VolumeId(*volumeId)
  60. outpath, _ := filepath.Abs(filepath.Dir(os.Args[0]))
  61. scanner := &VolumeFileScanner4SeeDat{
  62. dir: filepath.Join(outpath, "out"),
  63. hashes: map[string]bool{},
  64. }
  65. if _, err := os.Stat(scanner.dir); err != nil {
  66. if err := os.MkdirAll(scanner.dir, os.ModePerm); err != nil {
  67. glog.Fatalf("could not create output dir : %s", err)
  68. }
  69. }
  70. err := storage.ScanVolumeFile(*volumePath, *volumeCollection, vid, storage.NeedleMapInMemory, scanner)
  71. if err != nil {
  72. glog.Fatalf("Reading Volume File [ERROR] %s\n", err)
  73. }
  74. }