needle_map_memory.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. package storage
  2. import (
  3. "os"
  4. "github.com/seaweedfs/seaweedfs/weed/glog"
  5. "github.com/seaweedfs/seaweedfs/weed/storage/idx"
  6. "github.com/seaweedfs/seaweedfs/weed/storage/needle_map"
  7. . "github.com/seaweedfs/seaweedfs/weed/storage/types"
  8. "github.com/syndtr/goleveldb/leveldb/opt"
  9. )
  10. type NeedleMap struct {
  11. baseNeedleMapper
  12. m needle_map.NeedleValueMap
  13. }
  14. func NewCompactNeedleMap(file *os.File) *NeedleMap {
  15. nm := &NeedleMap{
  16. m: needle_map.NewCompactMap(),
  17. }
  18. nm.indexFile = file
  19. stat, err := file.Stat()
  20. if err != nil {
  21. glog.Fatalf("stat file %s: %v", file.Name(), err)
  22. }
  23. nm.indexFileOffset = stat.Size()
  24. return nm
  25. }
  26. func LoadCompactNeedleMap(file *os.File) (*NeedleMap, error) {
  27. nm := NewCompactNeedleMap(file)
  28. return doLoading(file, nm)
  29. }
  30. func doLoading(file *os.File, nm *NeedleMap) (*NeedleMap, error) {
  31. e := idx.WalkIndexFile(file, 0, func(key NeedleId, offset Offset, size Size) error {
  32. nm.MaybeSetMaxFileKey(key)
  33. nm.FileCounter++
  34. if !offset.IsZero() && size.IsValid() {
  35. nm.FileByteCounter = nm.FileByteCounter + uint64(size)
  36. oldOffset, oldSize := nm.m.Set(NeedleId(key), offset, size)
  37. if !oldOffset.IsZero() && oldSize.IsValid() {
  38. nm.DeletionCounter++
  39. nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
  40. }
  41. } else {
  42. oldSize := nm.m.Delete(NeedleId(key))
  43. nm.DeletionCounter++
  44. nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
  45. }
  46. return nil
  47. })
  48. glog.V(1).Infof("max file key: %d for file: %s", nm.MaxFileKey(), file.Name())
  49. return nm, e
  50. }
  51. func (nm *NeedleMap) Put(key NeedleId, offset Offset, size Size) error {
  52. _, oldSize := nm.m.Set(NeedleId(key), offset, size)
  53. nm.logPut(key, oldSize, size)
  54. return nm.appendToIndexFile(key, offset, size)
  55. }
  56. func (nm *NeedleMap) Get(key NeedleId) (element *needle_map.NeedleValue, ok bool) {
  57. element, ok = nm.m.Get(NeedleId(key))
  58. return
  59. }
  60. func (nm *NeedleMap) Delete(key NeedleId, offset Offset) error {
  61. deletedBytes := nm.m.Delete(NeedleId(key))
  62. nm.logDelete(deletedBytes)
  63. return nm.appendToIndexFile(key, offset, TombstoneFileSize)
  64. }
  65. func (nm *NeedleMap) Close() {
  66. if nm.indexFile == nil {
  67. return
  68. }
  69. indexFileName := nm.indexFile.Name()
  70. if err := nm.indexFile.Sync(); err != nil {
  71. glog.Warningf("sync file %s failed, %v", indexFileName, err)
  72. }
  73. _ = nm.indexFile.Close()
  74. }
  75. func (nm *NeedleMap) Destroy() error {
  76. nm.Close()
  77. return os.Remove(nm.indexFile.Name())
  78. }
  79. func (nm *NeedleMap) UpdateNeedleMap(v *Volume, indexFile *os.File, opts *opt.Options) error {
  80. if v.nm != nil {
  81. v.nm.Close()
  82. v.nm = nil
  83. }
  84. defer func() {
  85. if v.tmpNm != nil {
  86. v.tmpNm.Close()
  87. v.tmpNm = nil
  88. }
  89. }()
  90. nm.indexFile = indexFile
  91. stat, err := indexFile.Stat()
  92. if err != nil {
  93. glog.Fatalf("stat file %s: %v", indexFile.Name(), err)
  94. return err
  95. }
  96. nm.indexFileOffset = stat.Size()
  97. v.nm = nm
  98. v.tmpNm = nil
  99. return nil
  100. }
  101. func (nm *NeedleMap) DoOffsetLoading(v *Volume, indexFile *os.File, startFrom uint64) error {
  102. glog.V(0).Infof("loading idx from offset %d for file: %s", startFrom, indexFile.Name())
  103. e := idx.WalkIndexFile(indexFile, startFrom, func(key NeedleId, offset Offset, size Size) error {
  104. nm.MaybeSetMaxFileKey(key)
  105. nm.FileCounter++
  106. if !offset.IsZero() && size.IsValid() {
  107. nm.FileByteCounter = nm.FileByteCounter + uint64(size)
  108. oldOffset, oldSize := nm.m.Set(NeedleId(key), offset, size)
  109. if !oldOffset.IsZero() && oldSize.IsValid() {
  110. nm.DeletionCounter++
  111. nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
  112. }
  113. } else {
  114. oldSize := nm.m.Delete(NeedleId(key))
  115. nm.DeletionCounter++
  116. nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
  117. }
  118. return nil
  119. })
  120. return e
  121. }
  122. func (m *NeedleMap) UpdateNeedleMapMetric(indexFile *os.File) error {
  123. return nil
  124. }