on_disk_cache_layer.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. package chunk_cache
  2. import (
  3. "fmt"
  4. "github.com/seaweedfs/seaweedfs/weed/glog"
  5. "github.com/seaweedfs/seaweedfs/weed/storage"
  6. "github.com/seaweedfs/seaweedfs/weed/storage/types"
  7. "path"
  8. "slices"
  9. )
  10. type OnDiskCacheLayer struct {
  11. diskCaches []*ChunkCacheVolume
  12. }
  13. func NewOnDiskCacheLayer(dir, namePrefix string, diskSize int64, segmentCount int) *OnDiskCacheLayer {
  14. volumeCount, volumeSize := int(diskSize/(30000*1024*1024)), int64(30000*1024*1024)
  15. if volumeCount < segmentCount {
  16. volumeCount, volumeSize = segmentCount, diskSize/int64(segmentCount)
  17. }
  18. c := &OnDiskCacheLayer{}
  19. for i := 0; i < volumeCount; i++ {
  20. fileName := path.Join(dir, fmt.Sprintf("%s_%d", namePrefix, i))
  21. diskCache, err := LoadOrCreateChunkCacheVolume(fileName, volumeSize)
  22. if err != nil {
  23. glog.Errorf("failed to add cache %s : %v", fileName, err)
  24. } else {
  25. c.diskCaches = append(c.diskCaches, diskCache)
  26. }
  27. }
  28. // keep newest cache to the front
  29. slices.SortFunc(c.diskCaches, func(a, b *ChunkCacheVolume) int {
  30. return b.lastModTime.Compare(a.lastModTime)
  31. })
  32. return c
  33. }
  34. func (c *OnDiskCacheLayer) setChunk(needleId types.NeedleId, data []byte) {
  35. if len(c.diskCaches) == 0 {
  36. return
  37. }
  38. if c.diskCaches[0].fileSize+int64(len(data)) > c.diskCaches[0].sizeLimit {
  39. t, resetErr := c.diskCaches[len(c.diskCaches)-1].Reset()
  40. if resetErr != nil {
  41. glog.Errorf("failed to reset cache file %s", c.diskCaches[len(c.diskCaches)-1].fileName)
  42. return
  43. }
  44. for i := len(c.diskCaches) - 1; i > 0; i-- {
  45. c.diskCaches[i] = c.diskCaches[i-1]
  46. }
  47. c.diskCaches[0] = t
  48. }
  49. if err := c.diskCaches[0].WriteNeedle(needleId, data); err != nil {
  50. glog.V(0).Infof("cache write %v size %d: %v", needleId, len(data), err)
  51. }
  52. }
  53. func (c *OnDiskCacheLayer) getChunk(needleId types.NeedleId) (data []byte) {
  54. var err error
  55. for _, diskCache := range c.diskCaches {
  56. data, err = diskCache.GetNeedle(needleId)
  57. if err == storage.ErrorNotFound {
  58. continue
  59. }
  60. if err != nil {
  61. glog.Errorf("failed to read cache file %s id %d", diskCache.fileName, needleId)
  62. continue
  63. }
  64. if len(data) != 0 {
  65. return
  66. }
  67. }
  68. return nil
  69. }
  70. func (c *OnDiskCacheLayer) getChunkSlice(needleId types.NeedleId, offset, length uint64) (data []byte) {
  71. var err error
  72. for _, diskCache := range c.diskCaches {
  73. data, err = diskCache.getNeedleSlice(needleId, offset, length)
  74. if err == storage.ErrorNotFound {
  75. continue
  76. }
  77. if err != nil {
  78. glog.Warningf("failed to read cache file %s id %d: %v", diskCache.fileName, needleId, err)
  79. continue
  80. }
  81. if len(data) != 0 {
  82. return
  83. }
  84. }
  85. return nil
  86. }
  87. func (c *OnDiskCacheLayer) readChunkAt(buffer []byte, needleId types.NeedleId, offset uint64) (n int, err error) {
  88. for _, diskCache := range c.diskCaches {
  89. n, err = diskCache.readNeedleSliceAt(buffer, needleId, offset)
  90. if err == storage.ErrorNotFound {
  91. continue
  92. }
  93. if err != nil {
  94. glog.Warningf("failed to read cache file %s id %d: %v", diskCache.fileName, needleId, err)
  95. continue
  96. }
  97. if n > 0 {
  98. return
  99. }
  100. }
  101. return
  102. }
  103. func (c *OnDiskCacheLayer) shutdown() {
  104. for _, diskCache := range c.diskCaches {
  105. diskCache.Shutdown()
  106. }
  107. }