// chunk_cache.go
  1. package chunk_cache
  2. import (
  3. "sync"
  4. "github.com/chrislusf/seaweedfs/weed/glog"
  5. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  6. )
// ChunkCache caches recently accessed file chunk data, keyed by the
// chunk's file id string.
type ChunkCache interface {
	// GetChunk returns cached bytes for fileId. minSize is the minimum
	// number of bytes the caller needs; implementations may return nil or
	// fewer bytes, so callers must check len(data) >= minSize themselves.
	GetChunk(fileId string, minSize uint64) (data []byte)
	// SetChunk stores data under fileId.
	SetChunk(fileId string, data []byte)
}
// a global cache for recently accessed file chunks
type TieredChunkCache struct {
	memCache   *ChunkCacheInMemory // in-memory tier, used only for the smallest chunks
	diskCaches []*OnDiskCacheLayer // three on-disk tiers, ordered by increasing chunk size
	sync.RWMutex                   // guards all get/set/shutdown operations
	// Size thresholds routing a chunk to a tier: chunks up to limit0 go to
	// tier 0 (and memory), up to limit1 to tier 1, everything else to tier 2.
	// Values are derived from unitSize in NewTieredChunkCache
	// (limit0 = unitSize, limit1 = 4*limit0, limit2 = 2*limit1).
	onDiskCacheSizeLimit0 uint64
	onDiskCacheSizeLimit1 uint64
	onDiskCacheSizeLimit2 uint64
}
  20. func NewTieredChunkCache(maxEntries int64, dir string, diskSizeInUnit int64, unitSize int64) *TieredChunkCache {
  21. c := &TieredChunkCache{
  22. memCache: NewChunkCacheInMemory(maxEntries),
  23. }
  24. c.diskCaches = make([]*OnDiskCacheLayer, 3)
  25. c.onDiskCacheSizeLimit0 = uint64(unitSize)
  26. c.onDiskCacheSizeLimit1 = 4 * c.onDiskCacheSizeLimit0
  27. c.onDiskCacheSizeLimit2 = 2 * c.onDiskCacheSizeLimit1
  28. c.diskCaches[0] = NewOnDiskCacheLayer(dir, "c0_2", diskSizeInUnit*unitSize/8, 2)
  29. c.diskCaches[1] = NewOnDiskCacheLayer(dir, "c1_3", diskSizeInUnit*unitSize/4+diskSizeInUnit*unitSize/8, 3)
  30. c.diskCaches[2] = NewOnDiskCacheLayer(dir, "c2_2", diskSizeInUnit*unitSize/2, 2)
  31. return c
  32. }
  33. func (c *TieredChunkCache) GetChunk(fileId string, minSize uint64) (data []byte) {
  34. if c == nil {
  35. return
  36. }
  37. c.RLock()
  38. defer c.RUnlock()
  39. return c.doGetChunk(fileId, minSize)
  40. }
  41. func (c *TieredChunkCache) doGetChunk(fileId string, minSize uint64) (data []byte) {
  42. if minSize <= c.onDiskCacheSizeLimit0 {
  43. data = c.memCache.GetChunk(fileId)
  44. if len(data) >= int(minSize) {
  45. return data
  46. }
  47. }
  48. fid, err := needle.ParseFileIdFromString(fileId)
  49. if err != nil {
  50. glog.Errorf("failed to parse file id %s", fileId)
  51. return nil
  52. }
  53. if minSize <= c.onDiskCacheSizeLimit0 {
  54. data = c.diskCaches[0].getChunk(fid.Key)
  55. if len(data) >= int(minSize) {
  56. return data
  57. }
  58. }
  59. if minSize <= c.onDiskCacheSizeLimit1 {
  60. data = c.diskCaches[1].getChunk(fid.Key)
  61. if len(data) >= int(minSize) {
  62. return data
  63. }
  64. }
  65. {
  66. data = c.diskCaches[2].getChunk(fid.Key)
  67. if len(data) >= int(minSize) {
  68. return data
  69. }
  70. }
  71. return nil
  72. }
  73. func (c *TieredChunkCache) SetChunk(fileId string, data []byte) {
  74. if c == nil {
  75. return
  76. }
  77. c.Lock()
  78. defer c.Unlock()
  79. glog.V(4).Infof("SetChunk %s size %d\n", fileId, len(data))
  80. c.doSetChunk(fileId, data)
  81. }
  82. func (c *TieredChunkCache) doSetChunk(fileId string, data []byte) {
  83. if len(data) <= int(c.onDiskCacheSizeLimit0) {
  84. c.memCache.SetChunk(fileId, data)
  85. }
  86. fid, err := needle.ParseFileIdFromString(fileId)
  87. if err != nil {
  88. glog.Errorf("failed to parse file id %s", fileId)
  89. return
  90. }
  91. if len(data) <= int(c.onDiskCacheSizeLimit0) {
  92. c.diskCaches[0].setChunk(fid.Key, data)
  93. } else if len(data) <= int(c.onDiskCacheSizeLimit1) {
  94. c.diskCaches[1].setChunk(fid.Key, data)
  95. } else {
  96. c.diskCaches[2].setChunk(fid.Key, data)
  97. }
  98. }
  99. func (c *TieredChunkCache) Shutdown() {
  100. if c == nil {
  101. return
  102. }
  103. c.Lock()
  104. defer c.Unlock()
  105. for _, diskCache := range c.diskCaches {
  106. diskCache.shutdown()
  107. }
  108. }