chunked_file.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. package operation
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "net/http"
  8. "sort"
  9. "sync"
  10. "google.golang.org/grpc"
  11. "github.com/chrislusf/seaweedfs/weed/glog"
  12. "github.com/chrislusf/seaweedfs/weed/pb"
  13. "github.com/chrislusf/seaweedfs/weed/util"
  14. )
  // ErrRangeRequestsNotSupported is returned when the remote server does not
  // allow range requests (Accept-Ranges was not set).
  var ErrRangeRequestsNotSupported = errors.New("Range requests are not supported by the remote server")

  // ErrInvalidRange is returned by Read when trying to read past the end of
  // the file.
  var ErrInvalidRange = errors.New("Invalid range")
  type (
  	// ChunkInfo describes a single chunk of a chunked file: the file id the
  	// chunk is stored under, the offset at which it starts within the whole
  	// file, and its size.
  	ChunkInfo struct {
  		Fid    string `json:"fid"`
  		Offset int64  `json:"offset"`
  		Size   int64  `json:"size"`
  	}

  	// ChunkList is a list of chunk descriptors.
  	ChunkList []*ChunkInfo
  )
  27. type ChunkManifest struct {
  28. Name string `json:"name,omitempty"`
  29. Mime string `json:"mime,omitempty"`
  30. Size int64 `json:"size,omitempty"`
  31. Chunks ChunkList `json:"chunks,omitempty"`
  32. }
  33. // seekable chunked file reader
  34. type ChunkedFileReader struct {
  35. totalSize int64
  36. chunkList []*ChunkInfo
  37. master pb.ServerAddress
  38. pos int64
  39. pr *io.PipeReader
  40. pw *io.PipeWriter
  41. mutex sync.Mutex
  42. grpcDialOption grpc.DialOption
  43. }
  44. func (s ChunkList) Len() int { return len(s) }
  45. func (s ChunkList) Less(i, j int) bool { return s[i].Offset < s[j].Offset }
  46. func (s ChunkList) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
  47. func LoadChunkManifest(buffer []byte, isCompressed bool) (*ChunkManifest, error) {
  48. if isCompressed {
  49. var err error
  50. if buffer, err = util.DecompressData(buffer); err != nil {
  51. glog.V(0).Infof("fail to decompress chunk manifest: %v", err)
  52. }
  53. }
  54. cm := ChunkManifest{}
  55. if e := json.Unmarshal(buffer, &cm); e != nil {
  56. return nil, e
  57. }
  58. sort.Sort(cm.Chunks)
  59. return &cm, nil
  60. }
  61. func (cm *ChunkManifest) Marshal() ([]byte, error) {
  62. return json.Marshal(cm)
  63. }
  64. func (cm *ChunkManifest) DeleteChunks(masterFn GetMasterFn, usePublicUrl bool, grpcDialOption grpc.DialOption) error {
  65. var fileIds []string
  66. for _, ci := range cm.Chunks {
  67. fileIds = append(fileIds, ci.Fid)
  68. }
  69. results, err := DeleteFiles(masterFn, usePublicUrl, grpcDialOption, fileIds)
  70. if err != nil {
  71. glog.V(0).Infof("delete %+v: %v", fileIds, err)
  72. return fmt.Errorf("chunk delete: %v", err)
  73. }
  74. for _, result := range results {
  75. if result.Error != "" {
  76. glog.V(0).Infof("delete file %+v: %v", result.FileId, result.Error)
  77. return fmt.Errorf("chunk delete %v: %v", result.FileId, result.Error)
  78. }
  79. }
  80. return nil
  81. }
  82. func readChunkNeedle(fileUrl string, w io.Writer, offset int64, jwt string) (written int64, e error) {
  83. req, err := http.NewRequest("GET", fileUrl, nil)
  84. if err != nil {
  85. return written, err
  86. }
  87. if offset > 0 {
  88. req.Header.Set("Range", fmt.Sprintf("bytes=%d-", offset))
  89. }
  90. resp, err := util.Do(req)
  91. if err != nil {
  92. return written, err
  93. }
  94. defer func() {
  95. io.Copy(io.Discard, resp.Body)
  96. resp.Body.Close()
  97. }()
  98. switch resp.StatusCode {
  99. case http.StatusRequestedRangeNotSatisfiable:
  100. return written, ErrInvalidRange
  101. case http.StatusOK:
  102. if offset > 0 {
  103. return written, ErrRangeRequestsNotSupported
  104. }
  105. case http.StatusPartialContent:
  106. break
  107. default:
  108. return written, fmt.Errorf("Read chunk needle error: [%d] %s", resp.StatusCode, fileUrl)
  109. }
  110. return io.Copy(w, resp.Body)
  111. }
  112. func NewChunkedFileReader(chunkList []*ChunkInfo, master pb.ServerAddress, grpcDialOption grpc.DialOption) *ChunkedFileReader {
  113. var totalSize int64
  114. for _, chunk := range chunkList {
  115. totalSize += chunk.Size
  116. }
  117. sort.Sort(ChunkList(chunkList))
  118. return &ChunkedFileReader{
  119. totalSize: totalSize,
  120. chunkList: chunkList,
  121. master: master,
  122. grpcDialOption: grpcDialOption,
  123. }
  124. }
  125. func (cf *ChunkedFileReader) Seek(offset int64, whence int) (int64, error) {
  126. var err error
  127. switch whence {
  128. case io.SeekStart:
  129. case io.SeekCurrent:
  130. offset += cf.pos
  131. case io.SeekEnd:
  132. offset = cf.totalSize + offset
  133. }
  134. if offset > cf.totalSize {
  135. err = ErrInvalidRange
  136. }
  137. if cf.pos != offset {
  138. cf.Close()
  139. }
  140. cf.pos = offset
  141. return cf.pos, err
  142. }
  143. func (cf *ChunkedFileReader) WriteTo(w io.Writer) (n int64, err error) {
  144. chunkIndex := -1
  145. chunkStartOffset := int64(0)
  146. for i, ci := range cf.chunkList {
  147. if cf.pos >= ci.Offset && cf.pos < ci.Offset+ci.Size {
  148. chunkIndex = i
  149. chunkStartOffset = cf.pos - ci.Offset
  150. break
  151. }
  152. }
  153. if chunkIndex < 0 {
  154. return n, ErrInvalidRange
  155. }
  156. for ; chunkIndex < len(cf.chunkList); chunkIndex++ {
  157. ci := cf.chunkList[chunkIndex]
  158. // if we need read date from local volume server first?
  159. fileUrl, jwt, lookupError := LookupFileId(func() pb.ServerAddress {
  160. return cf.master
  161. }, cf.grpcDialOption, ci.Fid)
  162. if lookupError != nil {
  163. return n, lookupError
  164. }
  165. if wn, e := readChunkNeedle(fileUrl, w, chunkStartOffset, jwt); e != nil {
  166. return n, e
  167. } else {
  168. n += wn
  169. cf.pos += wn
  170. }
  171. chunkStartOffset = 0
  172. }
  173. return n, nil
  174. }
  175. func (cf *ChunkedFileReader) ReadAt(p []byte, off int64) (n int, err error) {
  176. cf.Seek(off, 0)
  177. return cf.Read(p)
  178. }
  179. func (cf *ChunkedFileReader) Read(p []byte) (int, error) {
  180. return cf.getPipeReader().Read(p)
  181. }
  182. func (cf *ChunkedFileReader) Close() (e error) {
  183. cf.mutex.Lock()
  184. defer cf.mutex.Unlock()
  185. return cf.closePipe()
  186. }
  187. func (cf *ChunkedFileReader) closePipe() (e error) {
  188. if cf.pr != nil {
  189. if err := cf.pr.Close(); err != nil {
  190. e = err
  191. }
  192. }
  193. cf.pr = nil
  194. if cf.pw != nil {
  195. if err := cf.pw.Close(); err != nil {
  196. e = err
  197. }
  198. }
  199. cf.pw = nil
  200. return e
  201. }
  202. func (cf *ChunkedFileReader) getPipeReader() io.Reader {
  203. cf.mutex.Lock()
  204. defer cf.mutex.Unlock()
  205. if cf.pr != nil && cf.pw != nil {
  206. return cf.pr
  207. }
  208. cf.closePipe()
  209. cf.pr, cf.pw = io.Pipe()
  210. go func(pw *io.PipeWriter) {
  211. _, e := cf.WriteTo(pw)
  212. pw.CloseWithError(e)
  213. }(cf.pw)
  214. return cf.pr
  215. }