Browse Source

add needle reading in chunks

chrislu 2 years ago
parent
commit
663bc5dc23

BIN
weed/storage/needle/43.dat


+ 14 - 7
weed/storage/needle/needle_read.go

@@ -111,6 +111,13 @@ func (n *Needle) readNeedleDataVersion2(bytes []byte) (err error) {
 		}
 		n.Data = bytes[index : index+int(n.DataSize)]
 		index = index + int(n.DataSize)
+	}
+	_, err = n.readNeedleDataVersion2NonData(bytes[index:])
+	return
+}
+func (n *Needle) readNeedleDataVersion2NonData(bytes []byte) (index int, err error) {
+	lenBytes := len(bytes)
+	if index < lenBytes {
 		n.Flags = bytes[index]
 		index = index + 1
 	}
@@ -119,7 +126,7 @@ func (n *Needle) readNeedleDataVersion2(bytes []byte) (err error) {
 		index = index + 1
 		if int(n.NameSize)+index > lenBytes {
 			stats.VolumeServerRequestCounter.WithLabelValues(stats.ErrorIndexOutOfRange).Inc()
-			return fmt.Errorf("index out of range %d", 2)
+			return index, fmt.Errorf("index out of range %d", 2)
 		}
 		n.Name = bytes[index : index+int(n.NameSize)]
 		index = index + int(n.NameSize)
@@ -129,7 +136,7 @@ func (n *Needle) readNeedleDataVersion2(bytes []byte) (err error) {
 		index = index + 1
 		if int(n.MimeSize)+index > lenBytes {
 			stats.VolumeServerRequestCounter.WithLabelValues(stats.ErrorIndexOutOfRange).Inc()
-			return fmt.Errorf("index out of range %d", 3)
+			return index, fmt.Errorf("index out of range %d", 3)
 		}
 		n.Mime = bytes[index : index+int(n.MimeSize)]
 		index = index + int(n.MimeSize)
@@ -137,7 +144,7 @@ func (n *Needle) readNeedleDataVersion2(bytes []byte) (err error) {
 	if index < lenBytes && n.HasLastModifiedDate() {
 		if LastModifiedBytesLength+index > lenBytes {
 			stats.VolumeServerRequestCounter.WithLabelValues(stats.ErrorIndexOutOfRange).Inc()
-			return fmt.Errorf("index out of range %d", 4)
+			return index, fmt.Errorf("index out of range %d", 4)
 		}
 		n.LastModified = util.BytesToUint64(bytes[index : index+LastModifiedBytesLength])
 		index = index + LastModifiedBytesLength
@@ -145,7 +152,7 @@ func (n *Needle) readNeedleDataVersion2(bytes []byte) (err error) {
 	if index < lenBytes && n.HasTtl() {
 		if TtlBytesLength+index > lenBytes {
 			stats.VolumeServerRequestCounter.WithLabelValues(stats.ErrorIndexOutOfRange).Inc()
-			return fmt.Errorf("index out of range %d", 5)
+			return index, fmt.Errorf("index out of range %d", 5)
 		}
 		n.Ttl = LoadTTLFromBytes(bytes[index : index+TtlBytesLength])
 		index = index + TtlBytesLength
@@ -153,19 +160,19 @@ func (n *Needle) readNeedleDataVersion2(bytes []byte) (err error) {
 	if index < lenBytes && n.HasPairs() {
 		if 2+index > lenBytes {
 			stats.VolumeServerRequestCounter.WithLabelValues(stats.ErrorIndexOutOfRange).Inc()
-			return fmt.Errorf("index out of range %d", 6)
+			return index, fmt.Errorf("index out of range %d", 6)
 		}
 		n.PairsSize = util.BytesToUint16(bytes[index : index+2])
 		index += 2
 		if int(n.PairsSize)+index > lenBytes {
 			stats.VolumeServerRequestCounter.WithLabelValues(stats.ErrorIndexOutOfRange).Inc()
-			return fmt.Errorf("index out of range %d", 7)
+			return index, fmt.Errorf("index out of range %d", 7)
 		}
 		end := index + int(n.PairsSize)
 		n.Pairs = bytes[index:end]
 		index = end
 	}
-	return nil
+	return index, nil
 }
 
 func ReadNeedleHeader(r backend.BackendStorageFile, version Version, offset int64) (n *Needle, bytes []byte, bodyLength int64, err error) {

+ 77 - 0
weed/storage/needle/needle_read_page.go

@@ -0,0 +1,77 @@
+package needle
+
+import (
+	"fmt"
+	"github.com/chrislusf/seaweedfs/weed/glog"
+	"github.com/chrislusf/seaweedfs/weed/storage/backend"
+	. "github.com/chrislusf/seaweedfs/weed/storage/types"
+	"github.com/chrislusf/seaweedfs/weed/util"
+	"io"
+)
+
+// ReadNeedleData copies a chunk of the needle content into data, for a needle
+// whose n.Data has not been loaded.
+// volumeOffset: the offset within the volume
+// needleOffset: the offset within the needle Data
+// It returns io.EOF when data is empty or needleOffset is at or past n.DataSize.
+func (n *Needle) ReadNeedleData(r backend.BackendStorageFile, volumeOffset int64, data []byte, needleOffset int64) (count int, err error) {
+
+	// never read past the end of the needle data, nor past the caller's buffer
+	remaining := int64(n.DataSize) - needleOffset
+	sizeToRead := min(int64(len(data)), remaining)
+	if sizeToRead <= 0 {
+		return 0, io.EOF
+	}
+
+	// skip the needle header and the data size field to reach the content
+	startOffset := volumeOffset + NeedleHeaderSize + DataSizeSize + needleOffset
+	if count, err = r.ReadAt(data[:sizeToRead], startOffset); err == nil {
+		return count, nil
+	}
+
+	// the stat errors are deliberately ignored: fileSize is only used for logging
+	fileSize, _, _ := r.GetStat()
+	glog.Errorf("%s read %d %d size %d at offset %d fileSize %d: %v", r.Name(), n.Id, needleOffset, sizeToRead, volumeOffset, fileSize, err)
+	return count, err
+
+}
+
+// ReadNeedleMeta fills all metadata except the n.Data.
+// It reads the needle header and data size at offset, skips over the data
+// section, then parses the trailing metadata, checksum and (for Version3)
+// the append timestamp.
+// offset: the offset of the needle within the volume
+// size: the needle size, passed to GetActualSize to locate the end of the needle
+// It returns the stored checksum value; on error the checksum is 0.
+// NOTE(review): the metadata is parsed with the version 2 layout regardless of
+// version — confirm callers never pass an older version.
+func (n *Needle) ReadNeedleMeta(r backend.BackendStorageFile, offset int64, size Size, version Version) (checksumValue uint32, err error) {
+
+	const prefixSize = NeedleHeaderSize + DataSizeSize
+	bytes := make([]byte, prefixSize)
+
+	count, err := r.ReadAt(bytes, offset)
+	if err != nil {
+		return 0, err
+	}
+	if count != prefixSize {
+		// a short read without an error must not be reported as success
+		return 0, fmt.Errorf("read needle header at offset %d: got %d of %d bytes: %w", offset, count, prefixSize, io.ErrUnexpectedEOF)
+	}
+	n.ParseNeedleHeader(bytes)
+	n.DataSize = util.BytesToUint32(bytes[NeedleHeaderSize:prefixSize])
+
+	// the metadata sits between the end of the data and the end of the needle
+	startOffset := offset + prefixSize + int64(n.DataSize)
+	stopOffset := offset + GetActualSize(size, version)
+	metaSize := stopOffset - startOffset
+	if metaSize < 0 {
+		// a data size larger than the needle itself would make the allocation below panic
+		return 0, fmt.Errorf("needle at offset %d: data size %d exceeds needle size %d", offset, n.DataSize, size)
+	}
+	metaSlice := make([]byte, int(metaSize))
+
+	count, err = r.ReadAt(metaSlice, startOffset)
+	if int64(count) != metaSize {
+		if err == nil {
+			err = fmt.Errorf("read needle meta at offset %d: got %d of %d bytes: %w", startOffset, count, metaSize, io.ErrUnexpectedEOF)
+		}
+		return 0, err
+	}
+	// a complete read is accepted even when ReadAt also reports an error such as io.EOF
+
+	index, err := n.readNeedleDataVersion2NonData(metaSlice)
+	if err != nil {
+		return 0, err
+	}
+
+	checksumEnd := index + NeedleChecksumSize
+	if checksumEnd > len(metaSlice) {
+		return 0, fmt.Errorf("needle meta at offset %d: checksum out of range %d > %d", startOffset, checksumEnd, len(metaSlice))
+	}
+	checksumValue = util.BytesToUint32(metaSlice[index:checksumEnd])
+
+	if version == Version3 {
+		timestampEnd := checksumEnd + TimestampSize
+		if timestampEnd > len(metaSlice) {
+			return 0, fmt.Errorf("needle meta at offset %d: timestamp out of range %d > %d", startOffset, timestampEnd, len(metaSlice))
+		}
+		n.AppendAtNs = util.BytesToUint64(metaSlice[checksumEnd:timestampEnd])
+	}
+
+	return checksumValue, nil
+
+}
+
+// min returns the smaller of the two int64 values.
+func min(x, y int64) int64 {
+	if y <= x {
+		return y
+	}
+	return x
+}

+ 99 - 0
weed/storage/needle/needle_read_test.go

@@ -0,0 +1,99 @@
+package needle
+
+import (
+	"fmt"
+	"github.com/chrislusf/seaweedfs/weed/storage/backend"
+	"github.com/stretchr/testify/assert"
+	"io"
+	"os"
+	"testing"
+
+	"github.com/chrislusf/seaweedfs/weed/storage/types"
+)
+
+// TestPageRead reads one needle from the 43.dat fixture in three ways: as a
+// whole blob, header only, and metadata followed by chunked data reads. It
+// checks that the CRC computed over the chunked data equals the checksum
+// value returned by ReadNeedleMeta.
+func TestPageRead(t *testing.T) {
+	baseFileName := "43"
+	offset := int64(8)
+	size := types.Size(1153890) // actual file size 1153862
+
+	datFile, err := os.OpenFile(baseFileName+".dat", os.O_RDONLY, 0644)
+	if err != nil {
+		t.Fatalf("Open Volume Data File [ERROR]: %v", err)
+	}
+	datBackend := backend.NewDiskFile(datFile)
+	defer datBackend.Close()
+
+	// read the whole needle in one piece and print its fields
+	{
+		n := new(Needle)
+
+		bytes, err := ReadNeedleBlob(datBackend, offset, size, Version3)
+		if err != nil {
+			t.Fatalf("ReadNeedleBlob: %v", err)
+		}
+		if err = n.ReadBytes(bytes, offset, size, Version3); err != nil {
+			t.Fatalf("ReadBytes: %v", err)
+		}
+
+		fmt.Printf("bytes len %d\n", len(bytes))
+		fmt.Printf("name %s size %d\n", n.Name, n.Size)
+
+		fmt.Printf("id %d\n", n.Id)
+		fmt.Printf("DataSize %d\n", n.DataSize)
+		fmt.Printf("Flags %v\n", n.Flags)
+		fmt.Printf("NameSize %d\n", n.NameSize)
+		fmt.Printf("MimeSize %d\n", n.MimeSize)
+		fmt.Printf("PairsSize %d\n", n.PairsSize)
+		fmt.Printf("LastModified %d\n", n.LastModified)
+		fmt.Printf("AppendAtNs %d\n", n.AppendAtNs)
+		fmt.Printf("Checksum %d\n", n.Checksum)
+	}
+
+	// read only the needle header
+	{
+		n, bytes, bodyLength, err := ReadNeedleHeader(datBackend, Version3, offset)
+		if err != nil {
+			t.Fatalf("ReadNeedleHeader: %v", err)
+		}
+		fmt.Printf("bytes len %d\n", len(bytes))
+		fmt.Printf("name %s size %d bodyLength:%d\n", n.Name, n.Size, bodyLength)
+	}
+
+	// read the metadata, then stream the data in chunks and verify the checksum
+	{
+		n := new(Needle)
+		checksumValue, err := n.ReadNeedleMeta(datBackend, offset, size, Version3)
+		if err != nil {
+			t.Fatalf("ReadNeedleMeta: %v", err)
+		}
+		fmt.Printf("name %s size %d\n", n.Name, n.Size)
+		fmt.Printf("id %d\n", n.Id)
+		fmt.Printf("DataSize %d\n", n.DataSize)
+		fmt.Printf("Flags %v\n", n.Flags)
+		fmt.Printf("NameSize %d\n", n.NameSize)
+		fmt.Printf("MimeSize %d\n", n.MimeSize)
+		fmt.Printf("PairsSize %d\n", n.PairsSize)
+		fmt.Printf("LastModified %d\n", n.LastModified)
+		fmt.Printf("AppendAtNs %d\n", n.AppendAtNs)
+		fmt.Printf("Checksum %d\n", n.Checksum)
+		fmt.Printf("Checksum value %d\n", checksumValue)
+
+		buf := make([]byte, 1024)
+		crc := CRC(0)
+		needleOffset := int64(0)
+		for {
+			count, err := n.ReadNeedleData(datBackend, offset, buf, needleOffset)
+			if err == io.EOF {
+				break
+			}
+			if err != nil {
+				t.Fatalf("ReadNeedleData: %v", err)
+			}
+			if count == 0 {
+				break
+			}
+			crc = crc.Update(buf[0:count])
+			// advance by what was actually read so a short read cannot skip bytes
+			needleOffset += int64(count)
+		}
+		fmt.Printf("read checksum value %d\n", crc.Value())
+
+		assert.Equal(t, checksumValue, crc.Value(), "validate checksum value")
+
+	}
+
+}

+ 1 - 0
weed/storage/types/needle_types.go

@@ -33,6 +33,7 @@ type Cookie uint32
 const (
 	SizeSize           = 4 // uint32 size
 	NeedleHeaderSize   = CookieSize + NeedleIdSize + SizeSize
+	DataSizeSize       = 4
 	NeedleMapEntrySize = NeedleIdSize + OffsetSize + SizeSize
 	TimestampSize      = 8 // int64 size
 	NeedlePaddingSize  = 8