chunked_file.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238
  1. package operation
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "net/http"
  8. "sort"
  9. "sync"
  10. "google.golang.org/grpc"
  11. "github.com/seaweedfs/seaweedfs/weed/glog"
  12. "github.com/seaweedfs/seaweedfs/weed/pb"
  13. "github.com/seaweedfs/seaweedfs/weed/util"
  14. )
  // Sentinel errors reported while reading a chunked file.
  var (
  	// ErrInvalidRange is returned when the requested position lies outside
  	// the data that can be served.
  	ErrInvalidRange = errors.New("Invalid range")

  	// ErrRangeRequestsNotSupported is returned when a ranged read was asked
  	// for but the remote server replied with the complete content instead.
  	ErrRangeRequestsNotSupported = errors.New("Range requests are not supported by the remote server")
  )
  type (
  	// ChunkInfo describes one chunk of a chunked file: the file id the chunk
  	// is stored under, the offset of the chunk inside the whole file, and the
  	// chunk length in bytes.
  	ChunkInfo struct {
  		Fid    string `json:"fid"`
  		Offset int64  `json:"offset"`
  		Size   int64  `json:"size"`
  	}

  	// ChunkList is a list of chunks; it is sortable by chunk offset.
  	ChunkList []*ChunkInfo

  	// ChunkManifest is the JSON document describing a chunked file: optional
  	// name and MIME type, the total size, and the chunks making up the file.
  	ChunkManifest struct {
  		Name   string    `json:"name,omitempty"`
  		Mime   string    `json:"mime,omitempty"`
  		Size   int64     `json:"size,omitempty"`
  		Chunks ChunkList `json:"chunks,omitempty"`
  	}
  )
  33. // seekable chunked file reader
  34. type ChunkedFileReader struct {
  35. totalSize int64
  36. chunkList []*ChunkInfo
  37. master pb.ServerAddress
  38. pos int64
  39. pr *io.PipeReader
  40. pw *io.PipeWriter
  41. mutex sync.Mutex
  42. grpcDialOption grpc.DialOption
  43. }
  44. func (s ChunkList) Len() int { return len(s) }
  45. func (s ChunkList) Less(i, j int) bool { return s[i].Offset < s[j].Offset }
  46. func (s ChunkList) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
  47. func LoadChunkManifest(buffer []byte, isCompressed bool) (*ChunkManifest, error) {
  48. if isCompressed {
  49. var err error
  50. if buffer, err = util.DecompressData(buffer); err != nil {
  51. glog.V(0).Infof("fail to decompress chunk manifest: %v", err)
  52. }
  53. }
  54. cm := ChunkManifest{}
  55. if e := json.Unmarshal(buffer, &cm); e != nil {
  56. return nil, e
  57. }
  58. sort.Sort(cm.Chunks)
  59. return &cm, nil
  60. }
  61. func (cm *ChunkManifest) Marshal() ([]byte, error) {
  62. return json.Marshal(cm)
  63. }
  64. func (cm *ChunkManifest) DeleteChunks(masterFn GetMasterFn, usePublicUrl bool, grpcDialOption grpc.DialOption) error {
  65. var fileIds []string
  66. for _, ci := range cm.Chunks {
  67. fileIds = append(fileIds, ci.Fid)
  68. }
  69. results, err := DeleteFiles(masterFn, usePublicUrl, grpcDialOption, fileIds)
  70. if err != nil {
  71. glog.V(0).Infof("delete %+v: %v", fileIds, err)
  72. return fmt.Errorf("chunk delete: %v", err)
  73. }
  74. for _, result := range results {
  75. if result.Error != "" {
  76. glog.V(0).Infof("delete file %+v: %v", result.FileId, result.Error)
  77. return fmt.Errorf("chunk delete %v: %v", result.FileId, result.Error)
  78. }
  79. }
  80. return nil
  81. }
  82. func readChunkNeedle(fileUrl string, w io.Writer, offset int64, jwt string) (written int64, e error) {
  83. req, err := http.NewRequest("GET", fileUrl, nil)
  84. if err != nil {
  85. return written, err
  86. }
  87. if offset > 0 {
  88. req.Header.Set("Range", fmt.Sprintf("bytes=%d-", offset))
  89. }
  90. resp, err := util.Do(req)
  91. if err != nil {
  92. return written, err
  93. }
  94. defer util.CloseResponse(resp)
  95. switch resp.StatusCode {
  96. case http.StatusRequestedRangeNotSatisfiable:
  97. return written, ErrInvalidRange
  98. case http.StatusOK:
  99. if offset > 0 {
  100. return written, ErrRangeRequestsNotSupported
  101. }
  102. case http.StatusPartialContent:
  103. break
  104. default:
  105. return written, fmt.Errorf("Read chunk needle error: [%d] %s", resp.StatusCode, fileUrl)
  106. }
  107. return io.Copy(w, resp.Body)
  108. }
  109. func NewChunkedFileReader(chunkList []*ChunkInfo, master pb.ServerAddress, grpcDialOption grpc.DialOption) *ChunkedFileReader {
  110. var totalSize int64
  111. for _, chunk := range chunkList {
  112. totalSize += chunk.Size
  113. }
  114. sort.Sort(ChunkList(chunkList))
  115. return &ChunkedFileReader{
  116. totalSize: totalSize,
  117. chunkList: chunkList,
  118. master: master,
  119. grpcDialOption: grpcDialOption,
  120. }
  121. }
  122. func (cf *ChunkedFileReader) Seek(offset int64, whence int) (int64, error) {
  123. var err error
  124. switch whence {
  125. case io.SeekStart:
  126. case io.SeekCurrent:
  127. offset += cf.pos
  128. case io.SeekEnd:
  129. offset = cf.totalSize + offset
  130. }
  131. if offset > cf.totalSize {
  132. err = ErrInvalidRange
  133. }
  134. if cf.pos != offset {
  135. cf.Close()
  136. }
  137. cf.pos = offset
  138. return cf.pos, err
  139. }
  140. func (cf *ChunkedFileReader) WriteTo(w io.Writer) (n int64, err error) {
  141. chunkIndex := -1
  142. chunkStartOffset := int64(0)
  143. for i, ci := range cf.chunkList {
  144. if cf.pos >= ci.Offset && cf.pos < ci.Offset+ci.Size {
  145. chunkIndex = i
  146. chunkStartOffset = cf.pos - ci.Offset
  147. break
  148. }
  149. }
  150. if chunkIndex < 0 {
  151. return n, ErrInvalidRange
  152. }
  153. for ; chunkIndex < len(cf.chunkList); chunkIndex++ {
  154. ci := cf.chunkList[chunkIndex]
  155. // if we need read date from local volume server first?
  156. fileUrl, jwt, lookupError := LookupFileId(func() pb.ServerAddress {
  157. return cf.master
  158. }, cf.grpcDialOption, ci.Fid)
  159. if lookupError != nil {
  160. return n, lookupError
  161. }
  162. if wn, e := readChunkNeedle(fileUrl, w, chunkStartOffset, jwt); e != nil {
  163. return n, e
  164. } else {
  165. n += wn
  166. cf.pos += wn
  167. }
  168. chunkStartOffset = 0
  169. }
  170. return n, nil
  171. }
  172. func (cf *ChunkedFileReader) ReadAt(p []byte, off int64) (n int, err error) {
  173. cf.Seek(off, 0)
  174. return cf.Read(p)
  175. }
  176. func (cf *ChunkedFileReader) Read(p []byte) (int, error) {
  177. return cf.getPipeReader().Read(p)
  178. }
  179. func (cf *ChunkedFileReader) Close() (e error) {
  180. cf.mutex.Lock()
  181. defer cf.mutex.Unlock()
  182. return cf.closePipe()
  183. }
  184. func (cf *ChunkedFileReader) closePipe() (e error) {
  185. if cf.pr != nil {
  186. if err := cf.pr.Close(); err != nil {
  187. e = err
  188. }
  189. }
  190. cf.pr = nil
  191. if cf.pw != nil {
  192. if err := cf.pw.Close(); err != nil {
  193. e = err
  194. }
  195. }
  196. cf.pw = nil
  197. return e
  198. }
  199. func (cf *ChunkedFileReader) getPipeReader() io.Reader {
  200. cf.mutex.Lock()
  201. defer cf.mutex.Unlock()
  202. if cf.pr != nil && cf.pw != nil {
  203. return cf.pr
  204. }
  205. cf.closePipe()
  206. cf.pr, cf.pw = io.Pipe()
  207. go func(pw *io.PipeWriter) {
  208. _, e := cf.WriteTo(pw)
  209. pw.CloseWithError(e)
  210. }(cf.pw)
  211. return cf.pr
  212. }