reader_cache.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217
  1. package filer
  2. import (
  3. "fmt"
  4. "sync"
  5. "sync/atomic"
  6. "time"
  7. "github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
  8. "github.com/seaweedfs/seaweedfs/weed/util/mem"
  9. "github.com/seaweedfs/seaweedfs/weed/wdclient"
  10. )
  11. type ReaderCache struct {
  12. chunkCache chunk_cache.ChunkCache
  13. lookupFileIdFn wdclient.LookupFileIdFunctionType
  14. sync.Mutex
  15. downloaders map[string]*SingleChunkCacher
  16. limit int
  17. }
  18. type SingleChunkCacher struct {
  19. sync.Mutex
  20. parent *ReaderCache
  21. chunkFileId string
  22. data []byte
  23. err error
  24. cipherKey []byte
  25. isGzipped bool
  26. chunkSize int
  27. shouldCache bool
  28. wg sync.WaitGroup
  29. cacheStartedCh chan struct{}
  30. completedTimeNew int64
  31. }
  32. func NewReaderCache(limit int, chunkCache chunk_cache.ChunkCache, lookupFileIdFn wdclient.LookupFileIdFunctionType) *ReaderCache {
  33. return &ReaderCache{
  34. limit: limit,
  35. chunkCache: chunkCache,
  36. lookupFileIdFn: lookupFileIdFn,
  37. downloaders: make(map[string]*SingleChunkCacher),
  38. }
  39. }
  40. func (rc *ReaderCache) MaybeCache(chunkViews *Interval[*ChunkView]) {
  41. if rc.lookupFileIdFn == nil {
  42. return
  43. }
  44. rc.Lock()
  45. defer rc.Unlock()
  46. if len(rc.downloaders) >= rc.limit {
  47. return
  48. }
  49. for x := chunkViews; x != nil; x = x.Next {
  50. chunkView := x.Value
  51. if _, found := rc.downloaders[chunkView.FileId]; found {
  52. continue
  53. }
  54. if len(rc.downloaders) >= rc.limit {
  55. // abort when slots are filled
  56. return
  57. }
  58. // glog.V(4).Infof("prefetch %s offset %d", chunkView.FileId, chunkView.ViewOffset)
  59. // cache this chunk if not yet
  60. cacher := newSingleChunkCacher(rc, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int(chunkView.ChunkSize), false)
  61. go cacher.startCaching()
  62. <-cacher.cacheStartedCh
  63. rc.downloaders[chunkView.FileId] = cacher
  64. }
  65. return
  66. }
  67. func (rc *ReaderCache) ReadChunkAt(buffer []byte, fileId string, cipherKey []byte, isGzipped bool, offset int64, chunkSize int, shouldCache bool) (int, error) {
  68. rc.Lock()
  69. if cacher, found := rc.downloaders[fileId]; found {
  70. if n, err := cacher.readChunkAt(buffer, offset); n != 0 && err == nil {
  71. rc.Unlock()
  72. return n, err
  73. }
  74. }
  75. if shouldCache || rc.lookupFileIdFn == nil {
  76. n, err := rc.chunkCache.ReadChunkAt(buffer, fileId, uint64(offset))
  77. if n > 0 {
  78. rc.Unlock()
  79. return n, err
  80. }
  81. }
  82. // clean up old downloaders
  83. if len(rc.downloaders) >= rc.limit {
  84. oldestFid, oldestTime := "", time.Now().UnixNano()
  85. for fid, downloader := range rc.downloaders {
  86. completedTime := atomic.LoadInt64(&downloader.completedTimeNew)
  87. if completedTime > 0 && completedTime < oldestTime {
  88. oldestFid, oldestTime = fid, completedTime
  89. }
  90. }
  91. if oldestFid != "" {
  92. oldDownloader := rc.downloaders[oldestFid]
  93. delete(rc.downloaders, oldestFid)
  94. oldDownloader.destroy()
  95. }
  96. }
  97. // glog.V(4).Infof("cache1 %s", fileId)
  98. cacher := newSingleChunkCacher(rc, fileId, cipherKey, isGzipped, chunkSize, shouldCache)
  99. go cacher.startCaching()
  100. <-cacher.cacheStartedCh
  101. rc.downloaders[fileId] = cacher
  102. rc.Unlock()
  103. return cacher.readChunkAt(buffer, offset)
  104. }
  105. func (rc *ReaderCache) UnCache(fileId string) {
  106. rc.Lock()
  107. defer rc.Unlock()
  108. // glog.V(4).Infof("uncache %s", fileId)
  109. if downloader, found := rc.downloaders[fileId]; found {
  110. downloader.destroy()
  111. delete(rc.downloaders, fileId)
  112. }
  113. }
  114. func (rc *ReaderCache) destroy() {
  115. rc.Lock()
  116. defer rc.Unlock()
  117. for _, downloader := range rc.downloaders {
  118. downloader.destroy()
  119. }
  120. }
  121. func newSingleChunkCacher(parent *ReaderCache, fileId string, cipherKey []byte, isGzipped bool, chunkSize int, shouldCache bool) *SingleChunkCacher {
  122. return &SingleChunkCacher{
  123. parent: parent,
  124. chunkFileId: fileId,
  125. cipherKey: cipherKey,
  126. isGzipped: isGzipped,
  127. chunkSize: chunkSize,
  128. shouldCache: shouldCache,
  129. cacheStartedCh: make(chan struct{}),
  130. }
  131. }
  132. func (s *SingleChunkCacher) startCaching() {
  133. s.wg.Add(1)
  134. defer s.wg.Done()
  135. s.Lock()
  136. defer s.Unlock()
  137. s.cacheStartedCh <- struct{}{} // means this has been started
  138. urlStrings, err := s.parent.lookupFileIdFn(s.chunkFileId)
  139. if err != nil {
  140. s.err = fmt.Errorf("operation LookupFileId %s failed, err: %v", s.chunkFileId, err)
  141. return
  142. }
  143. s.data = mem.Allocate(s.chunkSize)
  144. _, s.err = retriedFetchChunkData(s.data, urlStrings, s.cipherKey, s.isGzipped, true, 0)
  145. if s.err != nil {
  146. mem.Free(s.data)
  147. s.data = nil
  148. return
  149. }
  150. if s.shouldCache {
  151. s.parent.chunkCache.SetChunk(s.chunkFileId, s.data)
  152. }
  153. atomic.StoreInt64(&s.completedTimeNew, time.Now().UnixNano())
  154. return
  155. }
  156. func (s *SingleChunkCacher) destroy() {
  157. // wait for all reads to finish before destroying the data
  158. s.wg.Wait()
  159. s.Lock()
  160. defer s.Unlock()
  161. if s.data != nil {
  162. mem.Free(s.data)
  163. s.data = nil
  164. close(s.cacheStartedCh)
  165. }
  166. }
  167. func (s *SingleChunkCacher) readChunkAt(buf []byte, offset int64) (int, error) {
  168. s.wg.Add(1)
  169. defer s.wg.Done()
  170. s.Lock()
  171. defer s.Unlock()
  172. if s.err != nil {
  173. return 0, s.err
  174. }
  175. if len(s.data) <= int(offset) {
  176. return 0, nil
  177. }
  178. return copy(buf, s.data[offset:]), nil
  179. }