memdb.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. package needle_map
  2. import (
  3. "bytes"
  4. "fmt"
  5. "io"
  6. "os"
  7. "sort"
  8. "github.com/syndtr/goleveldb/leveldb"
  9. "github.com/syndtr/goleveldb/leveldb/iterator"
  10. "github.com/syndtr/goleveldb/leveldb/opt"
  11. "github.com/syndtr/goleveldb/leveldb/storage"
  12. "github.com/seaweedfs/seaweedfs/weed/glog"
  13. "github.com/seaweedfs/seaweedfs/weed/storage/idx"
  14. . "github.com/seaweedfs/seaweedfs/weed/storage/types"
  15. )
  16. // This map uses in memory level db
  17. type MemDb struct {
  18. db *leveldb.DB
  19. }
  20. func NewMemDb() *MemDb {
  21. opts := &opt.Options{}
  22. var err error
  23. t := &MemDb{}
  24. if t.db, err = leveldb.Open(storage.NewMemStorage(), opts); err != nil {
  25. glog.V(0).Infof("MemDb fails to open: %v", err)
  26. return nil
  27. }
  28. return t
  29. }
  30. func (cm *MemDb) Set(key NeedleId, offset Offset, size Size) error {
  31. bytes := ToBytes(key, offset, size)
  32. if err := cm.db.Put(bytes[0:NeedleIdSize], bytes[NeedleIdSize:NeedleIdSize+OffsetSize+SizeSize], nil); err != nil {
  33. return fmt.Errorf("failed to write temp leveldb: %v", err)
  34. }
  35. return nil
  36. }
  37. func (cm *MemDb) Delete(key NeedleId) error {
  38. bytes := make([]byte, NeedleIdSize)
  39. NeedleIdToBytes(bytes, key)
  40. return cm.db.Delete(bytes, nil)
  41. }
  42. func (cm *MemDb) Get(key NeedleId) (*NeedleValue, bool) {
  43. bytes := make([]byte, NeedleIdSize)
  44. NeedleIdToBytes(bytes[0:NeedleIdSize], key)
  45. data, err := cm.db.Get(bytes, nil)
  46. if err != nil || len(data) != OffsetSize+SizeSize {
  47. return nil, false
  48. }
  49. offset := BytesToOffset(data[0:OffsetSize])
  50. size := BytesToSize(data[OffsetSize : OffsetSize+SizeSize])
  51. return &NeedleValue{Key: key, Offset: offset, Size: size}, true
  52. }
  53. // Visit visits all entries or stop if any error when visiting
  54. func doVisit(iter iterator.Iterator, visit func(NeedleValue) error) (ret error) {
  55. key := BytesToNeedleId(iter.Key())
  56. data := iter.Value()
  57. offset := BytesToOffset(data[0:OffsetSize])
  58. size := BytesToSize(data[OffsetSize : OffsetSize+SizeSize])
  59. needle := NeedleValue{Key: key, Offset: offset, Size: size}
  60. ret = visit(needle)
  61. if ret != nil {
  62. return
  63. }
  64. return nil
  65. }
  66. func (cm *MemDb) AscendingVisitByOffset(visit func(NeedleValue) error) (ret error) {
  67. var needles []NeedleValue
  68. err := cm.AscendingVisit(func(value NeedleValue) error {
  69. needles = append(needles, value)
  70. return nil
  71. })
  72. if err != nil {
  73. return err
  74. }
  75. sort.Slice(needles, func(i, j int) bool {
  76. i_bytes := make([]byte, OffsetSize)
  77. j_bytes := make([]byte, OffsetSize)
  78. OffsetToBytes(i_bytes, needles[i].Offset)
  79. OffsetToBytes(j_bytes, needles[j].Offset)
  80. return bytes.Compare(i_bytes, j_bytes) < 0
  81. })
  82. for _, needle := range needles {
  83. ret = visit(needle)
  84. if ret != nil {
  85. return ret
  86. }
  87. }
  88. return nil
  89. }
  90. func (cm *MemDb) AscendingVisit(visit func(NeedleValue) error) (ret error) {
  91. iter := cm.db.NewIterator(nil, nil)
  92. if iter.First() {
  93. if ret = doVisit(iter, visit); ret != nil {
  94. return
  95. }
  96. }
  97. for iter.Next() {
  98. if ret = doVisit(iter, visit); ret != nil {
  99. return
  100. }
  101. }
  102. iter.Release()
  103. ret = iter.Error()
  104. return
  105. }
  106. func (cm *MemDb) DescendingVisit(visit func(NeedleValue) error) (ret error) {
  107. iter := cm.db.NewIterator(nil, nil)
  108. if iter.Last() {
  109. if ret = doVisit(iter, visit); ret != nil {
  110. return
  111. }
  112. }
  113. for iter.Prev() {
  114. if ret = doVisit(iter, visit); ret != nil {
  115. return
  116. }
  117. }
  118. iter.Release()
  119. ret = iter.Error()
  120. return
  121. }
  122. func (cm *MemDb) SaveToIdx(idxName string) (ret error) {
  123. idxFile, err := os.OpenFile(idxName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
  124. if err != nil {
  125. return
  126. }
  127. defer func() {
  128. idxFile.Sync()
  129. idxFile.Close()
  130. }()
  131. return cm.AscendingVisitByOffset(func(value NeedleValue) error {
  132. if value.Offset.IsZero() || value.Size.IsDeleted() {
  133. return nil
  134. }
  135. _, err := idxFile.Write(value.ToBytes())
  136. return err
  137. })
  138. }
  139. func (cm *MemDb) LoadFromIdx(idxName string) (ret error) {
  140. idxFile, err := os.OpenFile(idxName, os.O_RDONLY, 0644)
  141. if err != nil {
  142. return
  143. }
  144. defer idxFile.Close()
  145. return cm.LoadFromReaderAt(idxFile)
  146. }
  147. func (cm *MemDb) LoadFromReaderAt(readerAt io.ReaderAt) (ret error) {
  148. return cm.LoadFilterFromReaderAt(readerAt, true, true)
  149. }
  150. func (cm *MemDb) LoadFilterFromReaderAt(readerAt io.ReaderAt, isFilterOffsetZero bool, isFilterDeleted bool) (ret error) {
  151. return idx.WalkIndexFile(readerAt, 0, func(key NeedleId, offset Offset, size Size) error {
  152. if (isFilterOffsetZero && offset.IsZero()) || (isFilterDeleted && size.IsDeleted()) {
  153. return cm.Delete(key)
  154. }
  155. return cm.Set(key, offset, size)
  156. })
  157. }
  158. func (cm *MemDb) Close() {
  159. cm.db.Close()
  160. }