reader_cache.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218
  1. package filer
  2. import (
  3. "fmt"
  4. "github.com/seaweedfs/seaweedfs/weed/util"
  5. "sync"
  6. "sync/atomic"
  7. "time"
  8. "github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
  9. "github.com/seaweedfs/seaweedfs/weed/util/mem"
  10. "github.com/seaweedfs/seaweedfs/weed/wdclient"
  11. )
  12. type ReaderCache struct {
  13. chunkCache chunk_cache.ChunkCache
  14. lookupFileIdFn wdclient.LookupFileIdFunctionType
  15. sync.Mutex
  16. downloaders map[string]*SingleChunkCacher
  17. limit int
  18. }
  19. type SingleChunkCacher struct {
  20. completedTimeNew int64
  21. sync.Mutex
  22. parent *ReaderCache
  23. chunkFileId string
  24. data []byte
  25. err error
  26. cipherKey []byte
  27. isGzipped bool
  28. chunkSize int
  29. shouldCache bool
  30. wg sync.WaitGroup
  31. cacheStartedCh chan struct{}
  32. }
  33. func NewReaderCache(limit int, chunkCache chunk_cache.ChunkCache, lookupFileIdFn wdclient.LookupFileIdFunctionType) *ReaderCache {
  34. return &ReaderCache{
  35. limit: limit,
  36. chunkCache: chunkCache,
  37. lookupFileIdFn: lookupFileIdFn,
  38. downloaders: make(map[string]*SingleChunkCacher),
  39. }
  40. }
  41. func (rc *ReaderCache) MaybeCache(chunkViews *Interval[*ChunkView]) {
  42. if rc.lookupFileIdFn == nil {
  43. return
  44. }
  45. rc.Lock()
  46. defer rc.Unlock()
  47. if len(rc.downloaders) >= rc.limit {
  48. return
  49. }
  50. for x := chunkViews; x != nil; x = x.Next {
  51. chunkView := x.Value
  52. if _, found := rc.downloaders[chunkView.FileId]; found {
  53. continue
  54. }
  55. if len(rc.downloaders) >= rc.limit {
  56. // abort when slots are filled
  57. return
  58. }
  59. // glog.V(4).Infof("prefetch %s offset %d", chunkView.FileId, chunkView.ViewOffset)
  60. // cache this chunk if not yet
  61. cacher := newSingleChunkCacher(rc, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int(chunkView.ChunkSize), false)
  62. go cacher.startCaching()
  63. <-cacher.cacheStartedCh
  64. rc.downloaders[chunkView.FileId] = cacher
  65. }
  66. return
  67. }
  68. func (rc *ReaderCache) ReadChunkAt(buffer []byte, fileId string, cipherKey []byte, isGzipped bool, offset int64, chunkSize int, shouldCache bool) (int, error) {
  69. rc.Lock()
  70. if cacher, found := rc.downloaders[fileId]; found {
  71. if n, err := cacher.readChunkAt(buffer, offset); n != 0 && err == nil {
  72. rc.Unlock()
  73. return n, err
  74. }
  75. }
  76. if shouldCache || rc.lookupFileIdFn == nil {
  77. n, err := rc.chunkCache.ReadChunkAt(buffer, fileId, uint64(offset))
  78. if n > 0 {
  79. rc.Unlock()
  80. return n, err
  81. }
  82. }
  83. // clean up old downloaders
  84. if len(rc.downloaders) >= rc.limit {
  85. oldestFid, oldestTime := "", time.Now().UnixNano()
  86. for fid, downloader := range rc.downloaders {
  87. completedTime := atomic.LoadInt64(&downloader.completedTimeNew)
  88. if completedTime > 0 && completedTime < oldestTime {
  89. oldestFid, oldestTime = fid, completedTime
  90. }
  91. }
  92. if oldestFid != "" {
  93. oldDownloader := rc.downloaders[oldestFid]
  94. delete(rc.downloaders, oldestFid)
  95. oldDownloader.destroy()
  96. }
  97. }
  98. // glog.V(4).Infof("cache1 %s", fileId)
  99. cacher := newSingleChunkCacher(rc, fileId, cipherKey, isGzipped, chunkSize, shouldCache)
  100. go cacher.startCaching()
  101. <-cacher.cacheStartedCh
  102. rc.downloaders[fileId] = cacher
  103. rc.Unlock()
  104. return cacher.readChunkAt(buffer, offset)
  105. }
  106. func (rc *ReaderCache) UnCache(fileId string) {
  107. rc.Lock()
  108. defer rc.Unlock()
  109. // glog.V(4).Infof("uncache %s", fileId)
  110. if downloader, found := rc.downloaders[fileId]; found {
  111. downloader.destroy()
  112. delete(rc.downloaders, fileId)
  113. }
  114. }
  115. func (rc *ReaderCache) destroy() {
  116. rc.Lock()
  117. defer rc.Unlock()
  118. for _, downloader := range rc.downloaders {
  119. downloader.destroy()
  120. }
  121. }
  122. func newSingleChunkCacher(parent *ReaderCache, fileId string, cipherKey []byte, isGzipped bool, chunkSize int, shouldCache bool) *SingleChunkCacher {
  123. return &SingleChunkCacher{
  124. parent: parent,
  125. chunkFileId: fileId,
  126. cipherKey: cipherKey,
  127. isGzipped: isGzipped,
  128. chunkSize: chunkSize,
  129. shouldCache: shouldCache,
  130. cacheStartedCh: make(chan struct{}),
  131. }
  132. }
  133. func (s *SingleChunkCacher) startCaching() {
  134. s.wg.Add(1)
  135. defer s.wg.Done()
  136. s.Lock()
  137. defer s.Unlock()
  138. s.cacheStartedCh <- struct{}{} // means this has been started
  139. urlStrings, err := s.parent.lookupFileIdFn(s.chunkFileId)
  140. if err != nil {
  141. s.err = fmt.Errorf("operation LookupFileId %s failed, err: %v", s.chunkFileId, err)
  142. return
  143. }
  144. s.data = mem.Allocate(s.chunkSize)
  145. _, s.err = util.RetriedFetchChunkData(s.data, urlStrings, s.cipherKey, s.isGzipped, true, 0)
  146. if s.err != nil {
  147. mem.Free(s.data)
  148. s.data = nil
  149. return
  150. }
  151. if s.shouldCache {
  152. s.parent.chunkCache.SetChunk(s.chunkFileId, s.data)
  153. }
  154. atomic.StoreInt64(&s.completedTimeNew, time.Now().UnixNano())
  155. return
  156. }
  157. func (s *SingleChunkCacher) destroy() {
  158. // wait for all reads to finish before destroying the data
  159. s.wg.Wait()
  160. s.Lock()
  161. defer s.Unlock()
  162. if s.data != nil {
  163. mem.Free(s.data)
  164. s.data = nil
  165. close(s.cacheStartedCh)
  166. }
  167. }
  168. func (s *SingleChunkCacher) readChunkAt(buf []byte, offset int64) (int, error) {
  169. s.wg.Add(1)
  170. defer s.wg.Done()
  171. s.Lock()
  172. defer s.Unlock()
  173. if s.err != nil {
  174. return 0, s.err
  175. }
  176. if len(s.data) <= int(offset) {
  177. return 0, nil
  178. }
  179. return copy(buf, s.data[offset:]), nil
  180. }