chunked_file.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. package operation
  2. import (
  3. "context"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "net/http"
  9. "sort"
  10. "sync"
  11. "google.golang.org/grpc"
  12. "github.com/seaweedfs/seaweedfs/weed/glog"
  13. "github.com/seaweedfs/seaweedfs/weed/pb"
  14. "github.com/seaweedfs/seaweedfs/weed/util"
  15. util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
  16. )
  17. var (
  18. // when the remote server does not allow range requests (Accept-Ranges was not set)
  19. ErrRangeRequestsNotSupported = errors.New("Range requests are not supported by the remote server")
  20. // ErrInvalidRange is returned by Read when trying to read past the end of the file
  21. ErrInvalidRange = errors.New("Invalid range")
  22. )
  23. type ChunkInfo struct {
  24. Fid string `json:"fid"`
  25. Offset int64 `json:"offset"`
  26. Size int64 `json:"size"`
  27. }
  28. type ChunkList []*ChunkInfo
  29. type ChunkManifest struct {
  30. Name string `json:"name,omitempty"`
  31. Mime string `json:"mime,omitempty"`
  32. Size int64 `json:"size,omitempty"`
  33. Chunks ChunkList `json:"chunks,omitempty"`
  34. }
  35. // seekable chunked file reader
  36. type ChunkedFileReader struct {
  37. totalSize int64
  38. chunkList []*ChunkInfo
  39. master pb.ServerAddress
  40. pos int64
  41. pr *io.PipeReader
  42. pw *io.PipeWriter
  43. mutex sync.Mutex
  44. grpcDialOption grpc.DialOption
  45. }
  46. func (s ChunkList) Len() int { return len(s) }
  47. func (s ChunkList) Less(i, j int) bool { return s[i].Offset < s[j].Offset }
  48. func (s ChunkList) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
  49. func LoadChunkManifest(buffer []byte, isCompressed bool) (*ChunkManifest, error) {
  50. if isCompressed {
  51. var err error
  52. if buffer, err = util.DecompressData(buffer); err != nil {
  53. glog.V(0).Infof("fail to decompress chunk manifest: %v", err)
  54. }
  55. }
  56. cm := ChunkManifest{}
  57. if e := json.Unmarshal(buffer, &cm); e != nil {
  58. return nil, e
  59. }
  60. sort.Sort(cm.Chunks)
  61. return &cm, nil
  62. }
  63. func (cm *ChunkManifest) Marshal() ([]byte, error) {
  64. return json.Marshal(cm)
  65. }
  66. func (cm *ChunkManifest) DeleteChunks(masterFn GetMasterFn, usePublicUrl bool, grpcDialOption grpc.DialOption) error {
  67. var fileIds []string
  68. for _, ci := range cm.Chunks {
  69. fileIds = append(fileIds, ci.Fid)
  70. }
  71. results, err := DeleteFiles(masterFn, usePublicUrl, grpcDialOption, fileIds)
  72. if err != nil {
  73. glog.V(0).Infof("delete %+v: %v", fileIds, err)
  74. return fmt.Errorf("chunk delete: %v", err)
  75. }
  76. for _, result := range results {
  77. if result.Error != "" {
  78. glog.V(0).Infof("delete file %+v: %v", result.FileId, result.Error)
  79. return fmt.Errorf("chunk delete %v: %v", result.FileId, result.Error)
  80. }
  81. }
  82. return nil
  83. }
  84. func readChunkNeedle(fileUrl string, w io.Writer, offset int64, jwt string) (written int64, e error) {
  85. req, err := http.NewRequest(http.MethodGet, fileUrl, nil)
  86. if err != nil {
  87. return written, err
  88. }
  89. if offset > 0 {
  90. req.Header.Set("Range", fmt.Sprintf("bytes=%d-", offset))
  91. }
  92. resp, err := util_http.Do(req)
  93. if err != nil {
  94. return written, err
  95. }
  96. defer util_http.CloseResponse(resp)
  97. switch resp.StatusCode {
  98. case http.StatusRequestedRangeNotSatisfiable:
  99. return written, ErrInvalidRange
  100. case http.StatusOK:
  101. if offset > 0 {
  102. return written, ErrRangeRequestsNotSupported
  103. }
  104. case http.StatusPartialContent:
  105. break
  106. default:
  107. return written, fmt.Errorf("Read chunk needle error: [%d] %s", resp.StatusCode, fileUrl)
  108. }
  109. return io.Copy(w, resp.Body)
  110. }
  111. func NewChunkedFileReader(chunkList []*ChunkInfo, master pb.ServerAddress, grpcDialOption grpc.DialOption) *ChunkedFileReader {
  112. var totalSize int64
  113. for _, chunk := range chunkList {
  114. totalSize += chunk.Size
  115. }
  116. sort.Sort(ChunkList(chunkList))
  117. return &ChunkedFileReader{
  118. totalSize: totalSize,
  119. chunkList: chunkList,
  120. master: master,
  121. grpcDialOption: grpcDialOption,
  122. }
  123. }
  124. func (cf *ChunkedFileReader) Seek(offset int64, whence int) (int64, error) {
  125. var err error
  126. switch whence {
  127. case io.SeekStart:
  128. case io.SeekCurrent:
  129. offset += cf.pos
  130. case io.SeekEnd:
  131. offset = cf.totalSize + offset
  132. }
  133. if offset > cf.totalSize {
  134. err = ErrInvalidRange
  135. }
  136. if cf.pos != offset {
  137. cf.Close()
  138. }
  139. cf.pos = offset
  140. return cf.pos, err
  141. }
  142. func (cf *ChunkedFileReader) WriteTo(w io.Writer) (n int64, err error) {
  143. chunkIndex := -1
  144. chunkStartOffset := int64(0)
  145. for i, ci := range cf.chunkList {
  146. if cf.pos >= ci.Offset && cf.pos < ci.Offset+ci.Size {
  147. chunkIndex = i
  148. chunkStartOffset = cf.pos - ci.Offset
  149. break
  150. }
  151. }
  152. if chunkIndex < 0 {
  153. return n, ErrInvalidRange
  154. }
  155. for ; chunkIndex < len(cf.chunkList); chunkIndex++ {
  156. ci := cf.chunkList[chunkIndex]
  157. // if we need read date from local volume server first?
  158. fileUrl, jwt, lookupError := LookupFileId(func(_ context.Context) pb.ServerAddress {
  159. return cf.master
  160. }, cf.grpcDialOption, ci.Fid)
  161. if lookupError != nil {
  162. return n, lookupError
  163. }
  164. if wn, e := readChunkNeedle(fileUrl, w, chunkStartOffset, jwt); e != nil {
  165. return n, e
  166. } else {
  167. n += wn
  168. cf.pos += wn
  169. }
  170. chunkStartOffset = 0
  171. }
  172. return n, nil
  173. }
  174. func (cf *ChunkedFileReader) ReadAt(p []byte, off int64) (n int, err error) {
  175. cf.Seek(off, 0)
  176. return cf.Read(p)
  177. }
  178. func (cf *ChunkedFileReader) Read(p []byte) (int, error) {
  179. return cf.getPipeReader().Read(p)
  180. }
  181. func (cf *ChunkedFileReader) Close() (e error) {
  182. cf.mutex.Lock()
  183. defer cf.mutex.Unlock()
  184. return cf.closePipe()
  185. }
  186. func (cf *ChunkedFileReader) closePipe() (e error) {
  187. if cf.pr != nil {
  188. if err := cf.pr.Close(); err != nil {
  189. e = err
  190. }
  191. }
  192. cf.pr = nil
  193. if cf.pw != nil {
  194. if err := cf.pw.Close(); err != nil {
  195. e = err
  196. }
  197. }
  198. cf.pw = nil
  199. return e
  200. }
  201. func (cf *ChunkedFileReader) getPipeReader() io.Reader {
  202. cf.mutex.Lock()
  203. defer cf.mutex.Unlock()
  204. if cf.pr != nil && cf.pw != nil {
  205. return cf.pr
  206. }
  207. cf.closePipe()
  208. cf.pr, cf.pw = io.Pipe()
  209. go func(pw *io.PipeWriter) {
  210. _, e := cf.WriteTo(pw)
  211. pw.CloseWithError(e)
  212. }(cf.pw)
  213. return cf.pr
  214. }