// chunk_cache.go

package chunk_cache

import (
	"errors"
	"sync"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
)

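// ErrorOutOfBounds signals an attempt to read past the end of cached data.
// Nothing in this file returns it; it is likely used by the cache layers.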
var ErrorOutOfBounds = errors.New("attempt to read out of bounds")

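// ChunkCache is the interface a chunk cache exposes to readers and
// writers of recently accessed file chunks.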
type ChunkCache interface {
	ReadChunkAt(data []byte, fileId string, offset uint64) (n int, err error)
	SetChunk(fileId string, data []byte)
	IsInCache(fileId string, lockNeeded bool) (answer bool)
	GetMaxFilePartSizeInCache() (answer uint64)
}

// TieredChunkCache is a global cache for recently accessed file chunks,
// backed by an in-memory layer and three on-disk layers with increasing
// per-chunk size limits.
type TieredChunkCache struct {
	memCache   *ChunkCacheInMemory
	diskCaches []*OnDiskCacheLayer
	sync.RWMutex
	onDiskCacheSizeLimit0  uint64
	onDiskCacheSizeLimit1  uint64
	onDiskCacheSizeLimit2  uint64
	maxFilePartSizeInCache uint64
}

// compile-time check that TieredChunkCache satisfies ChunkCache
var _ ChunkCache = &TieredChunkCache{}

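// NewTieredChunkCache wires up the tiers. Chunks up to unitSize bytes
// qualify for the memory layer and diskCaches[0]; up to 4*unitSize for
// diskCaches[1]; everything larger goes to diskCaches[2]. The total disk
// budget of diskSizeInUnit*unitSize bytes is split 1/8, 3/8, and 1/2
// across the three layers, and a single cached file part is capped at a
// quarter of that budget.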
func NewTieredChunkCache(maxEntries int64, dir string, diskSizeInUnit int64, unitSize int64) *TieredChunkCache {
	c := &TieredChunkCache{
		memCache: NewChunkCacheInMemory(maxEntries),
	}
	c.diskCaches = make([]*OnDiskCacheLayer, 3)
	c.onDiskCacheSizeLimit0 = uint64(unitSize)
	c.onDiskCacheSizeLimit1 = 4 * c.onDiskCacheSizeLimit0
	c.onDiskCacheSizeLimit2 = 2 * c.onDiskCacheSizeLimit1
	c.diskCaches[0] = NewOnDiskCacheLayer(dir, "c0_2", diskSizeInUnit*unitSize/8, 2)
	c.diskCaches[1] = NewOnDiskCacheLayer(dir, "c1_3", diskSizeInUnit*unitSize/4+diskSizeInUnit*unitSize/8, 3)
	c.diskCaches[2] = NewOnDiskCacheLayer(dir, "c2_2", diskSizeInUnit*unitSize/2, 2)
	c.maxFilePartSizeInCache = uint64(unitSize*diskSizeInUnit) / 4
	return c
}

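// GetMaxFilePartSizeInCache reports the largest file part the cache is
// willing to hold; it returns 0 on a nil receiver.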
func (c *TieredChunkCache) GetMaxFilePartSizeInCache() (answer uint64) {
	if c == nil {
		return 0
	}
	return c.maxFilePartSizeInCache
}

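// IsInCache reports whether fileId is present in the memory layer or in
// any on-disk volume. Callers that already hold the lock, like SetChunk,
// pass lockNeeded=false to avoid re-acquiring it.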
func (c *TieredChunkCache) IsInCache(fileId string, lockNeeded bool) (answer bool) {
	if c == nil {
		return false
	}
	if lockNeeded {
		c.RLock()
		defer c.RUnlock()
	}

	item := c.memCache.cache.Get(fileId)
	if item != nil {
		glog.V(4).Infof("fileId %s is in memcache", fileId)
		return true
	}

	fid, err := needle.ParseFileIdFromString(fileId)
	if err != nil {
		glog.V(4).Infof("failed to parse file id %s", fileId)
		return false
	}
	for i, diskCacheLayer := range c.diskCaches {
		for k, v := range diskCacheLayer.diskCaches {
			_, ok := v.nm.Get(fid.Key)
			if ok {
				glog.V(4).Infof("fileId %s is in diskCaches[%d].volume[%d]", fileId, i, k)
				return true
			}
		}
	}

	return false
}

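// ReadChunkAt fills data from the cached chunk for fileId, starting at
// offset. It probes the memory layer and then each disk layer whose size
// limit can cover the read; a miss everywhere yields (0, nil) rather than
// an error, so a short count means a cache miss.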
func (c *TieredChunkCache) ReadChunkAt(data []byte, fileId string, offset uint64) (n int, err error) {
	if c == nil {
		return 0, nil
	}

	c.RLock()
	defer c.RUnlock()

	minSize := offset + uint64(len(data))
	if minSize <= c.onDiskCacheSizeLimit0 {
		n, err = c.memCache.readChunkAt(data, fileId, offset)
		if err != nil {
			glog.Errorf("failed to read from memcache: %s", err)
		}
		if n == len(data) {
			return n, nil
		}
	}

	fid, err := needle.ParseFileIdFromString(fileId)
	if err != nil {
		glog.Errorf("failed to parse file id %s", fileId)
		return 0, nil
	}

	if minSize <= c.onDiskCacheSizeLimit0 {
		n, err = c.diskCaches[0].readChunkAt(data, fid.Key, offset)
		if n == len(data) {
			return
		}
	}
	if minSize <= c.onDiskCacheSizeLimit1 {
		n, err = c.diskCaches[1].readChunkAt(data, fid.Key, offset)
		if n == len(data) {
			return
		}
	}
	{
		n, err = c.diskCaches[2].readChunkAt(data, fid.Key, offset)
		if n == len(data) {
			return
		}
	}

	return 0, nil
}

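// SetChunk caches data under fileId unless it is already present in some
// layer.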
func (c *TieredChunkCache) SetChunk(fileId string, data []byte) {
	if c == nil {
		return
	}
	c.Lock()
	defer c.Unlock()

	glog.V(4).Infof("SetChunk %s size %d", fileId, len(data))

	if c.IsInCache(fileId, false) {
		glog.V(4).Infof("fileId %s is already in cache", fileId)
		return
	}

	c.doSetChunk(fileId, data)
}

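// doSetChunk routes the chunk by size: small chunks go to the memory
// layer and diskCaches[0], mid-sized ones to diskCaches[1], and the rest
// to diskCaches[2]. The caller must already hold the write lock.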
func (c *TieredChunkCache) doSetChunk(fileId string, data []byte) {
	if len(data) <= int(c.onDiskCacheSizeLimit0) {
		c.memCache.SetChunk(fileId, data)
	}

	fid, err := needle.ParseFileIdFromString(fileId)
	if err != nil {
		glog.Errorf("failed to parse file id %s", fileId)
		return
	}

	if len(data) <= int(c.onDiskCacheSizeLimit0) {
		c.diskCaches[0].setChunk(fid.Key, data)
	} else if len(data) <= int(c.onDiskCacheSizeLimit1) {
		c.diskCaches[1].setChunk(fid.Key, data)
	} else {
		c.diskCaches[2].setChunk(fid.Key, data)
	}
}

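// Shutdown shuts down every on-disk cache layer; it is safe to call on a
// nil cache.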
func (c *TieredChunkCache) Shutdown() {
	if c == nil {
		return
	}
	c.Lock()
	defer c.Unlock()
	for _, diskCache := range c.diskCaches {
		diskCache.shutdown()
	}
}

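// min returns the smaller of two ints. Go 1.21+ also provides a builtin
// min, which this package-level helper shadows within the package.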
func min(x, y int) int {
	if x < y {
		return x
	}
	return y
}

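// A minimal usage sketch (the directory, sizes, and file id below are
// illustrative assumptions, not values taken from this file):
//
//	cache := NewTieredChunkCache(1000, "/tmp/chunk_cache", 1024, 1024*1024)
//	defer cache.Shutdown()
//	cache.SetChunk("3,01637037d6", chunkData) // chunkData is some []byte
//	buf := make([]byte, len(chunkData))
//	if n, _ := cache.ReadChunkAt(buf, "3,01637037d6", 0); n == len(buf) {
//		// cache hit: buf holds the chunk bytes
//	}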