filechunk_manifest.go 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239
  1. package filer
  2. import (
  3. "bytes"
  4. "fmt"
  5. "io"
  6. "math"
  7. "sync"
  8. "time"
  9. "github.com/seaweedfs/seaweedfs/weed/wdclient"
  10. "google.golang.org/protobuf/proto"
  11. "github.com/seaweedfs/seaweedfs/weed/glog"
  12. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  13. "github.com/seaweedfs/seaweedfs/weed/util"
  14. )
const (
	// ManifestBatch is the merge factor that MaybeManifestize passes to
	// doMaybeManifestize: data chunks are merged into manifest chunks in
	// groups of exactly this many.
	ManifestBatch = 10000
)
// bytesBufferPool recycles *bytes.Buffer values. ResolveOneChunkManifest takes
// one per call to hold the fetched manifest content and puts it back when done.
var bytesBufferPool = sync.Pool{
	// New supplies a fresh, empty buffer when the pool has none available.
	New: func() interface{} {
		return new(bytes.Buffer)
	},
}
  23. func HasChunkManifest(chunks []*filer_pb.FileChunk) bool {
  24. for _, chunk := range chunks {
  25. if chunk.IsChunkManifest {
  26. return true
  27. }
  28. }
  29. return false
  30. }
  31. func SeparateManifestChunks(chunks []*filer_pb.FileChunk) (manifestChunks, nonManifestChunks []*filer_pb.FileChunk) {
  32. for _, c := range chunks {
  33. if c.IsChunkManifest {
  34. manifestChunks = append(manifestChunks, c)
  35. } else {
  36. nonManifestChunks = append(nonManifestChunks, c)
  37. }
  38. }
  39. return
  40. }
  41. func ResolveChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, startOffset, stopOffset int64) (dataChunks, manifestChunks []*filer_pb.FileChunk, manifestResolveErr error) {
  42. // TODO maybe parallel this
  43. for _, chunk := range chunks {
  44. if max(chunk.Offset, startOffset) >= min(chunk.Offset+int64(chunk.Size), stopOffset) {
  45. continue
  46. }
  47. if !chunk.IsChunkManifest {
  48. dataChunks = append(dataChunks, chunk)
  49. continue
  50. }
  51. resolvedChunks, err := ResolveOneChunkManifest(lookupFileIdFn, chunk)
  52. if err != nil {
  53. return dataChunks, nil, err
  54. }
  55. manifestChunks = append(manifestChunks, chunk)
  56. // recursive
  57. subDataChunks, subManifestChunks, subErr := ResolveChunkManifest(lookupFileIdFn, resolvedChunks, startOffset, stopOffset)
  58. if subErr != nil {
  59. return dataChunks, nil, subErr
  60. }
  61. dataChunks = append(dataChunks, subDataChunks...)
  62. manifestChunks = append(manifestChunks, subManifestChunks...)
  63. }
  64. return
  65. }
  66. func ResolveOneChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunk *filer_pb.FileChunk) (dataChunks []*filer_pb.FileChunk, manifestResolveErr error) {
  67. if !chunk.IsChunkManifest {
  68. return
  69. }
  70. // IsChunkManifest
  71. bytesBuffer := bytesBufferPool.Get().(*bytes.Buffer)
  72. bytesBuffer.Reset()
  73. defer bytesBufferPool.Put(bytesBuffer)
  74. err := fetchWholeChunk(bytesBuffer, lookupFileIdFn, chunk.GetFileIdString(), chunk.CipherKey, chunk.IsCompressed)
  75. if err != nil {
  76. return nil, fmt.Errorf("fail to read manifest %s: %v", chunk.GetFileIdString(), err)
  77. }
  78. m := &filer_pb.FileChunkManifest{}
  79. if err := proto.Unmarshal(bytesBuffer.Bytes(), m); err != nil {
  80. return nil, fmt.Errorf("fail to unmarshal manifest %s: %v", chunk.GetFileIdString(), err)
  81. }
  82. // recursive
  83. filer_pb.AfterEntryDeserialization(m.Chunks)
  84. return m.Chunks, nil
  85. }
  86. // TODO fetch from cache for weed mount?
  87. func fetchWholeChunk(bytesBuffer *bytes.Buffer, lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool) error {
  88. urlStrings, err := lookupFileIdFn(fileId)
  89. if err != nil {
  90. glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err)
  91. return err
  92. }
  93. err = retriedStreamFetchChunkData(bytesBuffer, urlStrings, "", cipherKey, isGzipped, true, 0, 0)
  94. if err != nil {
  95. return err
  96. }
  97. return nil
  98. }
  99. func fetchChunkRange(buffer []byte, lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool, offset int64) (int, error) {
  100. urlStrings, err := lookupFileIdFn(fileId)
  101. if err != nil {
  102. glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err)
  103. return 0, err
  104. }
  105. return util.RetriedFetchChunkData(buffer, urlStrings, cipherKey, isGzipped, false, offset)
  106. }
  107. func retriedStreamFetchChunkData(writer io.Writer, urlStrings []string, jwt string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, size int) (err error) {
  108. var shouldRetry bool
  109. var totalWritten int
  110. for waitTime := time.Second; waitTime < util.RetryWaitTime; waitTime += waitTime / 2 {
  111. for _, urlString := range urlStrings {
  112. var localProcessed int
  113. var writeErr error
  114. shouldRetry, err = util.ReadUrlAsStreamAuthenticated(urlString+"?readDeleted=true", jwt, cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) {
  115. if totalWritten > localProcessed {
  116. toBeSkipped := totalWritten - localProcessed
  117. if len(data) <= toBeSkipped {
  118. localProcessed += len(data)
  119. return // skip if already processed
  120. }
  121. data = data[toBeSkipped:]
  122. localProcessed += toBeSkipped
  123. }
  124. var writtenCount int
  125. writtenCount, writeErr = writer.Write(data)
  126. localProcessed += writtenCount
  127. totalWritten += writtenCount
  128. })
  129. if !shouldRetry {
  130. break
  131. }
  132. if writeErr != nil {
  133. err = writeErr
  134. break
  135. }
  136. if err != nil {
  137. glog.V(0).Infof("read %s failed, err: %v", urlString, err)
  138. } else {
  139. break
  140. }
  141. }
  142. if err != nil && shouldRetry {
  143. glog.V(0).Infof("retry reading in %v", waitTime)
  144. time.Sleep(waitTime)
  145. } else {
  146. break
  147. }
  148. }
  149. return err
  150. }
  151. func MaybeManifestize(saveFunc SaveDataAsChunkFunctionType, inputChunks []*filer_pb.FileChunk) (chunks []*filer_pb.FileChunk, err error) {
  152. return doMaybeManifestize(saveFunc, inputChunks, ManifestBatch, mergeIntoManifest)
  153. }
  154. func doMaybeManifestize(saveFunc SaveDataAsChunkFunctionType, inputChunks []*filer_pb.FileChunk, mergeFactor int, mergefn func(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer_pb.FileChunk) (manifestChunk *filer_pb.FileChunk, err error)) (chunks []*filer_pb.FileChunk, err error) {
  155. var dataChunks []*filer_pb.FileChunk
  156. for _, chunk := range inputChunks {
  157. if !chunk.IsChunkManifest {
  158. dataChunks = append(dataChunks, chunk)
  159. } else {
  160. chunks = append(chunks, chunk)
  161. }
  162. }
  163. remaining := len(dataChunks)
  164. for i := 0; i+mergeFactor <= len(dataChunks); i += mergeFactor {
  165. chunk, err := mergefn(saveFunc, dataChunks[i:i+mergeFactor])
  166. if err != nil {
  167. return dataChunks, err
  168. }
  169. chunks = append(chunks, chunk)
  170. remaining -= mergeFactor
  171. }
  172. // remaining
  173. for i := len(dataChunks) - remaining; i < len(dataChunks); i++ {
  174. chunks = append(chunks, dataChunks[i])
  175. }
  176. return
  177. }
  178. func mergeIntoManifest(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer_pb.FileChunk) (manifestChunk *filer_pb.FileChunk, err error) {
  179. filer_pb.BeforeEntrySerialization(dataChunks)
  180. // create and serialize the manifest
  181. data, serErr := proto.Marshal(&filer_pb.FileChunkManifest{
  182. Chunks: dataChunks,
  183. })
  184. if serErr != nil {
  185. return nil, fmt.Errorf("serializing manifest: %v", serErr)
  186. }
  187. minOffset, maxOffset := int64(math.MaxInt64), int64(math.MinInt64)
  188. for _, chunk := range dataChunks {
  189. if minOffset > int64(chunk.Offset) {
  190. minOffset = chunk.Offset
  191. }
  192. if maxOffset < int64(chunk.Size)+chunk.Offset {
  193. maxOffset = int64(chunk.Size) + chunk.Offset
  194. }
  195. }
  196. manifestChunk, err = saveFunc(bytes.NewReader(data), "", 0, 0)
  197. if err != nil {
  198. return nil, err
  199. }
  200. manifestChunk.IsChunkManifest = true
  201. manifestChunk.Offset = minOffset
  202. manifestChunk.Size = uint64(maxOffset - minOffset)
  203. return
  204. }
// SaveDataAsChunkFunctionType is the signature of the callback that
// mergeIntoManifest invokes with the serialized manifest as reader, and that
// hands back the resulting *filer_pb.FileChunk or an error.
// mergeIntoManifest passes an empty name and zero for both offset and tsNs.
// NOTE(review): presumably an implementation stores the reader's content as a
// chunk; the meaning of name, offset and tsNs is set by the implementations,
// which are not visible here — confirm against them.
type SaveDataAsChunkFunctionType func(reader io.Reader, name string, offset int64, tsNs int64) (chunk *filer_pb.FileChunk, err error)