reader_cache.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224
  1. package filer
  2. import (
  3. "fmt"
  4. "sync"
  5. "sync/atomic"
  6. "time"
  7. "github.com/seaweedfs/seaweedfs/weed/glog"
  8. "github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
  9. util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
  10. "github.com/seaweedfs/seaweedfs/weed/util/mem"
  11. "github.com/seaweedfs/seaweedfs/weed/wdclient"
  12. )
  13. type ReaderCache struct {
  14. chunkCache chunk_cache.ChunkCache
  15. lookupFileIdFn wdclient.LookupFileIdFunctionType
  16. sync.Mutex
  17. downloaders map[string]*SingleChunkCacher
  18. limit int
  19. }
  20. type SingleChunkCacher struct {
  21. completedTimeNew int64
  22. sync.Mutex
  23. parent *ReaderCache
  24. chunkFileId string
  25. data []byte
  26. err error
  27. cipherKey []byte
  28. isGzipped bool
  29. chunkSize int
  30. shouldCache bool
  31. wg sync.WaitGroup
  32. cacheStartedCh chan struct{}
  33. }
  34. func NewReaderCache(limit int, chunkCache chunk_cache.ChunkCache, lookupFileIdFn wdclient.LookupFileIdFunctionType) *ReaderCache {
  35. return &ReaderCache{
  36. limit: limit,
  37. chunkCache: chunkCache,
  38. lookupFileIdFn: lookupFileIdFn,
  39. downloaders: make(map[string]*SingleChunkCacher),
  40. }
  41. }
  42. func (rc *ReaderCache) MaybeCache(chunkViews *Interval[*ChunkView]) {
  43. if rc.lookupFileIdFn == nil {
  44. return
  45. }
  46. rc.Lock()
  47. defer rc.Unlock()
  48. if len(rc.downloaders) >= rc.limit {
  49. return
  50. }
  51. for x := chunkViews; x != nil; x = x.Next {
  52. chunkView := x.Value
  53. if _, found := rc.downloaders[chunkView.FileId]; found {
  54. continue
  55. }
  56. if rc.chunkCache.IsInCache(chunkView.FileId, true) {
  57. glog.V(4).Infof("%s is in cache", chunkView.FileId)
  58. continue
  59. }
  60. if len(rc.downloaders) >= rc.limit {
  61. // abort when slots are filled
  62. return
  63. }
  64. // glog.V(4).Infof("prefetch %s offset %d", chunkView.FileId, chunkView.ViewOffset)
  65. // cache this chunk if not yet
  66. shouldCache := (uint64(chunkView.ViewOffset) + chunkView.ChunkSize) <= rc.chunkCache.GetMaxFilePartSizeInCache()
  67. cacher := newSingleChunkCacher(rc, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int(chunkView.ChunkSize), shouldCache)
  68. go cacher.startCaching()
  69. <-cacher.cacheStartedCh
  70. rc.downloaders[chunkView.FileId] = cacher
  71. }
  72. return
  73. }
  74. func (rc *ReaderCache) ReadChunkAt(buffer []byte, fileId string, cipherKey []byte, isGzipped bool, offset int64, chunkSize int, shouldCache bool) (int, error) {
  75. rc.Lock()
  76. if cacher, found := rc.downloaders[fileId]; found {
  77. if n, err := cacher.readChunkAt(buffer, offset); n != 0 && err == nil {
  78. rc.Unlock()
  79. return n, err
  80. }
  81. }
  82. if shouldCache || rc.lookupFileIdFn == nil {
  83. n, err := rc.chunkCache.ReadChunkAt(buffer, fileId, uint64(offset))
  84. if n > 0 {
  85. rc.Unlock()
  86. return n, err
  87. }
  88. }
  89. // clean up old downloaders
  90. if len(rc.downloaders) >= rc.limit {
  91. oldestFid, oldestTime := "", time.Now().UnixNano()
  92. for fid, downloader := range rc.downloaders {
  93. completedTime := atomic.LoadInt64(&downloader.completedTimeNew)
  94. if completedTime > 0 && completedTime < oldestTime {
  95. oldestFid, oldestTime = fid, completedTime
  96. }
  97. }
  98. if oldestFid != "" {
  99. oldDownloader := rc.downloaders[oldestFid]
  100. delete(rc.downloaders, oldestFid)
  101. oldDownloader.destroy()
  102. }
  103. }
  104. // glog.V(4).Infof("cache1 %s", fileId)
  105. cacher := newSingleChunkCacher(rc, fileId, cipherKey, isGzipped, chunkSize, shouldCache)
  106. go cacher.startCaching()
  107. <-cacher.cacheStartedCh
  108. rc.downloaders[fileId] = cacher
  109. rc.Unlock()
  110. return cacher.readChunkAt(buffer, offset)
  111. }
  112. func (rc *ReaderCache) UnCache(fileId string) {
  113. rc.Lock()
  114. defer rc.Unlock()
  115. // glog.V(4).Infof("uncache %s", fileId)
  116. if downloader, found := rc.downloaders[fileId]; found {
  117. downloader.destroy()
  118. delete(rc.downloaders, fileId)
  119. }
  120. }
  121. func (rc *ReaderCache) destroy() {
  122. rc.Lock()
  123. defer rc.Unlock()
  124. for _, downloader := range rc.downloaders {
  125. downloader.destroy()
  126. }
  127. }
  128. func newSingleChunkCacher(parent *ReaderCache, fileId string, cipherKey []byte, isGzipped bool, chunkSize int, shouldCache bool) *SingleChunkCacher {
  129. return &SingleChunkCacher{
  130. parent: parent,
  131. chunkFileId: fileId,
  132. cipherKey: cipherKey,
  133. isGzipped: isGzipped,
  134. chunkSize: chunkSize,
  135. shouldCache: shouldCache,
  136. cacheStartedCh: make(chan struct{}),
  137. }
  138. }
  139. func (s *SingleChunkCacher) startCaching() {
  140. s.wg.Add(1)
  141. defer s.wg.Done()
  142. s.Lock()
  143. defer s.Unlock()
  144. s.cacheStartedCh <- struct{}{} // means this has been started
  145. urlStrings, err := s.parent.lookupFileIdFn(s.chunkFileId)
  146. if err != nil {
  147. s.err = fmt.Errorf("operation LookupFileId %s failed, err: %v", s.chunkFileId, err)
  148. return
  149. }
  150. s.data = mem.Allocate(s.chunkSize)
  151. _, s.err = util_http.RetriedFetchChunkData(s.data, urlStrings, s.cipherKey, s.isGzipped, true, 0)
  152. if s.err != nil {
  153. mem.Free(s.data)
  154. s.data = nil
  155. return
  156. }
  157. if s.shouldCache {
  158. s.parent.chunkCache.SetChunk(s.chunkFileId, s.data)
  159. }
  160. atomic.StoreInt64(&s.completedTimeNew, time.Now().UnixNano())
  161. return
  162. }
  163. func (s *SingleChunkCacher) destroy() {
  164. // wait for all reads to finish before destroying the data
  165. s.wg.Wait()
  166. s.Lock()
  167. defer s.Unlock()
  168. if s.data != nil {
  169. mem.Free(s.data)
  170. s.data = nil
  171. close(s.cacheStartedCh)
  172. }
  173. }
  174. func (s *SingleChunkCacher) readChunkAt(buf []byte, offset int64) (int, error) {
  175. s.wg.Add(1)
  176. defer s.wg.Done()
  177. s.Lock()
  178. defer s.Unlock()
  179. if s.err != nil {
  180. return 0, s.err
  181. }
  182. if len(s.data) <= int(offset) {
  183. return 0, nil
  184. }
  185. return copy(buf, s.data[offset:]), nil
  186. }