stream.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. package filer
  2. import (
  3. "bytes"
  4. "fmt"
  5. "io"
  6. "math"
  7. "strings"
  8. "github.com/chrislusf/seaweedfs/weed/glog"
  9. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  10. "github.com/chrislusf/seaweedfs/weed/util"
  11. "github.com/chrislusf/seaweedfs/weed/wdclient"
  12. )
  13. func StreamContent(masterClient wdclient.HasLookupFileIdFunction, w io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error {
  14. glog.V(9).Infof("start to stream content for chunks: %+v\n", chunks)
  15. chunkViews := ViewFromChunks(masterClient.GetLookupFileIdFunction(), chunks, offset, size)
  16. fileId2Url := make(map[string][]string)
  17. for _, chunkView := range chunkViews {
  18. urlStrings, err := masterClient.GetLookupFileIdFunction()(chunkView.FileId)
  19. if err != nil {
  20. glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
  21. return err
  22. } else if len(urlStrings) == 0 {
  23. glog.Errorf("operation LookupFileId %s failed, err: urls not found", chunkView.FileId)
  24. return fmt.Errorf("operation LookupFileId %s failed, err: urls not found", chunkView.FileId)
  25. }
  26. fileId2Url[chunkView.FileId] = urlStrings
  27. }
  28. for _, chunkView := range chunkViews {
  29. urlStrings := fileId2Url[chunkView.FileId]
  30. data, err := retriedFetchChunkData(urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size))
  31. if err != nil {
  32. glog.Errorf("read chunk: %v", err)
  33. return fmt.Errorf("read chunk: %v", err)
  34. }
  35. _, err = w.Write(data)
  36. if err != nil {
  37. glog.Errorf("write chunk: %v", err)
  38. return fmt.Errorf("write chunk: %v", err)
  39. }
  40. }
  41. return nil
  42. }
  43. // ---------------- ReadAllReader ----------------------------------
  44. func ReadAll(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) ([]byte, error) {
  45. buffer := bytes.Buffer{}
  46. lookupFileIdFn := func(fileId string) (targetUrls []string, err error) {
  47. return masterClient.LookupFileId(fileId)
  48. }
  49. chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64)
  50. for _, chunkView := range chunkViews {
  51. urlStrings, err := lookupFileIdFn(chunkView.FileId)
  52. if err != nil {
  53. glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
  54. return nil, err
  55. }
  56. data, err := retriedFetchChunkData(urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size))
  57. if err != nil {
  58. return nil, err
  59. }
  60. buffer.Write(data)
  61. }
  62. return buffer.Bytes(), nil
  63. }
// ---------------- ChunkStreamReader ----------------------------------

// ChunkStreamReader reads a chunked file sequentially, buffering one
// chunk at a time. It implements io.ReadSeeker over the logical file.
type ChunkStreamReader struct {
	chunkViews   []*ChunkView                      // chunks in logical order
	logicOffset  int64                             // NOTE(review): never written in this file — confirm it is still used
	buffer       []byte                            // contents of the currently loaded chunk
	bufferOffset int64                             // logical file offset of buffer[0]
	bufferPos    int                               // read cursor within buffer
	chunkIndex   int                               // index of the next chunk to fetch
	lookupFileId wdclient.LookupFileIdFunctionType // resolves a file id to volume-server URLs
}

// Compile-time check that ChunkStreamReader satisfies io.ReadSeeker.
var _ = io.ReadSeeker(&ChunkStreamReader{})
  75. func NewChunkStreamReaderFromFiler(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
  76. lookupFileIdFn := func(fileId string) (targetUrl []string, err error) {
  77. return masterClient.LookupFileId(fileId)
  78. }
  79. chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64)
  80. return &ChunkStreamReader{
  81. chunkViews: chunkViews,
  82. lookupFileId: lookupFileIdFn,
  83. }
  84. }
  85. func NewChunkStreamReader(filerClient filer_pb.FilerClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
  86. lookupFileIdFn := LookupFn(filerClient)
  87. chunkViews := ViewFromChunks(lookupFileIdFn, chunks, 0, math.MaxInt64)
  88. return &ChunkStreamReader{
  89. chunkViews: chunkViews,
  90. lookupFileId: lookupFileIdFn,
  91. }
  92. }
  93. func (c *ChunkStreamReader) Read(p []byte) (n int, err error) {
  94. for n < len(p) {
  95. if c.isBufferEmpty() {
  96. if c.chunkIndex >= len(c.chunkViews) {
  97. return n, io.EOF
  98. }
  99. chunkView := c.chunkViews[c.chunkIndex]
  100. c.fetchChunkToBuffer(chunkView)
  101. c.chunkIndex++
  102. }
  103. t := copy(p[n:], c.buffer[c.bufferPos:])
  104. c.bufferPos += t
  105. n += t
  106. }
  107. return
  108. }
  109. func (c *ChunkStreamReader) isBufferEmpty() bool {
  110. return len(c.buffer) <= c.bufferPos
  111. }
  112. func (c *ChunkStreamReader) Seek(offset int64, whence int) (int64, error) {
  113. var totalSize int64
  114. for _, chunk := range c.chunkViews {
  115. totalSize += int64(chunk.Size)
  116. }
  117. var err error
  118. switch whence {
  119. case io.SeekStart:
  120. case io.SeekCurrent:
  121. offset += c.bufferOffset + int64(c.bufferPos)
  122. case io.SeekEnd:
  123. offset = totalSize + offset
  124. }
  125. if offset > totalSize {
  126. err = io.ErrUnexpectedEOF
  127. }
  128. for i, chunk := range c.chunkViews {
  129. if chunk.LogicOffset <= offset && offset < chunk.LogicOffset+int64(chunk.Size) {
  130. if c.isBufferEmpty() || c.bufferOffset != chunk.LogicOffset {
  131. c.fetchChunkToBuffer(chunk)
  132. c.chunkIndex = i + 1
  133. break
  134. }
  135. }
  136. }
  137. c.bufferPos = int(offset - c.bufferOffset)
  138. return offset, err
  139. }
// fetchChunkToBuffer downloads one chunk into c.buffer and resets the
// buffer bookkeeping (bufferPos=0, bufferOffset=chunk start). Each replica
// URL is tried in turn until one succeeds or a non-retryable outcome occurs.
func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error {
	urlStrings, err := c.lookupFileId(chunkView.FileId)
	if err != nil {
		glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
		return err
	}
	var buffer bytes.Buffer
	var shouldRetry bool
	for _, urlString := range urlStrings {
		// NOTE(review): assumes FastReadUrlAsStream's first return value flags
		// retryable failures — confirm against util package.
		shouldRetry, err = util.FastReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) {
			buffer.Write(data)
		})
		// Non-retryable outcome (success or hard failure): stop trying replicas.
		// The order matters: this check runs before the err check below.
		if !shouldRetry {
			break
		}
		if err != nil {
			// Retryable failure: drop any partial data and try the next replica.
			glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err)
			buffer.Reset()
		} else {
			break
		}
	}
	// err here is whatever the last attempted replica reported.
	if err != nil {
		return err
	}
	c.buffer = buffer.Bytes()
	c.bufferPos = 0
	// bufferOffset tracks the logical file offset of buffer[0].
	c.bufferOffset = chunkView.LogicOffset

	// glog.V(0).Infof("read %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size))

	return nil
}
// Close releases resources held by the reader. Currently a no-op; the
// chunk buffer is simply left for the garbage collector.
func (c *ChunkStreamReader) Close() {
	// TODO try to release and reuse buffer
}
  174. func VolumeId(fileId string) string {
  175. lastCommaIndex := strings.LastIndex(fileId, ",")
  176. if lastCommaIndex > 0 {
  177. return fileId[:lastCommaIndex]
  178. }
  179. return fileId
  180. }