chunk_cache.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. package chunk_cache
  2. import (
  3. "errors"
  4. "sync"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  7. )
  8. var ErrorOutOfBounds = errors.New("attempt to read out of bounds")
  9. type ChunkCache interface {
  10. ReadChunkAt(data []byte, fileId string, offset uint64) (n int, err error)
  11. SetChunk(fileId string, data []byte)
  12. }
  13. // a global cache for recently accessed file chunks
  14. type TieredChunkCache struct {
  15. memCache *ChunkCacheInMemory
  16. diskCaches []*OnDiskCacheLayer
  17. sync.RWMutex
  18. onDiskCacheSizeLimit0 uint64
  19. onDiskCacheSizeLimit1 uint64
  20. onDiskCacheSizeLimit2 uint64
  21. }
  22. var _ ChunkCache = &TieredChunkCache{}
  23. func NewTieredChunkCache(maxEntries int64, dir string, diskSizeInUnit int64, unitSize int64) *TieredChunkCache {
  24. c := &TieredChunkCache{
  25. memCache: NewChunkCacheInMemory(maxEntries),
  26. }
  27. c.diskCaches = make([]*OnDiskCacheLayer, 3)
  28. c.onDiskCacheSizeLimit0 = uint64(unitSize)
  29. c.onDiskCacheSizeLimit1 = 4 * c.onDiskCacheSizeLimit0
  30. c.onDiskCacheSizeLimit2 = 2 * c.onDiskCacheSizeLimit1
  31. c.diskCaches[0] = NewOnDiskCacheLayer(dir, "c0_2", diskSizeInUnit*unitSize/8, 2)
  32. c.diskCaches[1] = NewOnDiskCacheLayer(dir, "c1_3", diskSizeInUnit*unitSize/4+diskSizeInUnit*unitSize/8, 3)
  33. c.diskCaches[2] = NewOnDiskCacheLayer(dir, "c2_2", diskSizeInUnit*unitSize/2, 2)
  34. return c
  35. }
  36. func (c *TieredChunkCache) ReadChunkAt(data []byte, fileId string, offset uint64) (n int, err error) {
  37. if c == nil {
  38. return 0, nil
  39. }
  40. c.RLock()
  41. defer c.RUnlock()
  42. minSize := offset + uint64(len(data))
  43. if minSize <= c.onDiskCacheSizeLimit0 {
  44. n, err = c.memCache.readChunkAt(data, fileId, offset)
  45. if err != nil {
  46. glog.Errorf("failed to read from memcache: %s", err)
  47. }
  48. if n >= int(minSize) {
  49. return n, nil
  50. }
  51. }
  52. fid, err := needle.ParseFileIdFromString(fileId)
  53. if err != nil {
  54. glog.Errorf("failed to parse file id %s", fileId)
  55. return n, nil
  56. }
  57. if minSize <= c.onDiskCacheSizeLimit0 {
  58. n, err = c.diskCaches[0].readChunkAt(data, fid.Key, offset)
  59. if n >= int(minSize) {
  60. return
  61. }
  62. }
  63. if minSize <= c.onDiskCacheSizeLimit1 {
  64. n, err = c.diskCaches[1].readChunkAt(data, fid.Key, offset)
  65. if n >= int(minSize) {
  66. return
  67. }
  68. }
  69. {
  70. n, err = c.diskCaches[2].readChunkAt(data, fid.Key, offset)
  71. if n >= int(minSize) {
  72. return
  73. }
  74. }
  75. return 0, nil
  76. }
  77. func (c *TieredChunkCache) SetChunk(fileId string, data []byte) {
  78. if c == nil {
  79. return
  80. }
  81. c.Lock()
  82. defer c.Unlock()
  83. glog.V(4).Infof("SetChunk %s size %d\n", fileId, len(data))
  84. c.doSetChunk(fileId, data)
  85. }
  86. func (c *TieredChunkCache) doSetChunk(fileId string, data []byte) {
  87. if len(data) <= int(c.onDiskCacheSizeLimit0) {
  88. c.memCache.SetChunk(fileId, data)
  89. }
  90. fid, err := needle.ParseFileIdFromString(fileId)
  91. if err != nil {
  92. glog.Errorf("failed to parse file id %s", fileId)
  93. return
  94. }
  95. if len(data) <= int(c.onDiskCacheSizeLimit0) {
  96. c.diskCaches[0].setChunk(fid.Key, data)
  97. } else if len(data) <= int(c.onDiskCacheSizeLimit1) {
  98. c.diskCaches[1].setChunk(fid.Key, data)
  99. } else {
  100. c.diskCaches[2].setChunk(fid.Key, data)
  101. }
  102. }
  103. func (c *TieredChunkCache) Shutdown() {
  104. if c == nil {
  105. return
  106. }
  107. c.Lock()
  108. defer c.Unlock()
  109. for _, diskCache := range c.diskCaches {
  110. diskCache.shutdown()
  111. }
  112. }
  113. func min(x, y int) int {
  114. if x < y {
  115. return x
  116. }
  117. return y
  118. }