needle_map.go 2.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. package storage
  2. import (
  3. "io"
  4. "os"
  5. "sync"
  6. "github.com/chrislusf/seaweedfs/weed/storage/idx"
  7. "github.com/chrislusf/seaweedfs/weed/storage/needle_map"
  8. . "github.com/chrislusf/seaweedfs/weed/storage/types"
  9. )
  // NeedleMapKind enumerates the needle map variants named by the constants
  // declared below. Values are assigned consecutively by iota, starting at 0.
  type NeedleMapKind int

  // Needle map kinds. The memory figures on the LevelDb variants are the
  // footprints documented for each setting.
  const (
  	// NeedleMapInMemory is the zero value of NeedleMapKind.
  	NeedleMapInMemory NeedleMapKind = iota

  	// NeedleMapLevelDb has a small memory footprint: 4MB total,
  	// 1 write buffer, 3 block buffers.
  	NeedleMapLevelDb

  	// NeedleMapLevelDbMedium has a medium memory footprint: 8MB total,
  	// 3 write buffers, 5 block buffers.
  	NeedleMapLevelDbMedium

  	// NeedleMapLevelDbLarge has a large memory footprint: 12MB total,
  	// 4 write buffers, 8 block buffers.
  	NeedleMapLevelDbLarge
  )
  17. type NeedleMapper interface {
  18. Put(key NeedleId, offset Offset, size Size) error
  19. Get(key NeedleId) (element *needle_map.NeedleValue, ok bool)
  20. Delete(key NeedleId, offset Offset) error
  21. Close()
  22. Destroy() error
  23. ContentSize() uint64
  24. DeletedSize() uint64
  25. FileCount() int
  26. DeletedCount() int
  27. MaxFileKey() NeedleId
  28. IndexFileSize() uint64
  29. Sync() error
  30. ReadIndexEntry(n int64) (key NeedleId, offset Offset, size Size, err error)
  31. }
  32. type baseNeedleMapper struct {
  33. mapMetric
  34. indexFile *os.File
  35. indexFileAccessLock sync.Mutex
  36. indexFileOffset int64
  37. }
  38. func (nm *baseNeedleMapper) IndexFileSize() uint64 {
  39. stat, err := nm.indexFile.Stat()
  40. if err == nil {
  41. return uint64(stat.Size())
  42. }
  43. return 0
  44. }
  45. func (nm *baseNeedleMapper) appendToIndexFile(key NeedleId, offset Offset, size Size) error {
  46. bytes := needle_map.ToBytes(key, offset, size)
  47. nm.indexFileAccessLock.Lock()
  48. defer nm.indexFileAccessLock.Unlock()
  49. written, err := nm.indexFile.WriteAt(bytes, nm.indexFileOffset)
  50. if err == nil {
  51. nm.indexFileOffset += int64(written)
  52. }
  53. return err
  54. }
  55. func (nm *baseNeedleMapper) Sync() error {
  56. return nm.indexFile.Sync()
  57. }
  58. func (nm *baseNeedleMapper) ReadIndexEntry(n int64) (key NeedleId, offset Offset, size Size, err error) {
  59. bytes := make([]byte, NeedleMapEntrySize)
  60. var readCount int
  61. if readCount, err = nm.indexFile.ReadAt(bytes, n*NeedleMapEntrySize); err != nil {
  62. if err == io.EOF {
  63. if readCount == NeedleMapEntrySize {
  64. err = nil
  65. }
  66. }
  67. if err != nil {
  68. return
  69. }
  70. }
  71. key, offset, size = idx.IdxFileEntry(bytes)
  72. return
  73. }