Browse Source

[volume] Reduce the number of buffers for uploading one chunk (#5458)

Konstantin Lebedev 11 months ago
parent
commit
5189a09de0

+ 20 - 5
weed/operation/upload_content.go

@@ -5,6 +5,7 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
+	"github.com/valyala/bytebufferpool"
 	"io"
 	"mime"
 	"mime/multipart"
@@ -32,6 +33,7 @@ type UploadOption struct {
 	Jwt               security.EncodedJwt
 	RetryForever      bool
 	Md5               string
+	BytesBuffer       *bytes.Buffer
 }
 
 type UploadResult struct {
@@ -261,6 +263,7 @@ func doUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult
 			PairMap:           option.PairMap,
 			Jwt:               option.Jwt,
 			Md5:               option.Md5,
+			BytesBuffer:       option.BytesBuffer,
 		})
 		if uploadResult == nil {
 			return
@@ -275,9 +278,17 @@ func doUploadData(data []byte, option *UploadOption) (uploadResult *UploadResult
 }
 
 func upload_content(fillBufferFunction func(w io.Writer) error, originalDataSize int, option *UploadOption) (*UploadResult, error) {
-	buf := GetBuffer()
-	defer PutBuffer(buf)
-	body_writer := multipart.NewWriter(buf)
+	var body_writer *multipart.Writer
+	var reqReader *bytes.Reader
+	var buf *bytebufferpool.ByteBuffer
+	if option.BytesBuffer == nil {
+		buf = GetBuffer()
+		defer PutBuffer(buf)
+		body_writer = multipart.NewWriter(buf)
+	} else {
+		option.BytesBuffer.Reset()
+		body_writer = multipart.NewWriter(option.BytesBuffer)
+	}
 	h := make(textproto.MIMEHeader)
 	filename := fileNameEscaper.Replace(option.Filename)
 	h.Set("Content-Disposition", fmt.Sprintf(`form-data; name="file"; filename="%s"`, filename))
@@ -309,8 +320,12 @@ func upload_content(fillBufferFunction func(w io.Writer) error, originalDataSize
 		glog.V(0).Infoln("error closing body", err)
 		return nil, err
 	}
-
-	req, postErr := http.NewRequest("POST", option.UploadUrl, bytes.NewReader(buf.Bytes()))
+	if option.BytesBuffer == nil {
+		reqReader = bytes.NewReader(buf.Bytes())
+	} else {
+		reqReader = bytes.NewReader(option.BytesBuffer.Bytes())
+	}
+	req, postErr := http.NewRequest("POST", option.UploadUrl, reqReader)
 	if postErr != nil {
 		glog.V(1).Infof("create upload request %s: %v", option.UploadUrl, postErr)
 		return nil, fmt.Errorf("create upload request %s: %v", option.UploadUrl, postErr)

+ 3 - 3
weed/server/volume_server_handlers_write.go

@@ -1,7 +1,6 @@
 package weed_server
 
 import (
-	"bytes"
 	"errors"
 	"fmt"
 	"net/http"
@@ -13,6 +12,7 @@ import (
 	"github.com/seaweedfs/seaweedfs/weed/operation"
 	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
 	"github.com/seaweedfs/seaweedfs/weed/topology"
+	"github.com/seaweedfs/seaweedfs/weed/util/buffer_pool"
 )
 
 func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
@@ -35,8 +35,8 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 
-	bytesBuffer := bufPool.Get().(*bytes.Buffer)
-	defer bufPool.Put(bytesBuffer)
+	bytesBuffer := buffer_pool.SyncPoolGetBuffer()
+	defer buffer_pool.SyncPoolPutBuffer(bytesBuffer)
 
 	reqNeedle, originalSize, contentMd5, ne := needle.CreateNeedleFromRequest(r, vs.FixJpgOrientation, vs.fileSizeLimitBytes, bytesBuffer)
 	if ne != nil {

+ 3 - 9
weed/storage/needle/needle_write.go

@@ -7,16 +7,10 @@ import (
 	"github.com/seaweedfs/seaweedfs/weed/storage/backend"
 	. "github.com/seaweedfs/seaweedfs/weed/storage/types"
 	"github.com/seaweedfs/seaweedfs/weed/util"
+	"github.com/seaweedfs/seaweedfs/weed/util/buffer_pool"
 	"math"
-	"sync"
 )
 
-var bufPool = sync.Pool{
-	New: func() interface{} {
-		return new(bytes.Buffer)
-	},
-}
-
 func (n *Needle) prepareWriteBuffer(version Version, writeBytes *bytes.Buffer) (Size, int64, error) {
 	writeBytes.Reset()
 	switch version {
@@ -132,8 +126,8 @@ func (n *Needle) Append(w backend.BackendStorageFile, version Version) (offset u
 		return
 	}
 
-	bytesBuffer := bufPool.Get().(*bytes.Buffer)
-	defer bufPool.Put(bytesBuffer)
+	bytesBuffer := buffer_pool.SyncPoolGetBuffer()
+	defer buffer_pool.SyncPoolPutBuffer(bytesBuffer)
 
 	size, actualSize, err = n.prepareWriteBuffer(version, bytesBuffer)
 

+ 4 - 0
weed/topology/store_replicate.go

@@ -19,6 +19,7 @@ import (
 	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
 	"github.com/seaweedfs/seaweedfs/weed/storage/types"
 	"github.com/seaweedfs/seaweedfs/weed/util"
+	"github.com/seaweedfs/seaweedfs/weed/util/buffer_pool"
 )
 
 func ReplicatedWrite(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOption, s *storage.Store, volumeId needle.VolumeId, n *needle.Needle, r *http.Request, contentMd5 string) (isUnchanged bool, err error) {
@@ -87,6 +88,8 @@ func ReplicatedWrite(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOpt
 					pairMap[needle.PairNamePrefix+k] = v
 				}
 			}
+			bytesBuffer := buffer_pool.SyncPoolGetBuffer()
+			defer buffer_pool.SyncPoolPutBuffer(bytesBuffer)
 
 			// volume server do not know about encryption
 			// TODO optimize here to compress data only once
@@ -99,6 +102,7 @@ func ReplicatedWrite(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOpt
 				PairMap:           pairMap,
 				Jwt:               jwt,
 				Md5:               contentMd5,
+				BytesBuffer:       bytesBuffer,
 			}
 
 			_, err := operation.UploadData(n.Data, uploadOption)

+ 20 - 0
weed/util/buffer_pool/sync_pool.go

@@ -0,0 +1,20 @@
+package buffer_pool
+
+import (
+	"bytes"
+	"sync"
+)
+
+var syncPool = sync.Pool{
+	New: func() interface{} {
+		return new(bytes.Buffer)
+	},
+}
+
+func SyncPoolGetBuffer() *bytes.Buffer {
+	return syncPool.Get().(*bytes.Buffer)
+}
+
+func SyncPoolPutBuffer(buffer *bytes.Buffer) {
+	syncPool.Put(buffer)
+}