chunked_file.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213
  1. package operation
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "net/http"
  8. "sort"
  9. "sync"
  10. "github.com/chrislusf/seaweedfs/weed/glog"
  11. "github.com/chrislusf/seaweedfs/weed/util"
  12. )
  13. var (
  14. // when the remote server does not allow range requests (Accept-Ranges was not set)
  15. ErrRangeRequestsNotSupported = errors.New("Range requests are not supported by the remote server")
  16. // ErrInvalidRange is returned by Read when trying to read past the end of the file
  17. ErrInvalidRange = errors.New("Invalid range")
  18. )
  19. type ChunkInfo struct {
  20. Fid string `json:"fid"`
  21. Offset int64 `json:"offset"`
  22. Size int64 `json:"size"`
  23. }
  24. type ChunkList []*ChunkInfo
  25. type ChunkManifest struct {
  26. Name string `json:"name,omitempty"`
  27. Mime string `json:"mime,omitempty"`
  28. Size int64 `json:"size,omitempty"`
  29. Chunks ChunkList `json:"chunks,omitempty"`
  30. }
  31. // seekable chunked file reader
  32. type ChunkedFileReader struct {
  33. Manifest *ChunkManifest
  34. Master string
  35. pos int64
  36. pr *io.PipeReader
  37. pw *io.PipeWriter
  38. mutex sync.Mutex
  39. }
  40. func (s ChunkList) Len() int { return len(s) }
  41. func (s ChunkList) Less(i, j int) bool { return s[i].Offset < s[j].Offset }
  42. func (s ChunkList) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
  43. func LoadChunkManifest(buffer []byte, isGzipped bool) (*ChunkManifest, error) {
  44. if isGzipped {
  45. var err error
  46. if buffer, err = UnGzipData(buffer); err != nil {
  47. return nil, err
  48. }
  49. }
  50. cm := ChunkManifest{}
  51. if e := json.Unmarshal(buffer, &cm); e != nil {
  52. return nil, e
  53. }
  54. sort.Sort(cm.Chunks)
  55. return &cm, nil
  56. }
  57. func (cm *ChunkManifest) Marshal() ([]byte, error) {
  58. return json.Marshal(cm)
  59. }
  60. func (cm *ChunkManifest) DeleteChunks(master string) error {
  61. deleteError := 0
  62. for _, ci := range cm.Chunks {
  63. if e := DeleteFile(master, ci.Fid, ""); e != nil {
  64. deleteError++
  65. glog.V(0).Infof("Delete %s error: %v, master: %s", ci.Fid, e, master)
  66. }
  67. }
  68. if deleteError > 0 {
  69. return errors.New("Not all chunks deleted.")
  70. }
  71. return nil
  72. }
  73. func readChunkNeedle(fileUrl string, w io.Writer, offset int64) (written int64, e error) {
  74. req, err := http.NewRequest("GET", fileUrl, nil)
  75. if err != nil {
  76. return written, err
  77. }
  78. if offset > 0 {
  79. req.Header.Set("Range", fmt.Sprintf("bytes=%d-", offset))
  80. }
  81. resp, err := util.Do(req)
  82. if err != nil {
  83. return written, err
  84. }
  85. defer resp.Body.Close()
  86. switch resp.StatusCode {
  87. case http.StatusRequestedRangeNotSatisfiable:
  88. return written, ErrInvalidRange
  89. case http.StatusOK:
  90. if offset > 0 {
  91. return written, ErrRangeRequestsNotSupported
  92. }
  93. case http.StatusPartialContent:
  94. break
  95. default:
  96. return written, fmt.Errorf("Read chunk needle error: [%d] %s", resp.StatusCode, fileUrl)
  97. }
  98. return io.Copy(w, resp.Body)
  99. }
  100. func (cf *ChunkedFileReader) Seek(offset int64, whence int) (int64, error) {
  101. var err error
  102. switch whence {
  103. case 0:
  104. case 1:
  105. offset += cf.pos
  106. case 2:
  107. offset = cf.Manifest.Size - offset
  108. }
  109. if offset > cf.Manifest.Size {
  110. err = ErrInvalidRange
  111. }
  112. if cf.pos != offset {
  113. cf.Close()
  114. }
  115. cf.pos = offset
  116. return cf.pos, err
  117. }
  118. func (cf *ChunkedFileReader) WriteTo(w io.Writer) (n int64, err error) {
  119. cm := cf.Manifest
  120. chunkIndex := -1
  121. chunkStartOffset := int64(0)
  122. for i, ci := range cm.Chunks {
  123. if cf.pos >= ci.Offset && cf.pos < ci.Offset+ci.Size {
  124. chunkIndex = i
  125. chunkStartOffset = cf.pos - ci.Offset
  126. break
  127. }
  128. }
  129. if chunkIndex < 0 {
  130. return n, ErrInvalidRange
  131. }
  132. for ; chunkIndex < cm.Chunks.Len(); chunkIndex++ {
  133. ci := cm.Chunks[chunkIndex]
  134. // if we need read date from local volume server first?
  135. fileUrl, lookupError := LookupFileId(cf.Master, ci.Fid)
  136. if lookupError != nil {
  137. return n, lookupError
  138. }
  139. if wn, e := readChunkNeedle(fileUrl, w, chunkStartOffset); e != nil {
  140. return n, e
  141. } else {
  142. n += wn
  143. cf.pos += wn
  144. }
  145. chunkStartOffset = 0
  146. }
  147. return n, nil
  148. }
  149. func (cf *ChunkedFileReader) ReadAt(p []byte, off int64) (n int, err error) {
  150. cf.Seek(off, 0)
  151. return cf.Read(p)
  152. }
  153. func (cf *ChunkedFileReader) Read(p []byte) (int, error) {
  154. return cf.getPipeReader().Read(p)
  155. }
  156. func (cf *ChunkedFileReader) Close() (e error) {
  157. cf.mutex.Lock()
  158. defer cf.mutex.Unlock()
  159. return cf.closePipe()
  160. }
  161. func (cf *ChunkedFileReader) closePipe() (e error) {
  162. if cf.pr != nil {
  163. if err := cf.pr.Close(); err != nil {
  164. e = err
  165. }
  166. }
  167. cf.pr = nil
  168. if cf.pw != nil {
  169. if err := cf.pw.Close(); err != nil {
  170. e = err
  171. }
  172. }
  173. cf.pw = nil
  174. return e
  175. }
  176. func (cf *ChunkedFileReader) getPipeReader() io.Reader {
  177. cf.mutex.Lock()
  178. defer cf.mutex.Unlock()
  179. if cf.pr != nil && cf.pw != nil {
  180. return cf.pr
  181. }
  182. cf.closePipe()
  183. cf.pr, cf.pw = io.Pipe()
  184. go func(pw *io.PipeWriter) {
  185. _, e := cf.WriteTo(pw)
  186. pw.CloseWithError(e)
  187. }(cf.pw)
  188. return cf.pr
  189. }