Browse Source

chunk cache adds function ReadChunkAt

chrislu 3 years ago
parent
commit
3ad5fa6f6f

+ 49 - 0
weed/util/chunk_cache/chunk_cache.go

@@ -13,6 +13,7 @@ var ErrorOutOfBounds = errors.New("attempt to read out of bounds")
 // ChunkCache is the set of operations a chunk cache exposes; chunks are
 // addressed by their file id string.
 type ChunkCache interface {
 	// GetChunk looks up the chunk for fileId.
 	// NOTE(review): presumably minSize is the expected chunk size and nil means
 	// a miss - implementation not visible here, confirm.
 	GetChunk(fileId string, minSize uint64) (data []byte)
 	// GetChunkSlice looks up a sub-range of the chunk for fileId.
 	// NOTE(review): presumably returns nil on a miss - confirm against implementations.
 	GetChunkSlice(fileId string, offset, length uint64) []byte
 	// ReadChunkAt reads the cached chunk for fileId, starting at offset, into
 	// the caller-supplied data buffer and reports how many bytes were read.
+	ReadChunkAt(data []byte, fileId string, offset uint64) (n int, err error)
 	// SetChunk stores data as the chunk for fileId.
 	SetChunk(fileId string, data []byte)
 }
 
@@ -145,6 +146,54 @@ func (c *TieredChunkCache) doGetChunkSlice(fileId string, offset, length uint64)
 	return nil
 }
 
+// ReadChunkAt fills data with the bytes of the cached chunk fileId that start
+// at offset, trying the in-memory cache first and then the on-disk tiers.
+//
+// A lookup counts as a hit only when the whole buffer was filled, in which
+// case it returns len(data). Anything else (unknown chunk, chunk too short,
+// unparsable file id, read failure in a tier) is reported as a miss with
+// n == 0. The returned error is always nil; callers must rely on n. Note that
+// data may have been partially overwritten even when a miss is reported.
+//
+// A nil receiver behaves as an always-empty cache.
+func (c *TieredChunkCache) ReadChunkAt(data []byte, fileId string, offset uint64) (n int, err error) {
+	if c == nil {
+		return 0, nil
+	}
+
+	c.RLock()
+	defer c.RUnlock()
+
+	// want is the number of bytes a tier has to deliver for a hit. It must be
+	// compared against n directly: n can never reach offset+len(data) once
+	// offset is non-zero, so comparing against minSize would turn every such
+	// read into a miss.
+	want := len(data)
+	// minSize is the smallest chunk size that can satisfy this read; it only
+	// selects which tiers are worth consulting.
+	minSize := offset + uint64(want)
+
+	if minSize <= c.onDiskCacheSizeLimit0 {
+		n, err = c.memCache.readChunkAt(data, fileId, offset)
+		if err != nil {
+			glog.Errorf("failed to read from memcache: %s", err)
+		} else if n == want {
+			return n, nil
+		}
+	}
+
+	fid, err := needle.ParseFileIdFromString(fileId)
+	if err != nil {
+		glog.Errorf("failed to parse file id %s", fileId)
+		// Do not leak a partial byte count from the memcache attempt above.
+		return 0, nil
+	}
+
+	if minSize <= c.onDiskCacheSizeLimit0 {
+		if n, err = c.diskCaches[0].readChunkAt(data, fid.Key, offset); err == nil && n == want {
+			return n, nil
+		}
+	}
+	if minSize <= c.onDiskCacheSizeLimit1 {
+		if n, err = c.diskCaches[1].readChunkAt(data, fid.Key, offset); err == nil && n == want {
+			return n, nil
+		}
+	}
+	// The last tier has no size limit and is always consulted.
+	if n, err = c.diskCaches[2].readChunkAt(data, fid.Key, offset); err == nil && n == want {
+		return n, nil
+	}
+
+	return 0, nil
+}
+
 func (c *TieredChunkCache) SetChunk(fileId string, data []byte) {
 	if c == nil {
 		return

+ 16 - 2
weed/util/chunk_cache/chunk_cache_in_memory.go

@@ -1,9 +1,8 @@
 package chunk_cache
 
 import (
-	"time"
-
 	"github.com/karlseguin/ccache/v2"
+	"time"
 )
 
 // a global cache for recently accessed file chunks
@@ -45,6 +44,21 @@ func (c *ChunkCacheInMemory) getChunkSlice(fileId string, offset, length uint64)
 	return data[offset : int(offset)+wanted], nil
 }
 
+// readChunkAt copies the cached bytes of fileId, beginning at offset, into
+// buffer and returns how many bytes were copied (at most len(buffer)).
+// An unknown fileId yields (0, nil); an offset beyond the end of the cached
+// chunk yields (0, ErrorOutOfBounds). A successful lookup extends the entry's
+// lifetime in the cache by one hour.
+func (c *ChunkCacheInMemory) readChunkAt(buffer []byte, fileId string, offset uint64) (int, error) {
+	entry := c.cache.Get(fileId)
+	if entry == nil {
+		return 0, nil
+	}
+
+	chunk := entry.Value().([]byte)
+	entry.Extend(time.Hour)
+
+	// Number of bytes available from offset, capped by the buffer size;
+	// negative when offset lies past the end of the chunk.
+	start := int(offset)
+	count := min(len(buffer), len(chunk)-start)
+	if count < 0 {
+		return 0, ErrorOutOfBounds
+	}
+
+	return copy(buffer, chunk[offset:start+count]), nil
+}
+
 func (c *ChunkCacheInMemory) SetChunk(fileId string, data []byte) {
 	localCopy := make([]byte, len(data))
 	copy(localCopy, data)

+ 22 - 0
weed/util/chunk_cache/chunk_cache_on_disk.go

@@ -144,6 +144,28 @@ func (v *ChunkCacheVolume) getNeedleSlice(key types.NeedleId, offset, length uin
 	return data, nil
 }
 
+// readNeedleSliceAt reads the stored needle identified by key, starting at
+// offset within the needle, into data.
+//
+// It reads min(len(data), needle size - offset) bytes and returns that count.
+// It returns storage.ErrorNotFound when the key is not in the needle map,
+// ErrorOutOfBounds when offset lies past the end of the needle, and a
+// descriptive error when the backend read fails or comes up short.
+func (v *ChunkCacheVolume) readNeedleSliceAt(data []byte, key types.NeedleId, offset uint64) (n int, err error) {
+	nv, ok := v.nm.Get(key)
+	if !ok {
+		return 0, storage.ErrorNotFound
+	}
+	wanted := min(len(data), int(nv.Size)-int(offset))
+	if wanted < 0 {
+		// should never happen, but better than panicking
+		return 0, ErrorOutOfBounds
+	}
+
+	// Limit the read to data[:wanted]: ReadAt fills the whole slice it is
+	// given, so passing all of data would read past the needle's recorded
+	// size whenever the buffer is larger than what remains of the needle.
+	start := nv.Offset.ToActualOffset() + int64(offset)
+	n, err = v.DataBackend.ReadAt(data[:wanted], start)
+	if err != nil {
+		return n, fmt.Errorf("read %s.dat [%d,%d): %v",
+			v.fileName, start, start+int64(wanted), err)
+	}
+	if n != wanted {
+		return n, fmt.Errorf("read %d, expected %d", n, wanted)
+	}
+
+	return n, nil
+}
+
 func (v *ChunkCacheVolume) WriteNeedle(key types.NeedleId, data []byte) error {
 
 	offset := v.fileSize

+ 18 - 7
weed/util/chunk_cache/chunk_cache_on_disk_test.go

@@ -3,6 +3,7 @@ package chunk_cache
 import (
 	"bytes"
 	"fmt"
+	"github.com/chrislusf/seaweedfs/weed/util/mem"
 	"math/rand"
 	"testing"
 )
@@ -18,7 +19,7 @@ func TestOnDisk(t *testing.T) {
 	type test_data struct {
 		data   []byte
 		fileId string
-		size   uint64
+		size   int
 	}
 	testData := make([]*test_data, writeCount)
 	for i := 0; i < writeCount; i++ {
@@ -27,29 +28,35 @@ func TestOnDisk(t *testing.T) {
 		testData[i] = &test_data{
 			data:   buff,
 			fileId: fmt.Sprintf("1,%daabbccdd", i+1),
-			size:   uint64(len(buff)),
+			size:   len(buff),
 		}
 		cache.SetChunk(testData[i].fileId, testData[i].data)
 
 		// read back right after write
-		data := cache.GetChunk(testData[i].fileId, testData[i].size)
+		data := mem.Allocate(testData[i].size)
+		cache.ReadChunkAt(data, testData[i].fileId, 0)
 		if bytes.Compare(data, testData[i].data) != 0 {
 			t.Errorf("failed to write to and read from cache: %d", i)
 		}
+		mem.Free(data)
 	}
 
 	for i := 0; i < 2; i++ {
-		data := cache.GetChunk(testData[i].fileId, testData[i].size)
+		data := mem.Allocate(testData[i].size)
+		cache.ReadChunkAt(data, testData[i].fileId, 0)
 		if bytes.Compare(data, testData[i].data) == 0 {
 			t.Errorf("old cache should have been purged: %d", i)
 		}
+		mem.Free(data)
 	}
 
 	for i := 2; i < writeCount; i++ {
-		data := cache.GetChunk(testData[i].fileId, testData[i].size)
+		data := mem.Allocate(testData[i].size)
+		cache.ReadChunkAt(data, testData[i].fileId, 0)
 		if bytes.Compare(data, testData[i].data) != 0 {
 			t.Errorf("failed to write to and read from cache: %d", i)
 		}
+		mem.Free(data)
 	}
 
 	cache.Shutdown()
@@ -57,10 +64,12 @@ func TestOnDisk(t *testing.T) {
 	cache = NewTieredChunkCache(2, tmpDir, totalDiskSizeInKB, 1024)
 
 	for i := 0; i < 2; i++ {
-		data := cache.GetChunk(testData[i].fileId, testData[i].size)
+		data := mem.Allocate(testData[i].size)
+		cache.ReadChunkAt(data, testData[i].fileId, 0)
 		if bytes.Compare(data, testData[i].data) == 0 {
 			t.Errorf("old cache should have been purged: %d", i)
 		}
+		mem.Free(data)
 	}
 
 	for i := 2; i < writeCount; i++ {
@@ -83,10 +92,12 @@ func TestOnDisk(t *testing.T) {
 			*/
 			continue
 		}
-		data := cache.GetChunk(testData[i].fileId, testData[i].size)
+		data := mem.Allocate(testData[i].size)
+		cache.ReadChunkAt(data, testData[i].fileId, 0)
 		if bytes.Compare(data, testData[i].data) != 0 {
 			t.Errorf("failed to write to and read from cache: %d", i)
 		}
+		mem.Free(data)
 	}
 
 	cache.Shutdown()

+ 20 - 0
weed/util/chunk_cache/on_disk_cache_layer.go

@@ -108,6 +108,26 @@ func (c *OnDiskCacheLayer) getChunkSlice(needleId types.NeedleId, offset, length
 
 }
 
+// readChunkAt asks each cache volume of this layer, in order, to read the
+// needle needleId starting at offset into buffer. It returns as soon as one
+// volume delivers at least one byte without error. Volumes that do not hold
+// the needle are skipped silently; volumes that fail are skipped with a
+// warning. When no volume delivers data, the result of the last volume tried
+// is returned as-is (or 0, nil when the layer has no volumes).
+func (c *OnDiskCacheLayer) readChunkAt(buffer []byte, needleId types.NeedleId, offset uint64) (n int, err error) {
+	for i := range c.diskCaches {
+		volume := c.diskCaches[i]
+		n, err = volume.readNeedleSliceAt(buffer, needleId, offset)
+		switch {
+		case err == storage.ErrorNotFound:
+			// Not in this volume; try the next one.
+		case err != nil:
+			glog.Warningf("failed to read cache file %s id %d: %v", volume.fileName, needleId, err)
+		case n > 0:
+			return n, err
+		}
+	}
+
+	return n, err
+}
+
 func (c *OnDiskCacheLayer) shutdown() {
 
 	for _, diskCache := range c.diskCaches {