chunk_cache.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. package chunk_cache
  2. import (
  3. "errors"
  4. "sync"
  5. "github.com/chrislusf/seaweedfs/weed/glog"
  6. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  7. )
// ErrorOutOfBounds signals an attempt to read past the end of cached chunk data.
var ErrorOutOfBounds = errors.New("attempt to read out of bounds")

// ChunkCache is the interface for caching recently accessed file chunks,
// keyed by the chunk's file id string.
type ChunkCache interface {
	// GetChunk returns the cached data for fileId, expected to be at least
	// minSize bytes, or nil on a miss.
	GetChunk(fileId string, minSize uint64) (data []byte)
	// GetChunkSlice returns length bytes starting at offset from the cached
	// chunk for fileId, or nil on a miss.
	GetChunkSlice(fileId string, offset, length uint64) []byte
	// SetChunk stores data in the cache under fileId.
	SetChunk(fileId string, data []byte)
}
// a global cache for recently accessed file chunks
//
// TieredChunkCache layers an in-memory cache over three on-disk cache layers.
// Chunks are routed by size: chunks up to onDiskCacheSizeLimit0 go to memory
// and disk layer 0, up to onDiskCacheSizeLimit1 to disk layer 1, and anything
// larger to disk layer 2. The embedded RWMutex guards all cache operations.
type TieredChunkCache struct {
	memCache   *ChunkCacheInMemory
	diskCaches []*OnDiskCacheLayer
	sync.RWMutex
	// size thresholds (bytes) selecting which tier a chunk belongs to
	onDiskCacheSizeLimit0 uint64
	onDiskCacheSizeLimit1 uint64
	onDiskCacheSizeLimit2 uint64
}

// compile-time check that TieredChunkCache satisfies ChunkCache
var _ ChunkCache = &TieredChunkCache{}
  24. func NewTieredChunkCache(maxEntries int64, dir string, diskSizeInUnit int64, unitSize int64) *TieredChunkCache {
  25. c := &TieredChunkCache{
  26. memCache: NewChunkCacheInMemory(maxEntries),
  27. }
  28. c.diskCaches = make([]*OnDiskCacheLayer, 3)
  29. c.onDiskCacheSizeLimit0 = uint64(unitSize)
  30. c.onDiskCacheSizeLimit1 = 4 * c.onDiskCacheSizeLimit0
  31. c.onDiskCacheSizeLimit2 = 2 * c.onDiskCacheSizeLimit1
  32. c.diskCaches[0] = NewOnDiskCacheLayer(dir, "c0_2", diskSizeInUnit*unitSize/8, 2)
  33. c.diskCaches[1] = NewOnDiskCacheLayer(dir, "c1_3", diskSizeInUnit*unitSize/4+diskSizeInUnit*unitSize/8, 3)
  34. c.diskCaches[2] = NewOnDiskCacheLayer(dir, "c2_2", diskSizeInUnit*unitSize/2, 2)
  35. return c
  36. }
  37. func (c *TieredChunkCache) GetChunk(fileId string, minSize uint64) (data []byte) {
  38. if c == nil {
  39. return
  40. }
  41. c.RLock()
  42. defer c.RUnlock()
  43. return c.doGetChunk(fileId, minSize)
  44. }
  45. func (c *TieredChunkCache) doGetChunk(fileId string, minSize uint64) (data []byte) {
  46. if minSize <= c.onDiskCacheSizeLimit0 {
  47. data = c.memCache.GetChunk(fileId)
  48. if len(data) >= int(minSize) {
  49. return data
  50. }
  51. }
  52. fid, err := needle.ParseFileIdFromString(fileId)
  53. if err != nil {
  54. glog.Errorf("failed to parse file id %s", fileId)
  55. return nil
  56. }
  57. if minSize <= c.onDiskCacheSizeLimit0 {
  58. data = c.diskCaches[0].getChunk(fid.Key)
  59. if len(data) >= int(minSize) {
  60. return data
  61. }
  62. }
  63. if minSize <= c.onDiskCacheSizeLimit1 {
  64. data = c.diskCaches[1].getChunk(fid.Key)
  65. if len(data) >= int(minSize) {
  66. return data
  67. }
  68. }
  69. {
  70. data = c.diskCaches[2].getChunk(fid.Key)
  71. if len(data) >= int(minSize) {
  72. return data
  73. }
  74. }
  75. return nil
  76. }
  77. func (c *TieredChunkCache) GetChunkSlice(fileId string, offset, length uint64) []byte {
  78. if c == nil {
  79. return nil
  80. }
  81. c.RLock()
  82. defer c.RUnlock()
  83. return c.doGetChunkSlice(fileId, offset, length)
  84. }
  85. func (c *TieredChunkCache) doGetChunkSlice(fileId string, offset, length uint64) (data []byte) {
  86. minSize := offset + length
  87. if minSize <= c.onDiskCacheSizeLimit0 {
  88. data, err := c.memCache.getChunkSlice(fileId, offset, length)
  89. if err != nil {
  90. glog.Errorf("failed to read from memcache: %s", err)
  91. }
  92. if len(data) >= int(minSize) {
  93. return data
  94. }
  95. }
  96. fid, err := needle.ParseFileIdFromString(fileId)
  97. if err != nil {
  98. glog.Errorf("failed to parse file id %s", fileId)
  99. return nil
  100. }
  101. if minSize <= c.onDiskCacheSizeLimit0 {
  102. data = c.diskCaches[0].getChunkSlice(fid.Key, offset, length)
  103. if len(data) >= int(minSize) {
  104. return data
  105. }
  106. }
  107. if minSize <= c.onDiskCacheSizeLimit1 {
  108. data = c.diskCaches[1].getChunkSlice(fid.Key, offset, length)
  109. if len(data) >= int(minSize) {
  110. return data
  111. }
  112. }
  113. {
  114. data = c.diskCaches[2].getChunkSlice(fid.Key, offset, length)
  115. if len(data) >= int(minSize) {
  116. return data
  117. }
  118. }
  119. return nil
  120. }
  121. func (c *TieredChunkCache) SetChunk(fileId string, data []byte) {
  122. if c == nil {
  123. return
  124. }
  125. c.Lock()
  126. defer c.Unlock()
  127. glog.V(4).Infof("SetChunk %s size %d\n", fileId, len(data))
  128. c.doSetChunk(fileId, data)
  129. }
  130. func (c *TieredChunkCache) doSetChunk(fileId string, data []byte) {
  131. if len(data) <= int(c.onDiskCacheSizeLimit0) {
  132. c.memCache.SetChunk(fileId, data)
  133. }
  134. fid, err := needle.ParseFileIdFromString(fileId)
  135. if err != nil {
  136. glog.Errorf("failed to parse file id %s", fileId)
  137. return
  138. }
  139. if len(data) <= int(c.onDiskCacheSizeLimit0) {
  140. c.diskCaches[0].setChunk(fid.Key, data)
  141. } else if len(data) <= int(c.onDiskCacheSizeLimit1) {
  142. c.diskCaches[1].setChunk(fid.Key, data)
  143. } else {
  144. c.diskCaches[2].setChunk(fid.Key, data)
  145. }
  146. }
  147. func (c *TieredChunkCache) Shutdown() {
  148. if c == nil {
  149. return
  150. }
  151. c.Lock()
  152. defer c.Unlock()
  153. for _, diskCache := range c.diskCaches {
  154. diskCache.shutdown()
  155. }
  156. }
  157. func min(x, y int) int {
  158. if x < y {
  159. return x
  160. }
  161. return y
  162. }