stream.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369
  1. package filer
  2. import (
  3. "bytes"
  4. "fmt"
  5. "io"
  6. "math"
  7. "sort"
  8. "strings"
  9. "sync"
  10. "time"
  11. "github.com/chrislusf/seaweedfs/weed/glog"
  12. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  13. "github.com/chrislusf/seaweedfs/weed/stats"
  14. "github.com/chrislusf/seaweedfs/weed/util"
  15. "github.com/chrislusf/seaweedfs/weed/wdclient"
  16. )
  17. func HasData(entry *filer_pb.Entry) bool {
  18. if len(entry.Content) > 0 {
  19. return true
  20. }
  21. return len(entry.Chunks) > 0
  22. }
  23. func IsSameData(a, b *filer_pb.Entry) bool {
  24. if len(a.Content) > 0 || len(b.Content) > 0 {
  25. return bytes.Equal(a.Content, b.Content)
  26. }
  27. return isSameChunks(a.Chunks, b.Chunks)
  28. }
  29. func isSameChunks(a, b []*filer_pb.FileChunk) bool {
  30. if len(a) != len(b) {
  31. return false
  32. }
  33. sort.Slice(a, func(i, j int) bool {
  34. return strings.Compare(a[i].ETag, a[j].ETag) < 0
  35. })
  36. sort.Slice(b, func(i, j int) bool {
  37. return strings.Compare(b[i].ETag, b[j].ETag) < 0
  38. })
  39. for i := 0; i < len(a); i++ {
  40. if a[i].ETag != b[i].ETag {
  41. return false
  42. }
  43. }
  44. return true
  45. }
  46. func NewFileReader(filerClient filer_pb.FilerClient, entry *filer_pb.Entry) io.Reader {
  47. if len(entry.Content) > 0 {
  48. return bytes.NewReader(entry.Content)
  49. }
  50. return NewChunkStreamReader(filerClient, entry.Chunks)
  51. }
  52. func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error {
  53. glog.V(9).Infof("start to stream content for chunks: %+v\n", chunks)
  54. chunkViews := ViewFromChunks(masterClient.GetLookupFileIdFunction(), chunks, offset, size)
  55. fileId2Url := make(map[string][]string)
  56. for _, chunkView := range chunkViews {
  57. urlStrings, err := masterClient.GetLookupFileIdFunction()(chunkView.FileId)
  58. if err != nil {
  59. glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
  60. return err
  61. } else if len(urlStrings) == 0 {
  62. glog.Errorf("operation LookupFileId %s failed, err: urls not found", chunkView.FileId)
  63. return fmt.Errorf("operation LookupFileId %s failed, err: urls not found", chunkView.FileId)
  64. }
  65. fileId2Url[chunkView.FileId] = urlStrings
  66. }
  67. remaining := size
  68. for _, chunkView := range chunkViews {
  69. if offset < chunkView.LogicOffset {
  70. gap := chunkView.LogicOffset - offset
  71. remaining -= gap
  72. glog.V(4).Infof("zero [%d,%d)", offset, chunkView.LogicOffset)
  73. err := writeZero(writer, gap)
  74. if err != nil {
  75. return fmt.Errorf("write zero [%d,%d)", offset, chunkView.LogicOffset)
  76. }
  77. offset = chunkView.LogicOffset
  78. }
  79. urlStrings := fileId2Url[chunkView.FileId]
  80. start := time.Now()
  81. err := retriedStreamFetchChunkData(writer, urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size))
  82. offset += int64(chunkView.Size)
  83. remaining -= int64(chunkView.Size)
  84. stats.FilerRequestHistogram.WithLabelValues("chunkDownload").Observe(time.Since(start).Seconds())
  85. if err != nil {
  86. stats.FilerRequestCounter.WithLabelValues("chunkDownloadError").Inc()
  87. return fmt.Errorf("read chunk: %v", err)
  88. }
  89. stats.FilerRequestCounter.WithLabelValues("chunkDownload").Inc()
  90. }
  91. glog.V(4).Infof("zero [%d,%d)", offset, offset+remaining)
  92. err := writeZero(writer, remaining)
  93. if err != nil {
  94. return fmt.Errorf("write zero [%d,%d)", offset, offset+remaining)
  95. }
  96. return nil
  97. }
  98. // ---------------- ReadAllReader ----------------------------------
  99. func writeZero(w io.Writer, size int64) (err error) {
  100. zeroPadding := make([]byte, 1024)
  101. var written int
  102. for size > 0 {
  103. if size > 1024 {
  104. written, err = w.Write(zeroPadding)
  105. } else {
  106. written, err = w.Write(zeroPadding[:size])
  107. }
  108. size -= int64(written)
  109. if err != nil {
  110. return
  111. }
  112. }
  113. return
  114. }
  115. func ReadAll(buffer []byte, masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) error {
  116. lookupFileIdFn := func(fileId string) (targetUrls []string, err error) {
  117. return masterClient.LookupFileId(fileId)
  118. }
  119. chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, int64(len(buffer)))
  120. idx := 0
  121. for _, chunkView := range chunkViews {
  122. urlStrings, err := lookupFileIdFn(chunkView.FileId)
  123. if err != nil {
  124. glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
  125. return err
  126. }
  127. n, err := retriedFetchChunkData(buffer[idx:idx+int(chunkView.Size)], urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset)
  128. if err != nil {
  129. return err
  130. }
  131. idx += n
  132. }
  133. return nil
  134. }
  135. // ---------------- ChunkStreamReader ----------------------------------
  136. type ChunkStreamReader struct {
  137. chunkViews []*ChunkView
  138. totalSize int64
  139. logicOffset int64
  140. buffer []byte
  141. bufferOffset int64
  142. bufferLock sync.Mutex
  143. chunk string
  144. lookupFileId wdclient.LookupFileIdFunctionType
  145. }
  146. var _ = io.ReadSeeker(&ChunkStreamReader{})
  147. var _ = io.ReaderAt(&ChunkStreamReader{})
  148. func doNewChunkStreamReader(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
  149. chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64)
  150. sort.Slice(chunkViews, func(i, j int) bool {
  151. return chunkViews[i].LogicOffset < chunkViews[j].LogicOffset
  152. })
  153. var totalSize int64
  154. for _, chunk := range chunkViews {
  155. totalSize += int64(chunk.Size)
  156. }
  157. return &ChunkStreamReader{
  158. chunkViews: chunkViews,
  159. lookupFileId: lookupFileIdFn,
  160. totalSize: totalSize,
  161. }
  162. }
  163. func NewChunkStreamReaderFromFiler(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
  164. lookupFileIdFn := func(fileId string) (targetUrl []string, err error) {
  165. return masterClient.LookupFileId(fileId)
  166. }
  167. return doNewChunkStreamReader(lookupFileIdFn, chunks)
  168. }
  169. func NewChunkStreamReader(filerClient filer_pb.FilerClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
  170. lookupFileIdFn := LookupFn(filerClient)
  171. return doNewChunkStreamReader(lookupFileIdFn, chunks)
  172. }
  173. func (c *ChunkStreamReader) ReadAt(p []byte, off int64) (n int, err error) {
  174. c.bufferLock.Lock()
  175. defer c.bufferLock.Unlock()
  176. if err = c.prepareBufferFor(off); err != nil {
  177. return
  178. }
  179. c.logicOffset = off
  180. return c.doRead(p)
  181. }
  182. func (c *ChunkStreamReader) Read(p []byte) (n int, err error) {
  183. c.bufferLock.Lock()
  184. defer c.bufferLock.Unlock()
  185. return c.doRead(p)
  186. }
  187. func (c *ChunkStreamReader) doRead(p []byte) (n int, err error) {
  188. // fmt.Printf("do read [%d,%d) at %s[%d,%d)\n", c.logicOffset, c.logicOffset+int64(len(p)), c.chunk, c.bufferOffset, c.bufferOffset+int64(len(c.buffer)))
  189. for n < len(p) {
  190. // println("read", c.logicOffset)
  191. if err = c.prepareBufferFor(c.logicOffset); err != nil {
  192. return
  193. }
  194. t := copy(p[n:], c.buffer[c.logicOffset-c.bufferOffset:])
  195. n += t
  196. c.logicOffset += int64(t)
  197. }
  198. return
  199. }
  200. func (c *ChunkStreamReader) isBufferEmpty() bool {
  201. return len(c.buffer) <= int(c.logicOffset-c.bufferOffset)
  202. }
  203. func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) {
  204. c.bufferLock.Lock()
  205. defer c.bufferLock.Unlock()
  206. var err error
  207. switch whence {
  208. case io.SeekStart:
  209. case io.SeekCurrent:
  210. offset += c.logicOffset
  211. case io.SeekEnd:
  212. offset = c.totalSize + offset
  213. }
  214. if offset > c.totalSize {
  215. err = io.ErrUnexpectedEOF
  216. } else {
  217. c.logicOffset = offset
  218. }
  219. return offset, err
  220. }
  221. func insideChunk(offset int64, chunk *ChunkView) bool {
  222. return chunk.LogicOffset <= offset && offset < chunk.LogicOffset+int64(chunk.Size)
  223. }
  224. func (c *ChunkStreamReader) prepareBufferFor(offset int64) (err error) {
  225. // stay in the same chunk
  226. if c.bufferOffset <= offset && offset < c.bufferOffset+int64(len(c.buffer)) {
  227. return nil
  228. }
  229. // fmt.Printf("fetch for offset %d\n", offset)
  230. // need to seek to a different chunk
  231. currentChunkIndex := sort.Search(len(c.chunkViews), func(i int) bool {
  232. return offset < c.chunkViews[i].LogicOffset
  233. })
  234. if currentChunkIndex == len(c.chunkViews) {
  235. // not found
  236. if insideChunk(offset, c.chunkViews[0]) {
  237. // fmt.Printf("select0 chunk %d %s\n", currentChunkIndex, c.chunkViews[currentChunkIndex].FileId)
  238. currentChunkIndex = 0
  239. } else if insideChunk(offset, c.chunkViews[len(c.chunkViews)-1]) {
  240. currentChunkIndex = len(c.chunkViews) - 1
  241. // fmt.Printf("select last chunk %d %s\n", currentChunkIndex, c.chunkViews[currentChunkIndex].FileId)
  242. } else {
  243. return io.EOF
  244. }
  245. } else if currentChunkIndex > 0 {
  246. if insideChunk(offset, c.chunkViews[currentChunkIndex]) {
  247. // good hit
  248. } else if insideChunk(offset, c.chunkViews[currentChunkIndex-1]) {
  249. currentChunkIndex -= 1
  250. // fmt.Printf("select -1 chunk %d %s\n", currentChunkIndex, c.chunkViews[currentChunkIndex].FileId)
  251. } else {
  252. // glog.Fatalf("unexpected1 offset %d", offset)
  253. return fmt.Errorf("unexpected1 offset %d", offset)
  254. }
  255. } else {
  256. // glog.Fatalf("unexpected2 offset %d", offset)
  257. return fmt.Errorf("unexpected2 offset %d", offset)
  258. }
  259. // positioning within the new chunk
  260. chunk := c.chunkViews[currentChunkIndex]
  261. if insideChunk(offset, chunk) {
  262. if c.isBufferEmpty() || c.bufferOffset != chunk.LogicOffset {
  263. if err = c.fetchChunkToBuffer(chunk); err != nil {
  264. return
  265. }
  266. }
  267. } else {
  268. // glog.Fatalf("unexpected3 offset %d in %s [%d,%d)", offset, chunk.FileId, chunk.LogicOffset, chunk.LogicOffset+int64(chunk.Size))
  269. return fmt.Errorf("unexpected3 offset %d in %s [%d,%d)", offset, chunk.FileId, chunk.LogicOffset, chunk.LogicOffset+int64(chunk.Size))
  270. }
  271. return
  272. }
  273. func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
  274. urlStrings, err := c.lookupFileId(chunkView.FileId)
  275. if err != nil {
  276. glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
  277. return err
  278. }
  279. var buffer bytes.Buffer
  280. var shouldRetry bool
  281. for _, urlString := range urlStrings {
  282. shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
  283. buffer.Write(data)
  284. })
  285. if !shouldRetry {
  286. break
  287. }
  288. if err != nil {
  289. glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err)
  290. buffer.Reset()
  291. } else {
  292. break
  293. }
  294. }
  295. if err != nil {
  296. return err
  297. }
  298. c.buffer = buffer.Bytes()
  299. c.bufferOffset = chunkView.LogicOffset
  300. c.chunk = chunkView.FileId
  301. // glog.V(0).Infof("fetched %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size))
  302. return nil
  303. }
  304. func (c *ChunkStreamReader) Close() {
  305. // TODO try to release and reuse buffer
  306. }
  307. func VolumeId(fileId string) string {
  308. lastCommaIndex := strings.LastIndex(fileId, ",")
  309. if lastCommaIndex > 0 {
  310. return fileId[:lastCommaIndex]
  311. }
  312. return fileId
  313. }