filechunk_manifest.go 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278
  1. package filer
  2. import (
  3. "bytes"
  4. "fmt"
  5. "io"
  6. "math"
  7. "net/url"
  8. "strings"
  9. "sync"
  10. "time"
  11. "github.com/seaweedfs/seaweedfs/weed/wdclient"
  12. "google.golang.org/protobuf/proto"
  13. "github.com/seaweedfs/seaweedfs/weed/glog"
  14. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  15. "github.com/seaweedfs/seaweedfs/weed/util"
  16. )
  17. const (
  18. ManifestBatch = 10000
  19. )
  20. var bytesBufferPool = sync.Pool{
  21. New: func() interface{} {
  22. return new(bytes.Buffer)
  23. },
  24. }
  25. func HasChunkManifest(chunks []*filer_pb.FileChunk) bool {
  26. for _, chunk := range chunks {
  27. if chunk.IsChunkManifest {
  28. return true
  29. }
  30. }
  31. return false
  32. }
  33. func SeparateManifestChunks(chunks []*filer_pb.FileChunk) (manifestChunks, nonManifestChunks []*filer_pb.FileChunk) {
  34. for _, c := range chunks {
  35. if c.IsChunkManifest {
  36. manifestChunks = append(manifestChunks, c)
  37. } else {
  38. nonManifestChunks = append(nonManifestChunks, c)
  39. }
  40. }
  41. return
  42. }
  43. func ResolveChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, startOffset, stopOffset int64) (dataChunks, manifestChunks []*filer_pb.FileChunk, manifestResolveErr error) {
  44. // TODO maybe parallel this
  45. for _, chunk := range chunks {
  46. if max(chunk.Offset, startOffset) >= min(chunk.Offset+int64(chunk.Size), stopOffset) {
  47. continue
  48. }
  49. if !chunk.IsChunkManifest {
  50. dataChunks = append(dataChunks, chunk)
  51. continue
  52. }
  53. resolvedChunks, err := ResolveOneChunkManifest(lookupFileIdFn, chunk)
  54. if err != nil {
  55. return dataChunks, nil, err
  56. }
  57. manifestChunks = append(manifestChunks, chunk)
  58. // recursive
  59. subDataChunks, subManifestChunks, subErr := ResolveChunkManifest(lookupFileIdFn, resolvedChunks, startOffset, stopOffset)
  60. if subErr != nil {
  61. return dataChunks, nil, subErr
  62. }
  63. dataChunks = append(dataChunks, subDataChunks...)
  64. manifestChunks = append(manifestChunks, subManifestChunks...)
  65. }
  66. return
  67. }
  68. func ResolveOneChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunk *filer_pb.FileChunk) (dataChunks []*filer_pb.FileChunk, manifestResolveErr error) {
  69. if !chunk.IsChunkManifest {
  70. return
  71. }
  72. // IsChunkManifest
  73. bytesBuffer := bytesBufferPool.Get().(*bytes.Buffer)
  74. bytesBuffer.Reset()
  75. defer bytesBufferPool.Put(bytesBuffer)
  76. err := fetchWholeChunk(bytesBuffer, lookupFileIdFn, chunk.GetFileIdString(), chunk.CipherKey, chunk.IsCompressed)
  77. if err != nil {
  78. return nil, fmt.Errorf("fail to read manifest %s: %v", chunk.GetFileIdString(), err)
  79. }
  80. m := &filer_pb.FileChunkManifest{}
  81. if err := proto.Unmarshal(bytesBuffer.Bytes(), m); err != nil {
  82. return nil, fmt.Errorf("fail to unmarshal manifest %s: %v", chunk.GetFileIdString(), err)
  83. }
  84. // recursive
  85. filer_pb.AfterEntryDeserialization(m.Chunks)
  86. return m.Chunks, nil
  87. }
  88. // TODO fetch from cache for weed mount?
  89. func fetchWholeChunk(bytesBuffer *bytes.Buffer, lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool) error {
  90. urlStrings, err := lookupFileIdFn(fileId)
  91. if err != nil {
  92. glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err)
  93. return err
  94. }
  95. err = retriedStreamFetchChunkData(bytesBuffer, urlStrings, cipherKey, isGzipped, true, 0, 0)
  96. if err != nil {
  97. return err
  98. }
  99. return nil
  100. }
  101. func fetchChunkRange(buffer []byte, lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool, offset int64) (int, error) {
  102. urlStrings, err := lookupFileIdFn(fileId)
  103. if err != nil {
  104. glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err)
  105. return 0, err
  106. }
  107. return retriedFetchChunkData(buffer, urlStrings, cipherKey, isGzipped, false, offset)
  108. }
  109. func retriedFetchChunkData(buffer []byte, urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64) (n int, err error) {
  110. var shouldRetry bool
  111. for waitTime := time.Second; waitTime < util.RetryWaitTime; waitTime += waitTime / 2 {
  112. for _, urlString := range urlStrings {
  113. n = 0
  114. if strings.Contains(urlString, "%") {
  115. urlString = url.PathEscape(urlString)
  116. }
  117. shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, len(buffer), func(data []byte) {
  118. if n < len(buffer) {
  119. x := copy(buffer[n:], data)
  120. n += x
  121. }
  122. })
  123. if !shouldRetry {
  124. break
  125. }
  126. if err != nil {
  127. glog.V(0).Infof("read %s failed, err: %v", urlString, err)
  128. } else {
  129. break
  130. }
  131. }
  132. if err != nil && shouldRetry {
  133. glog.V(0).Infof("retry reading in %v", waitTime)
  134. time.Sleep(waitTime)
  135. } else {
  136. break
  137. }
  138. }
  139. return n, err
  140. }
  141. func retriedStreamFetchChunkData(writer io.Writer, urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, size int) (err error) {
  142. var shouldRetry bool
  143. var totalWritten int
  144. for waitTime := time.Second; waitTime < util.RetryWaitTime; waitTime += waitTime / 2 {
  145. for _, urlString := range urlStrings {
  146. var localProcessed int
  147. var writeErr error
  148. shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) {
  149. if totalWritten > localProcessed {
  150. toBeSkipped := totalWritten - localProcessed
  151. if len(data) <= toBeSkipped {
  152. localProcessed += len(data)
  153. return // skip if already processed
  154. }
  155. data = data[toBeSkipped:]
  156. localProcessed += toBeSkipped
  157. }
  158. var writtenCount int
  159. writtenCount, writeErr = writer.Write(data)
  160. localProcessed += writtenCount
  161. totalWritten += writtenCount
  162. })
  163. if !shouldRetry {
  164. break
  165. }
  166. if writeErr != nil {
  167. err = writeErr
  168. break
  169. }
  170. if err != nil {
  171. glog.V(0).Infof("read %s failed, err: %v", urlString, err)
  172. } else {
  173. break
  174. }
  175. }
  176. if err != nil && shouldRetry {
  177. glog.V(0).Infof("retry reading in %v", waitTime)
  178. time.Sleep(waitTime)
  179. } else {
  180. break
  181. }
  182. }
  183. return err
  184. }
  185. func MaybeManifestize(saveFunc SaveDataAsChunkFunctionType, inputChunks []*filer_pb.FileChunk) (chunks []*filer_pb.FileChunk, err error) {
  186. return doMaybeManifestize(saveFunc, inputChunks, ManifestBatch, mergeIntoManifest)
  187. }
  188. func doMaybeManifestize(saveFunc SaveDataAsChunkFunctionType, inputChunks []*filer_pb.FileChunk, mergeFactor int, mergefn func(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer_pb.FileChunk) (manifestChunk *filer_pb.FileChunk, err error)) (chunks []*filer_pb.FileChunk, err error) {
  189. var dataChunks []*filer_pb.FileChunk
  190. for _, chunk := range inputChunks {
  191. if !chunk.IsChunkManifest {
  192. dataChunks = append(dataChunks, chunk)
  193. } else {
  194. chunks = append(chunks, chunk)
  195. }
  196. }
  197. remaining := len(dataChunks)
  198. for i := 0; i+mergeFactor <= len(dataChunks); i += mergeFactor {
  199. chunk, err := mergefn(saveFunc, dataChunks[i:i+mergeFactor])
  200. if err != nil {
  201. return dataChunks, err
  202. }
  203. chunks = append(chunks, chunk)
  204. remaining -= mergeFactor
  205. }
  206. // remaining
  207. for i := len(dataChunks) - remaining; i < len(dataChunks); i++ {
  208. chunks = append(chunks, dataChunks[i])
  209. }
  210. return
  211. }
  212. func mergeIntoManifest(saveFunc SaveDataAsChunkFunctionType, dataChunks []*filer_pb.FileChunk) (manifestChunk *filer_pb.FileChunk, err error) {
  213. filer_pb.BeforeEntrySerialization(dataChunks)
  214. // create and serialize the manifest
  215. data, serErr := proto.Marshal(&filer_pb.FileChunkManifest{
  216. Chunks: dataChunks,
  217. })
  218. if serErr != nil {
  219. return nil, fmt.Errorf("serializing manifest: %v", serErr)
  220. }
  221. minOffset, maxOffset := int64(math.MaxInt64), int64(math.MinInt64)
  222. for _, chunk := range dataChunks {
  223. if minOffset > int64(chunk.Offset) {
  224. minOffset = chunk.Offset
  225. }
  226. if maxOffset < int64(chunk.Size)+chunk.Offset {
  227. maxOffset = int64(chunk.Size) + chunk.Offset
  228. }
  229. }
  230. manifestChunk, err = saveFunc(bytes.NewReader(data), "", 0, 0)
  231. if err != nil {
  232. return nil, err
  233. }
  234. manifestChunk.IsChunkManifest = true
  235. manifestChunk.Offset = minOffset
  236. manifestChunk.Size = uint64(maxOffset - minOffset)
  237. return
  238. }
  239. type SaveDataAsChunkFunctionType func(reader io.Reader, name string, offset int64, tsNs int64) (chunk *filer_pb.FileChunk, err error)