filechunk_manifest.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. package filer
  2. import (
  3. "bytes"
  4. "fmt"
  5. "io"
  6. "math"
  7. "time"
  8. "github.com/golang/protobuf/proto"
  9. "github.com/chrislusf/seaweedfs/weed/util/log"
  10. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  11. "github.com/chrislusf/seaweedfs/weed/util"
  12. )
  13. const (
  14. ManifestBatch = 1000
  15. )
  16. func HasChunkManifest(chunks []*filer_pb.FileChunk) bool {
  17. for _, chunk := range chunks {
  18. if chunk.IsChunkManifest {
  19. return true
  20. }
  21. }
  22. return false
  23. }
  24. func SeparateManifestChunks(chunks []*filer_pb.FileChunk) (manifestChunks, nonManifestChunks []*filer_pb.FileChunk) {
  25. for _, c := range chunks {
  26. if c.IsChunkManifest {
  27. manifestChunks = append(manifestChunks, c)
  28. } else {
  29. nonManifestChunks = append(nonManifestChunks, c)
  30. }
  31. }
  32. return
  33. }
  34. func ResolveChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (dataChunks, manifestChunks []*filer_pb.FileChunk, manifestResolveErr error) {
  35. // TODO maybe parallel this
  36. for _, chunk := range chunks {
  37. if !chunk.IsChunkManifest {
  38. dataChunks = append(dataChunks, chunk)
  39. continue
  40. }
  41. resolvedChunks, err := ResolveOneChunkManifest(lookupFileIdFn, chunk)
  42. if err != nil {
  43. return chunks, nil, err
  44. }
  45. manifestChunks = append(manifestChunks, chunk)
  46. // recursive
  47. dchunks, mchunks, subErr := ResolveChunkManifest(lookupFileIdFn, resolvedChunks)
  48. if subErr != nil {
  49. return chunks, nil, subErr
  50. }
  51. dataChunks = append(dataChunks, dchunks...)
  52. manifestChunks = append(manifestChunks, mchunks...)
  53. }
  54. return
  55. }
  56. func ResolveOneChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunk *filer_pb.FileChunk) (dataChunks []*filer_pb.FileChunk, manifestResolveErr error) {
  57. if !chunk.IsChunkManifest {
  58. return
  59. }
  60. // IsChunkManifest
  61. data, err := fetchChunk(lookupFileIdFn, chunk.GetFileIdString(), chunk.CipherKey, chunk.IsCompressed)
  62. if err != nil {
  63. return nil, fmt.Errorf("fail to read manifest %s: %v", chunk.GetFileIdString(), err)
  64. }
  65. m := &filer_pb.FileChunkManifest{}
  66. if err := proto.Unmarshal(data, m); err != nil {
  67. return nil, fmt.Errorf("fail to unmarshal manifest %s: %v", chunk.GetFileIdString(), err)
  68. }
  69. // recursive
  70. filer_pb.AfterEntryDeserialization(m.Chunks)
  71. return m.Chunks, nil
  72. }
  73. // TODO fetch from cache for weed mount?
  74. func fetchChunk(lookupFileIdFn LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool) ([]byte, error) {
  75. urlStrings, err := lookupFileIdFn(fileId)
  76. if err != nil {
  77. log.Errorf("operation LookupFileId %s failed, err: %v", fileId, err)
  78. return nil, err
  79. }
  80. return retriedFetchChunkData(urlStrings, cipherKey, isGzipped, true, 0, 0)
  81. }
  82. func retriedFetchChunkData(urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, size int) ([]byte, error) {
  83. var err error
  84. var buffer bytes.Buffer
  85. var shouldRetry bool
  86. for waitTime := time.Second; waitTime < util.RetryWaitTime; waitTime += waitTime / 2 {
  87. for _, urlString := range urlStrings {
  88. shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) {
  89. buffer.Write(data)
  90. })
  91. if !shouldRetry {
  92. break
  93. }
  94. if err != nil {
  95. log.Infof("read %s failed, err: %v", urlString, err)
  96. buffer.Reset()
  97. } else {
  98. break
  99. }
  100. }
  101. if err != nil && shouldRetry {
  102. log.Infof("retry reading in %v", waitTime)
  103. time.Sleep(waitTime)
  104. } else {
  105. break
  106. }
  107. }
  108. return buffer.Bytes(), err
  109. }
  110. func MaybeManifestize(saveFunc SaveDataAsChunkFunctionType, inputChunks []*filer_pb.FileChunk) (chunks []*filer_pb.FileChunk, err error) {
  111. return doMaybeManifestize(saveFunc, inputChunks, ManifestBatch, mergeIntoManifest)
  112. }
  113. func doMaybeManifestize(saveFunc SaveDataAsChunkFunctionType, inputChunks []*filer_pb.FileChunk, mergeFactor int, mergefn func(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer_pb.FileChunk) (manifestChunk *filer_pb.FileChunk, err error)) (chunks []*filer_pb.FileChunk, err error) {
  114. var dataChunks []*filer_pb.FileChunk
  115. for _, chunk := range inputChunks {
  116. if !chunk.IsChunkManifest {
  117. dataChunks = append(dataChunks, chunk)
  118. } else {
  119. chunks = append(chunks, chunk)
  120. }
  121. }
  122. remaining := len(dataChunks)
  123. for i := 0; i+mergeFactor <= len(dataChunks); i += mergeFactor {
  124. chunk, err := mergefn(saveFunc, dataChunks[i:i+mergeFactor])
  125. if err != nil {
  126. return dataChunks, err
  127. }
  128. chunks = append(chunks, chunk)
  129. remaining -= mergeFactor
  130. }
  131. // remaining
  132. for i := len(dataChunks) - remaining; i < len(dataChunks); i++ {
  133. chunks = append(chunks, dataChunks[i])
  134. }
  135. return
  136. }
  137. func mergeIntoManifest(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer_pb.FileChunk) (manifestChunk *filer_pb.FileChunk, err error) {
  138. filer_pb.BeforeEntrySerialization(dataChunks)
  139. // create and serialize the manifest
  140. data, serErr := proto.Marshal(&filer_pb.FileChunkManifest{
  141. Chunks: dataChunks,
  142. })
  143. if serErr != nil {
  144. return nil, fmt.Errorf("serializing manifest: %v", serErr)
  145. }
  146. minOffset, maxOffset := int64(math.MaxInt64), int64(math.MinInt64)
  147. for _, chunk := range dataChunks {
  148. if minOffset > int64(chunk.Offset) {
  149. minOffset = chunk.Offset
  150. }
  151. if maxOffset < int64(chunk.Size)+chunk.Offset {
  152. maxOffset = int64(chunk.Size) + chunk.Offset
  153. }
  154. }
  155. manifestChunk, _, _, err = saveFunc(bytes.NewReader(data), "", 0)
  156. if err != nil {
  157. return nil, err
  158. }
  159. manifestChunk.IsChunkManifest = true
  160. manifestChunk.Offset = minOffset
  161. manifestChunk.Size = uint64(maxOffset - minOffset)
  162. return
  163. }
  164. type SaveDataAsChunkFunctionType func(reader io.Reader, name string, offset int64) (chunk *filer_pb.FileChunk, collection, replication string, err error)