needle_map.go 2.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. package storage
  2. import (
  3. "fmt"
  4. "io"
  5. "os"
  6. "sync"
  7. "github.com/chrislusf/seaweedfs/weed/storage/idx"
  8. "github.com/chrislusf/seaweedfs/weed/storage/needle_map"
  9. . "github.com/chrislusf/seaweedfs/weed/storage/types"
  10. )
  // NeedleMapType enumerates the available kinds of needle map: one kept in
  // memory and three LevelDB-backed variants with different memory budgets.
  type NeedleMapType int

  const (
  	// NeedleMapInMemory is the in-memory needle map; it is the zero value.
  	NeedleMapInMemory NeedleMapType = iota
  	// NeedleMapLevelDb has a small memory footprint:
  	// 4MB total, 1 write buffer, 3 block buffer.
  	NeedleMapLevelDb
  	// NeedleMapLevelDbMedium has a medium memory footprint:
  	// 8MB total, 3 write buffer, 5 block buffer.
  	NeedleMapLevelDbMedium
  	// NeedleMapLevelDbLarge has a large memory footprint:
  	// 12MB total, 4 write buffer, 8 block buffer.
  	NeedleMapLevelDbLarge
  )
  18. type NeedleMapper interface {
  19. Put(key NeedleId, offset Offset, size Size) error
  20. Get(key NeedleId) (element *needle_map.NeedleValue, ok bool)
  21. Delete(key NeedleId, offset Offset) error
  22. Close()
  23. Destroy() error
  24. ContentSize() uint64
  25. DeletedSize() uint64
  26. FileCount() int
  27. DeletedCount() int
  28. MaxFileKey() NeedleId
  29. IndexFileSize() uint64
  30. Sync() error
  31. ReadIndexEntry(n int64) (key NeedleId, offset Offset, size Size, err error)
  32. }
  33. type baseNeedleMapper struct {
  34. mapMetric
  35. indexFile *os.File
  36. indexFileAccessLock sync.Mutex
  37. }
  38. func (nm *baseNeedleMapper) IndexFileSize() uint64 {
  39. stat, err := nm.indexFile.Stat()
  40. if err == nil {
  41. return uint64(stat.Size())
  42. }
  43. return 0
  44. }
  45. func (nm *baseNeedleMapper) appendToIndexFile(key NeedleId, offset Offset, size Size) error {
  46. bytes := needle_map.ToBytes(key, offset, size)
  47. nm.indexFileAccessLock.Lock()
  48. defer nm.indexFileAccessLock.Unlock()
  49. if _, err := nm.indexFile.Seek(0, 2); err != nil {
  50. return fmt.Errorf("cannot seek end of indexfile %s: %v",
  51. nm.indexFile.Name(), err)
  52. }
  53. _, err := nm.indexFile.Write(bytes)
  54. return err
  55. }
  56. func (nm *baseNeedleMapper) Sync() error {
  57. return nm.indexFile.Sync()
  58. }
  59. func (nm *baseNeedleMapper) ReadIndexEntry(n int64) (key NeedleId, offset Offset, size Size, err error) {
  60. bytes := make([]byte, NeedleMapEntrySize)
  61. var readCount int
  62. if readCount, err = nm.indexFile.ReadAt(bytes, n*NeedleMapEntrySize); err != nil {
  63. if err == io.EOF {
  64. if readCount == NeedleMapEntrySize {
  65. err = nil
  66. }
  67. }
  68. if err != nil {
  69. return
  70. }
  71. }
  72. key, offset, size = idx.IdxFileEntry(bytes)
  73. return
  74. }