Просмотр исходного кода

use bytebuffer for interval list

Chris Lu 4 лет назад
Родитель
Сommit
9884dfc369
4 измененных файлов с 61 добавлено и 28 удалено
  1. 1 1
      go.mod
  2. 3 8
      go.sum
  3. 3 1
      weed/filesys/dirty_page.go
  4. 54 18
      weed/filesys/dirty_page_interval.go

+ 1 - 1
go.mod

@@ -34,7 +34,6 @@ require (
 	github.com/gorilla/mux v1.7.4
 	github.com/gorilla/websocket v1.4.1 // indirect
 	github.com/grpc-ecosystem/grpc-gateway v1.11.0 // indirect
-	github.com/hashicorp/golang-lru v0.5.3 // indirect
 	github.com/jcmturner/gofork v1.0.0 // indirect
 	github.com/json-iterator/go v1.1.10
 	github.com/karlseguin/ccache v2.0.3+incompatible
@@ -70,6 +69,7 @@ require (
 	github.com/syndtr/goleveldb v1.0.0
 	github.com/tidwall/gjson v1.3.2
 	github.com/tidwall/match v1.0.1
+	github.com/valyala/bytebufferpool v1.0.0
 	github.com/willf/bitset v1.1.10 // indirect
 	github.com/willf/bloom v2.0.3+incompatible
 	github.com/wsxiaoys/terminal v0.0.0-20160513160801-0940f3fc43a0 // indirect

+ 3 - 8
go.sum

@@ -45,8 +45,6 @@ github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5
 github.com/aws/aws-sdk-go v1.15.27/go.mod h1:mFuSZ37Z9YOHbQEwBWztmVzqXrEkub65tZoCYDt7FT0=
 github.com/aws/aws-sdk-go v1.19.18/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
 github.com/aws/aws-sdk-go v1.19.45/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
-github.com/aws/aws-sdk-go v1.23.13 h1:l/NG+mgQFRGG3dsFzEj0jw9JIs/zYdtU6MXhY1WIDmM=
-github.com/aws/aws-sdk-go v1.23.13/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
 github.com/aws/aws-sdk-go v1.33.5 h1:p2fr1ryvNTU6avUWLI+/H7FGv0TBIjzVM5WDgXBBv4U=
 github.com/aws/aws-sdk-go v1.33.5/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0=
 github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
@@ -67,8 +65,6 @@ github.com/census-instrumentation/opencensus-proto v0.2.1 h1:glEXhBS5PSLLv4IXzLA
 github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
 github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
 github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
-github.com/chrislusf/raft v1.0.1 h1:Wa4ffkmkysW7cX3T/gMC/Mk3PhnOXhsqOVwQJcMndhw=
-github.com/chrislusf/raft v1.0.1/go.mod h1:Ep5DP+mJSosjfKiix1uU7Lc2Df/SX4oGJEpZlXH5l68=
 github.com/chrislusf/raft v1.0.2-0.20201002174524-b13c3bfdb011 h1:vN1GvfLgDg8kIPCdhuVKAjlYpxG1B86jiKejB6MC/Q0=
 github.com/chrislusf/raft v1.0.2-0.20201002174524-b13c3bfdb011/go.mod h1:Ep5DP+mJSosjfKiix1uU7Lc2Df/SX4oGJEpZlXH5l68=
 github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
@@ -127,6 +123,7 @@ github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4 h1:7HZCaLC5+BZpm
 github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4/go.mod h1:5tD+neXqOorC30/tWg0LCSkrqj/AR6gu8yY8/fpw1q0=
 github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
 github.com/fortytw2/leaktest v1.2.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
+github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
 github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
 github.com/frankban/quicktest v1.7.2 h1:2QxQoC1TS09S7fhCPsrvqYdvP1H5M1P1ih5ABm3BTYk=
 github.com/frankban/quicktest v1.7.2/go.mod h1:jaStnuzAqU1AJdCO0l53JDCJrVDKcS03DbaAcR7Ks/o=
@@ -267,8 +264,6 @@ github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/b
 github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
 github.com/hashicorp/golang-lru v0.5.1 h1:0hERBMJE1eitiLkihrMvRVBYAkpHzc/J3QdDN+dAcgU=
 github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
-github.com/hashicorp/golang-lru v0.5.3 h1:YPkqC67at8FYaadspW/6uE0COsBxS2656RLEr8Bppgk=
-github.com/hashicorp/golang-lru v0.5.3/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
 github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
 github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
 github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
@@ -508,6 +503,8 @@ github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1
 github.com/ugorji/go v1.1.4 h1:j4s+tAvLfL3bZyefP2SEWmhBzmuIlH/eqNuPdFPgngw=
 github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc=
 github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
+github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
+github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
 github.com/willf/bitset v1.1.10 h1:NotGKqX0KwQ72NUzqrjZq5ipPNDQex9lo3WpaS8L2sc=
 github.com/willf/bitset v1.1.10/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4=
 github.com/willf/bloom v2.0.3+incompatible h1:QDacWdqcAUI1MPOwIQZRy9kOR7yxfyEmxX8Wdm2/JPA=
@@ -601,8 +598,6 @@ golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLL
 golang.org/x/net v0.0.0-20190619014844-b5b0513f8c1b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
-golang.org/x/net v0.0.0-20190909003024-a7b16738d86b h1:XfVGCX+0T4WOStkaOsJRllbsiImhB2jgVBGc9L0lPGc=
-golang.org/x/net v0.0.0-20190909003024-a7b16738d86b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20200202094626-16171245cfb2 h1:CCH4IOTTfewWjGOlSp+zGcjutRKlBEZQ6wTn8ozI/nI=
 golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=

+ 3 - 1
weed/filesys/dirty_page.go

@@ -54,7 +54,7 @@ func (pages *ContinuousDirtyPages) AddPage(offset int64, data []byte) {
 
 	pages.intervals.AddInterval(data, offset)
 
-	if pages.intervals.TotalSize() > pages.f.wfs.option.ChunkSizeLimit {
+	if pages.intervals.TotalSize() >= pages.f.wfs.option.ChunkSizeLimit {
 		pages.saveExistingLargestPageToStorage()
 	}
 
@@ -93,6 +93,8 @@ func (pages *ContinuousDirtyPages) saveExistingLargestPageToStorage() (hasSavedD
 
 	pages.saveToStorage(maxList.ToReader(), maxList.Offset(), chunkSize)
 
+	maxList.Destroy()
+
 	return true
 }
 

+ 54 - 18
weed/filesys/dirty_page_interval.go

@@ -5,6 +5,7 @@ import (
 	"io"
 
 	"github.com/chrislusf/seaweedfs/weed/util"
+	"github.com/valyala/bytebufferpool"
 )
 
 type IntervalNode struct {
@@ -12,6 +13,15 @@ type IntervalNode struct {
 	Offset int64
 	Size   int64
 	Next   *IntervalNode
+	Buffer *bytebufferpool.ByteBuffer
+}
+
+func (l *IntervalNode) Bytes() []byte {
+	data := l.Data
+	if data == nil {
+		data = l.Buffer.Bytes()
+	}
+	return data
 }
 
 type IntervalLinkedList struct {
@@ -23,16 +33,39 @@ type ContinuousIntervals struct {
 	lists []*IntervalLinkedList
 }
 
+func NewIntervalLinkedList(head, tail *IntervalNode) *IntervalLinkedList {
+	list := &IntervalLinkedList{
+		Head: head,
+		Tail: tail,
+	}
+	return list
+}
+
+func (list *IntervalLinkedList) Destroy() {
+	for t := list.Head; t != nil; t = t.Next {
+		if t.Buffer != nil {
+			bytebufferpool.Put(t.Buffer)
+		}
+	}
+}
+
 func (list *IntervalLinkedList) Offset() int64 {
 	return list.Head.Offset
 }
 func (list *IntervalLinkedList) Size() int64 {
 	return list.Tail.Offset + list.Tail.Size - list.Head.Offset
 }
+
 func (list *IntervalLinkedList) addNodeToTail(node *IntervalNode) {
 	// glog.V(4).Infof("add to tail [%d,%d) + [%d,%d) => [%d,%d)", list.Head.Offset, list.Tail.Offset+list.Tail.Size, node.Offset, node.Offset+node.Size, list.Head.Offset, node.Offset+node.Size)
-	list.Tail.Next = node
-	list.Tail = node
+	if list.Tail.Buffer == nil {
+		list.Tail.Buffer = bytebufferpool.Get()
+		list.Tail.Buffer.Write(list.Tail.Data)
+		list.Tail.Data = nil
+	}
+	list.Tail.Buffer.Write(node.Data)
+	list.Tail.Size += int64(len(node.Data))
+	return
 }
 func (list *IntervalLinkedList) addNodeToHead(node *IntervalNode) {
 	// glog.V(4).Infof("add to head [%d,%d) + [%d,%d) => [%d,%d)", node.Offset, node.Offset+node.Size, list.Head.Offset, list.Tail.Offset+list.Tail.Size, node.Offset, list.Tail.Offset+list.Tail.Size)
@@ -47,7 +80,7 @@ func (list *IntervalLinkedList) ReadData(buf []byte, start, stop int64) {
 		nodeStart, nodeStop := max(start, t.Offset), min(stop, t.Offset+t.Size)
 		if nodeStart < nodeStop {
 			// glog.V(0).Infof("copying start=%d stop=%d t=[%d,%d) t.data=%d => bufSize=%d nodeStart=%d, nodeStop=%d", start, stop, t.Offset, t.Offset+t.Size, len(t.Data), len(buf), nodeStart, nodeStop)
-			copy(buf[nodeStart-start:], t.Data[nodeStart-t.Offset:nodeStop-t.Offset])
+			copy(buf[nodeStart-start:], t.Bytes()[nodeStart-t.Offset:nodeStop-t.Offset])
 		}
 
 		if t.Next == nil {
@@ -72,8 +105,15 @@ func subList(list *IntervalLinkedList, start, stop int64) *IntervalLinkedList {
 			// skip non overlapping IntervalNode
 			continue
 		}
+		data := t.Bytes()[nodeStart-t.Offset : nodeStop-t.Offset]
+		if t.Data == nil {
+			// need to clone if the bytes is from byte buffer
+			t := make([]byte, len(data))
+			copy(t, data)
+			data = t
+		}
 		nodes = append(nodes, &IntervalNode{
-			Data:   t.Data[nodeStart-t.Offset : nodeStop-t.Offset],
+			Data:   data,
 			Offset: nodeStart,
 			Size:   nodeStop - nodeStart,
 			Next:   nil,
@@ -82,10 +122,7 @@ func subList(list *IntervalLinkedList, start, stop int64) *IntervalLinkedList {
 	for i := 1; i < len(nodes); i++ {
 		nodes[i-1].Next = nodes[i]
 	}
-	return &IntervalLinkedList{
-		Head: nodes[0],
-		Tail: nodes[len(nodes)-1],
-	}
+	return NewIntervalLinkedList(nodes[0], nodes[len(nodes)-1])
 }
 
 func (c *ContinuousIntervals) AddInterval(data []byte, offset int64) {
@@ -144,10 +181,7 @@ func (c *ContinuousIntervals) AddInterval(data []byte, offset int64) {
 		nextList.addNodeToHead(interval)
 	}
 	if prevList == nil && nextList == nil {
-		c.lists = append(c.lists, &IntervalLinkedList{
-			Head: interval,
-			Tail: interval,
-		})
+		c.lists = append(c.lists, NewIntervalLinkedList(interval, interval))
 	}
 
 	return
@@ -155,11 +189,13 @@ func (c *ContinuousIntervals) AddInterval(data []byte, offset int64) {
 
 func (c *ContinuousIntervals) RemoveLargestIntervalLinkedList() *IntervalLinkedList {
 	var maxSize int64
-	maxIndex := -1
+	maxIndex, maxOffset := -1, int64(-1)
+	println("in memory list:", len(c.lists))
 	for k, list := range c.lists {
-		if maxSize <= list.Size() {
-			maxSize = list.Size()
-			maxIndex = k
+		listSize := list.Size()
+		if maxSize < listSize || (maxSize == listSize && list.Offset() < maxOffset ) {
+			maxSize = listSize
+			maxIndex, maxOffset = k, list.Offset()
 		}
 	}
 	if maxSize <= 0 {
@@ -202,10 +238,10 @@ func (c *ContinuousIntervals) ReadDataAt(data []byte, startOffset int64) (maxSto
 func (l *IntervalLinkedList) ToReader() io.Reader {
 	var readers []io.Reader
 	t := l.Head
-	readers = append(readers, util.NewBytesReader(t.Data))
+	readers = append(readers, util.NewBytesReader(t.Bytes()))
 	for t.Next != nil {
 		t = t.Next
-		readers = append(readers, bytes.NewReader(t.Data))
+		readers = append(readers, bytes.NewReader(t.Bytes()))
 	}
 	if len(readers) == 1 {
 		return readers[0]