filer_server_handlers_write_merge.go 2.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465
  1. package weed_server
  2. import (
  3. "github.com/seaweedfs/seaweedfs/weed/filer"
  4. "github.com/seaweedfs/seaweedfs/weed/glog"
  5. "github.com/seaweedfs/seaweedfs/weed/operation"
  6. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  7. "github.com/seaweedfs/seaweedfs/weed/stats"
  8. "io"
  9. "math"
  10. )
// MergeChunkMinCount is the minimum number of "small" chunks an entry must
// accumulate before maybeMergeChunks will consider rewriting them into
// larger chunks.
const MergeChunkMinCount int = 1000
  12. func (fs *FilerServer) maybeMergeChunks(so *operation.StorageOption, inputChunks []*filer_pb.FileChunk) (mergedChunks []*filer_pb.FileChunk, err error) {
  13. // Only merge small chunks more than half of the file
  14. var chunkSize = fs.option.MaxMB * 1024 * 1024
  15. var smallChunk, sumChunk int
  16. var minOffset int64 = math.MaxInt64
  17. for _, chunk := range inputChunks {
  18. if chunk.IsChunkManifest {
  19. continue
  20. }
  21. if chunk.Size < uint64(chunkSize/2) {
  22. smallChunk++
  23. if chunk.Offset < minOffset {
  24. minOffset = chunk.Offset
  25. }
  26. }
  27. sumChunk++
  28. }
  29. if smallChunk < MergeChunkMinCount || smallChunk < sumChunk/2 {
  30. return inputChunks, nil
  31. }
  32. return fs.mergeChunks(so, inputChunks, minOffset)
  33. }
  34. func (fs *FilerServer) mergeChunks(so *operation.StorageOption, inputChunks []*filer_pb.FileChunk, chunkOffset int64) (mergedChunks []*filer_pb.FileChunk, mergeErr error) {
  35. chunkedFileReader := filer.NewChunkStreamReaderFromFiler(fs.filer.MasterClient, inputChunks)
  36. _, mergeErr = chunkedFileReader.Seek(chunkOffset, io.SeekCurrent)
  37. if mergeErr != nil {
  38. return nil, mergeErr
  39. }
  40. mergedChunks, _, _, mergeErr, _ = fs.uploadReaderToChunks(chunkedFileReader, chunkOffset, int32(fs.option.MaxMB*1024*1024), "", "", true, so)
  41. if mergeErr != nil {
  42. return
  43. }
  44. stats.FilerHandlerCounter.WithLabelValues(stats.ChunkMerge).Inc()
  45. for _, chunk := range inputChunks {
  46. if chunk.Offset < chunkOffset || chunk.IsChunkManifest {
  47. mergedChunks = append(mergedChunks, chunk)
  48. }
  49. }
  50. garbage, err := filer.MinusChunks(fs.lookupFileId, inputChunks, mergedChunks)
  51. if err != nil {
  52. glog.Errorf("Failed to resolve old entry chunks when delete old entry chunks. new: %s, old: %s",
  53. mergedChunks, inputChunks)
  54. return
  55. }
  56. fs.filer.DeleteChunksNotRecursive(garbage)
  57. return
  58. }