needle_map_sorted_file.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. package storage
  2. import (
  3. "os"
  4. "github.com/seaweedfs/seaweedfs/weed/glog"
  5. "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
  6. "github.com/seaweedfs/seaweedfs/weed/storage/needle_map"
  7. . "github.com/seaweedfs/seaweedfs/weed/storage/types"
  8. )
  9. type SortedFileNeedleMap struct {
  10. baseNeedleMapper
  11. baseFileName string
  12. dbFile *os.File
  13. dbFileSize int64
  14. }
  15. func NewSortedFileNeedleMap(indexBaseFileName string, indexFile *os.File) (m *SortedFileNeedleMap, err error) {
  16. m = &SortedFileNeedleMap{baseFileName: indexBaseFileName}
  17. m.indexFile = indexFile
  18. fileName := indexBaseFileName + ".sdx"
  19. if !isSortedFileFresh(fileName, indexFile) {
  20. glog.V(0).Infof("Start to Generate %s from %s", fileName, indexFile.Name())
  21. erasure_coding.WriteSortedFileFromIdx(indexBaseFileName, ".sdx")
  22. glog.V(0).Infof("Finished Generating %s from %s", fileName, indexFile.Name())
  23. }
  24. glog.V(1).Infof("Opening %s...", fileName)
  25. if m.dbFile, err = os.OpenFile(indexBaseFileName+".sdx", os.O_RDWR, 0); err != nil {
  26. return
  27. }
  28. dbStat, _ := m.dbFile.Stat()
  29. m.dbFileSize = dbStat.Size()
  30. glog.V(1).Infof("Loading %s...", indexFile.Name())
  31. mm, indexLoadError := newNeedleMapMetricFromIndexFile(indexFile)
  32. if indexLoadError != nil {
  33. _ = m.dbFile.Close()
  34. return nil, indexLoadError
  35. }
  36. m.mapMetric = *mm
  37. return
  38. }
  39. func isSortedFileFresh(dbFileName string, indexFile *os.File) bool {
  40. // normally we always write to index file first
  41. dbFile, err := os.Open(dbFileName)
  42. if err != nil {
  43. return false
  44. }
  45. defer dbFile.Close()
  46. dbStat, dbStatErr := dbFile.Stat()
  47. indexStat, indexStatErr := indexFile.Stat()
  48. if dbStatErr != nil || indexStatErr != nil {
  49. glog.V(0).Infof("Can not stat file: %v and %v", dbStatErr, indexStatErr)
  50. return false
  51. }
  52. return dbStat.ModTime().After(indexStat.ModTime())
  53. }
  54. func (m *SortedFileNeedleMap) Get(key NeedleId) (element *needle_map.NeedleValue, ok bool) {
  55. offset, size, err := erasure_coding.SearchNeedleFromSortedIndex(m.dbFile, m.dbFileSize, key, nil)
  56. ok = err == nil
  57. return &needle_map.NeedleValue{Key: key, Offset: offset, Size: size}, ok
  58. }
  59. func (m *SortedFileNeedleMap) Put(key NeedleId, offset Offset, size Size) error {
  60. return os.ErrInvalid
  61. }
  62. func (m *SortedFileNeedleMap) Delete(key NeedleId, offset Offset) error {
  63. _, size, err := erasure_coding.SearchNeedleFromSortedIndex(m.dbFile, m.dbFileSize, key, nil)
  64. if err != nil {
  65. if err == erasure_coding.NotFoundError {
  66. return nil
  67. }
  68. return err
  69. }
  70. if size.IsDeleted() {
  71. return nil
  72. }
  73. // write to index file first
  74. if err := m.appendToIndexFile(key, offset, TombstoneFileSize); err != nil {
  75. return err
  76. }
  77. _, _, err = erasure_coding.SearchNeedleFromSortedIndex(m.dbFile, m.dbFileSize, key, erasure_coding.MarkNeedleDeleted)
  78. return err
  79. }
  80. func (m *SortedFileNeedleMap) Close() {
  81. if m == nil {
  82. return
  83. }
  84. if m.indexFile != nil {
  85. m.indexFile.Close()
  86. }
  87. if m.dbFile != nil {
  88. m.dbFile.Close()
  89. }
  90. }
  91. func (m *SortedFileNeedleMap) Destroy() error {
  92. m.Close()
  93. os.Remove(m.indexFile.Name())
  94. return os.Remove(m.baseFileName + ".sdx")
  95. }