// reader_cache.go
  1. package filer
  2. import (
  3. "fmt"
  4. "github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
  5. "github.com/chrislusf/seaweedfs/weed/util/mem"
  6. "github.com/chrislusf/seaweedfs/weed/wdclient"
  7. "sync"
  8. "time"
  9. )
// ReaderCache keeps a bounded set of per-chunk downloaders so that
// concurrent readers of the same chunk share one download. Entries are
// keyed by chunk file id and backed by a shared chunk cache.
type ReaderCache struct {
	chunkCache     chunk_cache.ChunkCache            // shared chunk cache consulted/populated by reads
	lookupFileIdFn wdclient.LookupFileIdFunctionType // resolves a chunk file id to download URLs; nil disables prefetch
	sync.Mutex                                       // guards downloaders
	downloaders    map[string]*SingleChunkCacher // one cacher per chunk file id
	limit          int                           // maximum number of live downloaders
}
// SingleChunkCacher downloads one chunk into memory once and serves
// copies of it to readers. startCaching holds the embedded write lock for
// the whole download; readChunkAt takes the read lock, so readers block
// until the download finishes.
type SingleChunkCacher struct {
	sync.RWMutex
	parent      *ReaderCache
	chunkFileId string
	data        []byte // chunk contents; nil until downloaded or after destroy/error
	err         error  // lookup or download error, if any
	cipherKey   []byte // optional decryption key passed to the fetcher
	isGzipped   bool   // whether the stored chunk is gzip-compressed
	chunkSize   int    // expected chunk size; sizes the data buffer
	shouldCache bool   // also store the downloaded chunk into parent.chunkCache
	// wg signals that startCaching has begun running (Done is called at
	// goroutine start, not at completion).
	wg            sync.WaitGroup
	completedTime time.Time // when the download finished; zero while in flight
}
  30. func newReaderCache(limit int, chunkCache chunk_cache.ChunkCache, lookupFileIdFn wdclient.LookupFileIdFunctionType) *ReaderCache {
  31. return &ReaderCache{
  32. limit: limit,
  33. chunkCache: chunkCache,
  34. lookupFileIdFn: lookupFileIdFn,
  35. downloaders: make(map[string]*SingleChunkCacher),
  36. }
  37. }
  38. func (rc *ReaderCache) MaybeCache(chunkViews []*ChunkView) {
  39. if rc.lookupFileIdFn == nil {
  40. return
  41. }
  42. rc.Lock()
  43. defer rc.Unlock()
  44. for _, chunkView := range chunkViews {
  45. if _, found := rc.downloaders[chunkView.FileId]; found {
  46. continue
  47. }
  48. if len(rc.downloaders) >= rc.limit {
  49. // if still no slots, return
  50. return
  51. }
  52. // glog.V(4).Infof("prefetch %s offset %d", chunkView.FileId, chunkView.LogicOffset)
  53. // cache this chunk if not yet
  54. cacher := newSingleChunkCacher(rc, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int(chunkView.ChunkSize), false)
  55. cacher.wg.Add(1)
  56. go cacher.startCaching()
  57. cacher.wg.Wait()
  58. rc.downloaders[chunkView.FileId] = cacher
  59. }
  60. return
  61. }
// ReadChunkAt reads chunk fileId at offset into buffer. It first reuses a
// live downloader for the chunk, then tries the shared chunk cache, and
// finally starts a new downloader, evicting the oldest completed one when
// the limit is reached.
// NOTE(review): rc.Mutex is held across cacher.wg.Wait() and readChunkAt,
// so all reads through this cache serialize on one lock — confirm this is
// intended before changing the locking.
func (rc *ReaderCache) ReadChunkAt(buffer []byte, fileId string, cipherKey []byte, isGzipped bool, offset int64, chunkSize int, shouldCache bool) (int, error) {
	rc.Lock()
	defer rc.Unlock()
	if cacher, found := rc.downloaders[fileId]; found {
		return cacher.readChunkAt(buffer, offset)
	}
	// consult the shared chunk cache when caching is requested or when we
	// cannot download at all (no lookup function)
	if shouldCache || rc.lookupFileIdFn == nil {
		n, err := rc.chunkCache.ReadChunkAt(buffer, fileId, uint64(offset))
		if n > 0 {
			return n, err
		}
	}
	// at capacity: evict the downloader with the oldest completion time
	// (in-flight downloaders have a zero completedTime and are skipped)
	if len(rc.downloaders) >= rc.limit {
		oldestFid, oldestTime := "", time.Now()
		for fid, downloader := range rc.downloaders {
			if !downloader.completedTime.IsZero() {
				if downloader.completedTime.Before(oldestTime) {
					oldestFid, oldestTime = fid, downloader.completedTime
				}
			}
		}
		if oldestFid != "" {
			oldDownloader := rc.downloaders[oldestFid]
			delete(rc.downloaders, oldestFid)
			oldDownloader.destroy()
		}
	}
	// start a fresh download; wait until the goroutine has actually
	// started before registering and reading from it
	// glog.V(4).Infof("cache1 %s", fileId)
	cacher := newSingleChunkCacher(rc, fileId, cipherKey, isGzipped, chunkSize, shouldCache)
	cacher.wg.Add(1)
	go cacher.startCaching()
	cacher.wg.Wait()
	rc.downloaders[fileId] = cacher
	return cacher.readChunkAt(buffer, offset)
}
  97. func (rc *ReaderCache) UnCache(fileId string) {
  98. rc.Lock()
  99. defer rc.Unlock()
  100. // glog.V(4).Infof("uncache %s", fileId)
  101. if downloader, found := rc.downloaders[fileId]; found {
  102. downloader.destroy()
  103. delete(rc.downloaders, fileId)
  104. }
  105. }
  106. func (rc *ReaderCache) destroy() {
  107. rc.Lock()
  108. defer rc.Unlock()
  109. for _, downloader := range rc.downloaders {
  110. downloader.destroy()
  111. }
  112. }
  113. func newSingleChunkCacher(parent *ReaderCache, fileId string, cipherKey []byte, isGzipped bool, chunkSize int, shouldCache bool) *SingleChunkCacher {
  114. t := &SingleChunkCacher{
  115. parent: parent,
  116. chunkFileId: fileId,
  117. cipherKey: cipherKey,
  118. isGzipped: isGzipped,
  119. chunkSize: chunkSize,
  120. shouldCache: shouldCache,
  121. }
  122. return t
  123. }
// startCaching looks up and downloads the chunk into s.data, holding the
// write lock for the entire download so readers block until it completes.
// It must be run in its own goroutine after s.wg.Add(1).
func (s *SingleChunkCacher) startCaching() {
	s.Lock()
	defer s.Unlock()
	// signal "started" immediately (before any work) so the launcher's
	// wg.Wait() only waits for the goroutine to begin, not to finish
	s.wg.Done() // means this has been started
	urlStrings, err := s.parent.lookupFileIdFn(s.chunkFileId)
	if err != nil {
		s.err = fmt.Errorf("operation LookupFileId %s failed, err: %v", s.chunkFileId, err)
		return
	}
	s.data = mem.Allocate(s.chunkSize)
	_, s.err = retriedFetchChunkData(s.data, urlStrings, s.cipherKey, s.isGzipped, true, 0)
	if s.err != nil {
		// return the buffer on failure; readers will see s.err instead
		mem.Free(s.data)
		s.data = nil
		return
	}
	s.completedTime = time.Now()
	if s.shouldCache {
		// also persist into the shared chunk cache for future readers
		s.parent.chunkCache.SetChunk(s.chunkFileId, s.data)
	}
	return
}
  146. func (s *SingleChunkCacher) destroy() {
  147. if s.data != nil {
  148. mem.Free(s.data)
  149. s.data = nil
  150. }
  151. }
  152. func (s *SingleChunkCacher) readChunkAt(buf []byte, offset int64) (int, error) {
  153. s.RLock()
  154. defer s.RUnlock()
  155. return copy(buf, s.data[offset:]), s.err
  156. }