filechunk_manifest.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. package filer2
  2. import (
  3. "bytes"
  4. "fmt"
  5. "io"
  6. "math"
  7. "github.com/golang/protobuf/proto"
  8. "github.com/chrislusf/seaweedfs/weed/glog"
  9. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  10. "github.com/chrislusf/seaweedfs/weed/util"
  11. )
  12. const (
  13. ManifestBatch = 1000
  14. )
  15. func HasChunkManifest(chunks []*filer_pb.FileChunk) bool {
  16. for _, chunk := range chunks {
  17. if chunk.IsChunkManifest {
  18. return true
  19. }
  20. }
  21. return false
  22. }
  23. func ResolveChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (dataChunks, manifestChunks []*filer_pb.FileChunk, manefestResolveErr error) {
  24. // TODO maybe parallel this
  25. for _, chunk := range chunks {
  26. if !chunk.IsChunkManifest {
  27. dataChunks = append(dataChunks, chunk)
  28. continue
  29. }
  30. // IsChunkManifest
  31. data, err := fetchChunk(lookupFileIdFn, chunk.FileId, chunk.CipherKey, chunk.IsCompressed)
  32. if err != nil {
  33. return chunks, nil, fmt.Errorf("fail to read manifest %s: %v", chunk.FileId, err)
  34. }
  35. m := &filer_pb.FileChunkManifest{}
  36. if err := proto.Unmarshal(data, m); err != nil {
  37. return chunks, nil, fmt.Errorf("fail to unmarshal manifest %s: %v", chunk.FileId, err)
  38. }
  39. manifestChunks = append(manifestChunks, chunk)
  40. // recursive
  41. filer_pb.AfterEntryDeserialization(m.Chunks)
  42. dchunks, mchunks, subErr := ResolveChunkManifest(lookupFileIdFn, m.Chunks)
  43. if subErr != nil {
  44. return chunks, nil, subErr
  45. }
  46. dataChunks = append(dataChunks, dchunks...)
  47. manifestChunks = append(manifestChunks, mchunks...)
  48. }
  49. return
  50. }
  51. // TODO fetch from cache for weed mount?
  52. func fetchChunk(lookupFileIdFn LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool) ([]byte, error) {
  53. urlString, err := lookupFileIdFn(fileId)
  54. if err != nil {
  55. glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err)
  56. return nil, err
  57. }
  58. var buffer bytes.Buffer
  59. err = util.ReadUrlAsStream(urlString, cipherKey, isGzipped, true, 0, 0, func(data []byte) {
  60. buffer.Write(data)
  61. })
  62. if err != nil {
  63. glog.V(0).Infof("read %s failed, err: %v", fileId, err)
  64. return nil, err
  65. }
  66. return buffer.Bytes(), nil
  67. }
  68. func MaybeManifestize(saveFunc SaveDataAsChunkFunctionType, inputChunks []*filer_pb.FileChunk) (chunks []*filer_pb.FileChunk, err error) {
  69. return doMaybeManifestize(saveFunc, inputChunks, ManifestBatch, mergeIntoManifest)
  70. }
  71. func doMaybeManifestize(saveFunc SaveDataAsChunkFunctionType, inputChunks []*filer_pb.FileChunk, mergeFactor int, mergefn func(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer_pb.FileChunk) (manifestChunk *filer_pb.FileChunk, err error)) (chunks []*filer_pb.FileChunk, err error) {
  72. var dataChunks []*filer_pb.FileChunk
  73. for _, chunk := range inputChunks {
  74. if !chunk.IsChunkManifest {
  75. dataChunks = append(dataChunks, chunk)
  76. } else {
  77. chunks = append(chunks, chunk)
  78. }
  79. }
  80. remaining := len(dataChunks)
  81. for i := 0; i+mergeFactor <= len(dataChunks); i += mergeFactor {
  82. chunk, err := mergefn(saveFunc, dataChunks[i:i+mergeFactor])
  83. if err != nil {
  84. return dataChunks, err
  85. }
  86. chunks = append(chunks, chunk)
  87. remaining -= mergeFactor
  88. }
  89. // remaining
  90. for i := len(dataChunks) - remaining; i < len(dataChunks); i++ {
  91. chunks = append(chunks, dataChunks[i])
  92. }
  93. return
  94. }
  95. func mergeIntoManifest(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer_pb.FileChunk) (manifestChunk *filer_pb.FileChunk, err error) {
  96. filer_pb.BeforeEntrySerialization(dataChunks)
  97. // create and serialize the manifest
  98. data, serErr := proto.Marshal(&filer_pb.FileChunkManifest{
  99. Chunks: dataChunks,
  100. })
  101. if serErr != nil {
  102. return nil, fmt.Errorf("serializing manifest: %v", serErr)
  103. }
  104. minOffset, maxOffset := int64(math.MaxInt64), int64(math.MinInt64)
  105. for _, chunk := range dataChunks {
  106. if minOffset > int64(chunk.Offset) {
  107. minOffset = chunk.Offset
  108. }
  109. if maxOffset < int64(chunk.Size)+chunk.Offset {
  110. maxOffset = int64(chunk.Size) + chunk.Offset
  111. }
  112. }
  113. manifestChunk, _, _, err = saveFunc(bytes.NewReader(data), "", 0)
  114. if err != nil {
  115. return nil, err
  116. }
  117. manifestChunk.IsChunkManifest = true
  118. manifestChunk.Offset = minOffset
  119. manifestChunk.Size = uint64(maxOffset - minOffset)
  120. return
  121. }
  122. type SaveDataAsChunkFunctionType func(reader io.Reader, name string, offset int64) (chunk *filer_pb.FileChunk, collection, replication string, err error)