// filechunk_manifest.go (7.4 KB)

  1. package filer
  2. import (
  3. "bytes"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/wdclient"
  6. "io"
  7. "math"
  8. "net/url"
  9. "strings"
  10. "time"
  11. "github.com/golang/protobuf/proto"
  12. "github.com/chrislusf/seaweedfs/weed/glog"
  13. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  14. "github.com/chrislusf/seaweedfs/weed/util"
  15. )
  // ManifestBatch is the number of data chunks that MaybeManifestize folds
// into a single manifest chunk.
const ManifestBatch = 10000
  19. func HasChunkManifest(chunks []*filer_pb.FileChunk) bool {
  20. for _, chunk := range chunks {
  21. if chunk.IsChunkManifest {
  22. return true
  23. }
  24. }
  25. return false
  26. }
  27. func SeparateManifestChunks(chunks []*filer_pb.FileChunk) (manifestChunks, nonManifestChunks []*filer_pb.FileChunk) {
  28. for _, c := range chunks {
  29. if c.IsChunkManifest {
  30. manifestChunks = append(manifestChunks, c)
  31. } else {
  32. nonManifestChunks = append(nonManifestChunks, c)
  33. }
  34. }
  35. return
  36. }
  37. func ResolveChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, startOffset, stopOffset int64) (dataChunks, manifestChunks []*filer_pb.FileChunk, manifestResolveErr error) {
  38. // TODO maybe parallel this
  39. for _, chunk := range chunks {
  40. if max(chunk.Offset, startOffset) >= min(chunk.Offset+int64(chunk.Size), stopOffset) {
  41. continue
  42. }
  43. if !chunk.IsChunkManifest {
  44. dataChunks = append(dataChunks, chunk)
  45. continue
  46. }
  47. resolvedChunks, err := ResolveOneChunkManifest(lookupFileIdFn, chunk)
  48. if err != nil {
  49. return chunks, nil, err
  50. }
  51. manifestChunks = append(manifestChunks, chunk)
  52. // recursive
  53. dchunks, mchunks, subErr := ResolveChunkManifest(lookupFileIdFn, resolvedChunks, startOffset, stopOffset)
  54. if subErr != nil {
  55. return chunks, nil, subErr
  56. }
  57. dataChunks = append(dataChunks, dchunks...)
  58. manifestChunks = append(manifestChunks, mchunks...)
  59. }
  60. return
  61. }
  62. func ResolveOneChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunk *filer_pb.FileChunk) (dataChunks []*filer_pb.FileChunk, manifestResolveErr error) {
  63. if !chunk.IsChunkManifest {
  64. return
  65. }
  66. // IsChunkManifest
  67. data, err := fetchChunk(lookupFileIdFn, chunk.GetFileIdString(), chunk.CipherKey, chunk.IsCompressed)
  68. if err != nil {
  69. return nil, fmt.Errorf("fail to read manifest %s: %v", chunk.GetFileIdString(), err)
  70. }
  71. m := &filer_pb.FileChunkManifest{}
  72. if err := proto.Unmarshal(data, m); err != nil {
  73. return nil, fmt.Errorf("fail to unmarshal manifest %s: %v", chunk.GetFileIdString(), err)
  74. }
  75. // recursive
  76. filer_pb.AfterEntryDeserialization(m.Chunks)
  77. return m.Chunks, nil
  78. }
  79. // TODO fetch from cache for weed mount?
  80. func fetchChunk(lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool) ([]byte, error) {
  81. urlStrings, err := lookupFileIdFn(fileId)
  82. if err != nil {
  83. glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err)
  84. return nil, err
  85. }
  86. return retriedFetchChunkData(urlStrings, cipherKey, isGzipped, true, 0, 0)
  87. }
  88. func fetchChunkRange(lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool, offset int64, size int) ([]byte, error) {
  89. urlStrings, err := lookupFileIdFn(fileId)
  90. if err != nil {
  91. glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err)
  92. return nil, err
  93. }
  94. return retriedFetchChunkData(urlStrings, cipherKey, isGzipped, false, offset, size)
  95. }
  96. func retriedFetchChunkData(urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, size int) ([]byte, error) {
  97. var err error
  98. var shouldRetry bool
  99. receivedData := make([]byte, 0, size)
  100. for waitTime := time.Second; waitTime < util.RetryWaitTime; waitTime += waitTime / 2 {
  101. for _, urlString := range urlStrings {
  102. receivedData = receivedData[:0]
  103. if strings.Contains(urlString, "%") {
  104. urlString = url.PathEscape(urlString)
  105. }
  106. shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) {
  107. receivedData = append(receivedData, data...)
  108. })
  109. if !shouldRetry {
  110. break
  111. }
  112. if err != nil {
  113. glog.V(0).Infof("read %s failed, err: %v", urlString, err)
  114. } else {
  115. break
  116. }
  117. }
  118. if err != nil && shouldRetry {
  119. glog.V(0).Infof("retry reading in %v", waitTime)
  120. time.Sleep(waitTime)
  121. } else {
  122. break
  123. }
  124. }
  125. return receivedData, err
  126. }
  127. func retriedStreamFetchChunkData(writer io.Writer, urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, size int) (err error) {
  128. var shouldRetry bool
  129. var totalWritten int
  130. for waitTime := time.Second; waitTime < util.RetryWaitTime; waitTime += waitTime / 2 {
  131. for _, urlString := range urlStrings {
  132. var localProcesed int
  133. shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) {
  134. if totalWritten > localProcesed {
  135. toBeSkipped := totalWritten - localProcesed
  136. if len(data) <= toBeSkipped {
  137. localProcesed += len(data)
  138. return // skip if already processed
  139. }
  140. data = data[toBeSkipped:]
  141. localProcesed += toBeSkipped
  142. }
  143. writer.Write(data)
  144. localProcesed += len(data)
  145. totalWritten += len(data)
  146. })
  147. if !shouldRetry {
  148. break
  149. }
  150. if err != nil {
  151. glog.V(0).Infof("read %s failed, err: %v", urlString, err)
  152. } else {
  153. break
  154. }
  155. }
  156. if err != nil && shouldRetry {
  157. glog.V(0).Infof("retry reading in %v", waitTime)
  158. time.Sleep(waitTime)
  159. } else {
  160. break
  161. }
  162. }
  163. return err
  164. }
  165. func MaybeManifestize(saveFunc SaveDataAsChunkFunctionType, inputChunks []*filer_pb.FileChunk) (chunks []*filer_pb.FileChunk, err error) {
  166. return doMaybeManifestize(saveFunc, inputChunks, ManifestBatch, mergeIntoManifest)
  167. }
  168. func doMaybeManifestize(saveFunc SaveDataAsChunkFunctionType, inputChunks []*filer_pb.FileChunk, mergeFactor int, mergefn func(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer_pb.FileChunk) (manifestChunk *filer_pb.FileChunk, err error)) (chunks []*filer_pb.FileChunk, err error) {
  169. var dataChunks []*filer_pb.FileChunk
  170. for _, chunk := range inputChunks {
  171. if !chunk.IsChunkManifest {
  172. dataChunks = append(dataChunks, chunk)
  173. } else {
  174. chunks = append(chunks, chunk)
  175. }
  176. }
  177. remaining := len(dataChunks)
  178. for i := 0; i+mergeFactor <= len(dataChunks); i += mergeFactor {
  179. chunk, err := mergefn(saveFunc, dataChunks[i:i+mergeFactor])
  180. if err != nil {
  181. return dataChunks, err
  182. }
  183. chunks = append(chunks, chunk)
  184. remaining -= mergeFactor
  185. }
  186. // remaining
  187. for i := len(dataChunks) - remaining; i < len(dataChunks); i++ {
  188. chunks = append(chunks, dataChunks[i])
  189. }
  190. return
  191. }
  192. func mergeIntoManifest(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer_pb.FileChunk) (manifestChunk *filer_pb.FileChunk, err error) {
  193. filer_pb.BeforeEntrySerialization(dataChunks)
  194. // create and serialize the manifest
  195. data, serErr := proto.Marshal(&filer_pb.FileChunkManifest{
  196. Chunks: dataChunks,
  197. })
  198. if serErr != nil {
  199. return nil, fmt.Errorf("serializing manifest: %v", serErr)
  200. }
  201. minOffset, maxOffset := int64(math.MaxInt64), int64(math.MinInt64)
  202. for _, chunk := range dataChunks {
  203. if minOffset > int64(chunk.Offset) {
  204. minOffset = chunk.Offset
  205. }
  206. if maxOffset < int64(chunk.Size)+chunk.Offset {
  207. maxOffset = int64(chunk.Size) + chunk.Offset
  208. }
  209. }
  210. manifestChunk, _, _, err = saveFunc(bytes.NewReader(data), "", 0)
  211. if err != nil {
  212. return nil, err
  213. }
  214. manifestChunk.IsChunkManifest = true
  215. manifestChunk.Offset = minOffset
  216. manifestChunk.Size = uint64(maxOffset - minOffset)
  217. return
  218. }
  219. type SaveDataAsChunkFunctionType func(reader io.Reader, name string, offset int64) (chunk *filer_pb.FileChunk, collection, replication string, err error)