needle_map.go 2.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. package storage
  2. import (
  3. "io"
  4. "os"
  5. "sync"
  6. "github.com/seaweedfs/seaweedfs/weed/storage/idx"
  7. "github.com/seaweedfs/seaweedfs/weed/storage/needle_map"
  8. . "github.com/seaweedfs/seaweedfs/weed/storage/types"
  9. "github.com/syndtr/goleveldb/leveldb/opt"
  10. )
  11. type NeedleMapKind int
  12. const (
  13. NeedleMapInMemory NeedleMapKind = iota
  14. NeedleMapLevelDb // small memory footprint, 4MB total, 1 write buffer, 3 block buffer
  15. NeedleMapLevelDbMedium // medium memory footprint, 8MB total, 3 write buffer, 5 block buffer
  16. NeedleMapLevelDbLarge // large memory footprint, 12MB total, 4write buffer, 8 block buffer
  17. )
  18. type NeedleMapper interface {
  19. Put(key NeedleId, offset Offset, size Size) error
  20. Get(key NeedleId) (element *needle_map.NeedleValue, ok bool)
  21. Delete(key NeedleId, offset Offset) error
  22. Close()
  23. Destroy() error
  24. ContentSize() uint64
  25. DeletedSize() uint64
  26. FileCount() int
  27. DeletedCount() int
  28. MaxFileKey() NeedleId
  29. IndexFileSize() uint64
  30. Sync() error
  31. ReadIndexEntry(n int64) (key NeedleId, offset Offset, size Size, err error)
  32. }
  33. type baseNeedleMapper struct {
  34. mapMetric
  35. indexFile *os.File
  36. indexFileAccessLock sync.Mutex
  37. indexFileOffset int64
  38. }
  39. type TempNeedleMapper interface {
  40. NeedleMapper
  41. DoOffsetLoading(v *Volume, indexFile *os.File, startFrom uint64) error
  42. UpdateNeedleMap(v *Volume, indexFile *os.File, opts *opt.Options, ldbTimeout int64) error
  43. }
  44. func (nm *baseNeedleMapper) IndexFileSize() uint64 {
  45. stat, err := nm.indexFile.Stat()
  46. if err == nil {
  47. return uint64(stat.Size())
  48. }
  49. return 0
  50. }
  51. func (nm *baseNeedleMapper) appendToIndexFile(key NeedleId, offset Offset, size Size) error {
  52. bytes := needle_map.ToBytes(key, offset, size)
  53. nm.indexFileAccessLock.Lock()
  54. defer nm.indexFileAccessLock.Unlock()
  55. written, err := nm.indexFile.WriteAt(bytes, nm.indexFileOffset)
  56. if err == nil {
  57. nm.indexFileOffset += int64(written)
  58. }
  59. return err
  60. }
  61. func (nm *baseNeedleMapper) Sync() error {
  62. return nm.indexFile.Sync()
  63. }
  64. func (nm *baseNeedleMapper) ReadIndexEntry(n int64) (key NeedleId, offset Offset, size Size, err error) {
  65. bytes := make([]byte, NeedleMapEntrySize)
  66. var readCount int
  67. if readCount, err = nm.indexFile.ReadAt(bytes, n*NeedleMapEntrySize); err != nil {
  68. if err == io.EOF {
  69. if readCount == NeedleMapEntrySize {
  70. err = nil
  71. }
  72. }
  73. if err != nil {
  74. return
  75. }
  76. }
  77. key, offset, size = idx.IdxFileEntry(bytes)
  78. return
  79. }