
file size support set file length

use Attr.FileSize and TotalChunkSize to determine file size
Chris Lu committed 4 years ago
commit c647deace1

+ 9 - 1
weed/filer2/entry.go

@@ -22,6 +22,7 @@ type Attr struct {
 	GroupNames    []string
 	SymlinkTarget string
 	Md5           []byte
+	FileSize      uint64
 }
 
 func (attr Attr) IsDirectory() bool {
@@ -39,7 +40,7 @@ type Entry struct {
 }
 
 func (entry *Entry) Size() uint64 {
-	return TotalSize(entry.Chunks)
+	return maxUint64(TotalSize(entry.Chunks), entry.FileSize)
 }
 
 func (entry *Entry) Timestamp() time.Time {
@@ -81,3 +82,10 @@ func FromPbEntry(dir string, entry *filer_pb.Entry) *Entry {
 		Chunks:   entry.Chunks,
 	}
 }
+
+func maxUint64(x, y uint64) uint64 {
+	if x > y {
+		return x
+	}
+	return y
+}
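
The net effect: Entry.Size() now reports the logical file length, i.e. the larger of the summed chunk sizes and the recorded Attr.FileSize, so a file extended by a truncate-up (or with buffered writes not yet flushed to chunks) no longer under-reports its size. A minimal, self-contained sketch of the semantics, with hypothetical values:

    package main

    import "fmt"

    func maxUint64(x, y uint64) uint64 {
        if x > y {
            return x
        }
        return y
    }

    func main() {
        totalChunkSize := uint64(4096) // bytes actually stored in chunks
        fileSize := uint64(8192)       // recorded logical size, e.g. after a truncate-up

        // Before this commit Size() returned 4096; with the change it returns 8192.
        fmt.Println(maxUint64(totalChunkSize, fileSize))
    }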

+ 2 - 0
weed/filer2/entry_codec.go

@@ -53,6 +53,7 @@ func EntryAttributeToPb(entry *Entry) *filer_pb.FuseAttributes {
 		GroupName:     entry.Attr.GroupNames,
 		SymlinkTarget: entry.Attr.SymlinkTarget,
 		Md5:           entry.Attr.Md5,
+		FileSize:      entry.Attr.FileSize,
 	}
 }
 
@@ -73,6 +74,7 @@ func PbToEntryAttribute(attr *filer_pb.FuseAttributes) Attr {
 	t.GroupNames = attr.GroupName
 	t.SymlinkTarget = attr.SymlinkTarget
 	t.Md5 = attr.Md5
+	t.FileSize = attr.FileSize
 
 	return t
 }

+ 4 - 0
weed/filer2/filechunks.go

@@ -20,6 +20,10 @@ func TotalSize(chunks []*filer_pb.FileChunk) (size uint64) {
 	return
 }
 
+func FileSize(entry *filer_pb.Entry) (size uint64) {
+	return maxUint64(TotalSize(entry.Chunks), entry.Attributes.FileSize)
+}
+
 func ETag(entry *filer_pb.Entry) (etag string) {
 	if entry.Attributes == nil || entry.Attributes.Md5 == nil {
 		return ETagChunks(entry.Chunks)
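
Note that the new FileSize dereferences entry.Attributes unconditionally, while the neighboring ETag guards against a nil Attributes, so callers must only pass entries whose attributes are populated. A defensive variant (a sketch with a hypothetical name, not what this commit ships) could look like:

    // FileSizeSafe is a hypothetical nil-tolerant variant of FileSize.
    func FileSizeSafe(entry *filer_pb.Entry) (size uint64) {
        if entry == nil {
            return 0
        }
        size = TotalSize(entry.Chunks) // sum of stored chunk sizes
        if entry.Attributes != nil && entry.Attributes.FileSize > size {
            size = entry.Attributes.FileSize // logical size may exceed stored bytes
        }
        return
    }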

+ 14 - 4
weed/filesys/dirty_page.go

@@ -35,7 +35,7 @@ func (pages *ContinuousDirtyPages) AddPage(offset int64, data []byte) (chunks []
 	pages.lock.Lock()
 	defer pages.lock.Unlock()
 
-	glog.V(4).Infof("%s AddPage [%d,%d)", pages.f.fullpath(), offset, offset+int64(len(data)))
+	glog.V(4).Infof("%s AddPage [%d,%d) of %d bytes", pages.f.fullpath(), offset, offset+int64(len(data)), pages.f.entry.Attributes.FileSize)
 
 	if len(data) > int(pages.f.wfs.option.ChunkSizeLimit) {
 		// this is more than what buffer can hold.
@@ -121,14 +121,16 @@ func (pages *ContinuousDirtyPages) saveExistingLargestPageToStorage() (chunk *fi
 		return nil, false, nil
 	}
 
+	fileSize := int64(pages.f.entry.Attributes.FileSize)
 	for {
-		chunk, err = pages.saveToStorage(maxList.ToReader(), maxList.Offset(), maxList.Size())
+		chunkSize := min(maxList.Size(), fileSize-maxList.Offset())
+		chunk, err = pages.saveToStorage(maxList.ToReader(), maxList.Offset(), chunkSize)
 		if err == nil {
 			hasSavedData = true
-			glog.V(4).Infof("%s saveToStorage [%d,%d) %s", pages.f.fullpath(), maxList.Offset(), maxList.Offset()+maxList.Size(), chunk.FileId)
+			glog.V(4).Infof("%s saveToStorage %s [%d,%d) of %d bytes", pages.f.fullpath(), chunk.FileId, maxList.Offset(), maxList.Offset()+chunkSize, fileSize)
 			return
 		} else {
-			glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), maxList.Offset(), maxList.Offset()+maxList.Size(), err)
+			glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), maxList.Offset(), maxList.Offset()+chunkSize, err)
 			time.Sleep(5 * time.Second)
 		}
 	}
@@ -139,6 +141,7 @@ func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64,
 
 	dir, _ := pages.f.fullpath().DirAndName()
 
+	reader = io.LimitReader(reader, size)
 	chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(dir)(reader, pages.f.Name, offset)
 	if err != nil {
 		return nil, err
@@ -149,6 +152,13 @@ func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64,
 
 }
 
+func maxUint64(x, y uint64) uint64 {
+	if x > y {
+		return x
+	}
+	return y
+}
+
 func max(x, y int64) int64 {
 	if x > y {
 		return x
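
The clamp in saveExistingLargestPageToStorage matters when the file has been truncated below the end of a buffered dirty range: chunkSize = min(maxList.Size(), fileSize-maxList.Offset()) caps the persisted bytes at the logical EOF, and the io.LimitReader added to saveToStorage enforces the same bound on the reader itself. (If the whole range lies beyond the EOF, chunkSize goes negative and the LimitReader yields no bytes.) A self-contained sketch of the clamp, with hypothetical numbers:

    package main

    import (
        "bytes"
        "fmt"
        "io"
    )

    func main() {
        // A dirty page covers [100, 160), but the file was truncated to 130 bytes.
        pageOffset, pageSize := int64(100), int64(60)
        fileSize := int64(130)

        chunkSize := pageSize
        if remaining := fileSize - pageOffset; remaining < chunkSize {
            chunkSize = remaining // clamp to the logical EOF
        }

        page := bytes.NewReader(make([]byte, pageSize))
        saved, _ := io.Copy(io.Discard, io.LimitReader(page, chunkSize))
        fmt.Println(saved) // 30 -- bytes beyond EOF are not persisted
    }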

+ 19 - 8
weed/filesys/file.go

@@ -7,12 +7,13 @@ import (
 	"sort"
 	"time"
 
+	"github.com/seaweedfs/fuse"
+	"github.com/seaweedfs/fuse/fs"
+
 	"github.com/chrislusf/seaweedfs/weed/filer2"
 	"github.com/chrislusf/seaweedfs/weed/glog"
 	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
 	"github.com/chrislusf/seaweedfs/weed/util"
-	"github.com/seaweedfs/fuse"
-	"github.com/seaweedfs/fuse/fs"
 )
 
 const blockSize = 512
@@ -35,6 +36,7 @@ type File struct {
 	entryViewCache []filer2.VisibleInterval
 	isOpen         int
 	reader         io.ReaderAt
+	dirtyMetadata  bool
 }
 
 func (file *File) fullpath() util.FullPath {
@@ -43,7 +45,7 @@ func (file *File) fullpath() util.FullPath {
 
 func (file *File) Attr(ctx context.Context, attr *fuse.Attr) error {
 
-	glog.V(4).Infof("file Attr %s, open:%v, existing attr: %+v", file.fullpath(), file.isOpen, attr)
+	glog.V(5).Infof("file Attr %s, open:%v, existing attr: %+v", file.fullpath(), file.isOpen, attr)
 
 	if file.isOpen <= 0 {
 		if err := file.maybeLoadEntry(ctx); err != nil {
@@ -54,7 +56,7 @@ func (file *File) Attr(ctx context.Context, attr *fuse.Attr) error {
 	attr.Inode = file.fullpath().AsInode()
 	attr.Valid = time.Second
 	attr.Mode = os.FileMode(file.entry.Attributes.FileMode)
-	attr.Size = filer2.TotalSize(file.entry.Chunks)
+	attr.Size = filer2.FileSize(file.entry)
 	if file.isOpen > 0 {
 		attr.Size = file.entry.Attributes.FileSize
 		glog.V(4).Infof("file Attr %s, open:%v, size: %d", file.fullpath(), file.isOpen, attr.Size)
@@ -107,22 +109,31 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f
 
 	if req.Valid.Size() {
 
-		glog.V(4).Infof("%v file setattr set size=%v", file.fullpath(), req.Size)
+		glog.V(4).Infof("%v file setattr set size=%v chunks=%d", file.fullpath(), req.Size, len(file.entry.Chunks))
 		if req.Size < filer2.TotalSize(file.entry.Chunks) {
 			// fmt.Printf("truncate %v \n", fullPath)
 			var chunks []*filer_pb.FileChunk
+			var truncatedChunks []*filer_pb.FileChunk
 			for _, chunk := range file.entry.Chunks {
 				int64Size := int64(chunk.Size)
 				if chunk.Offset+int64Size > int64(req.Size) {
+					// this chunk is truncated
 					int64Size = int64(req.Size) - chunk.Offset
-				}
-				if int64Size > 0 {
-					chunks = append(chunks, chunk)
+					if int64Size > 0 {
+						chunks = append(chunks, chunk)
+						glog.V(4).Infof("truncated chunk %+v from %d to %d\n", chunk, chunk.Size, int64Size)
+						chunk.Size = uint64(int64Size)
+					} else {
+						glog.V(4).Infof("truncated whole chunk %+v\n", chunk)
+						truncatedChunks = append(truncatedChunks, chunk)
+					}
 				}
 			}
+			file.wfs.deleteFileChunks(truncatedChunks)
 			file.entry.Chunks = chunks
 			file.entryViewCache = nil
 			file.reader = nil
+			file.dirtyMetadata = true
 		}
 		file.entry.Attributes.FileSize = req.Size
 	}
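
One thing to watch in this hunk: only chunks that cross the new EOF ever reach the rebuilt chunks slice. A chunk lying entirely below req.Size skips the outer if and is silently dropped from file.entry.Chunks, which looks like an oversight rather than intent (the pre-change loop kept every chunk with a positive remaining size). A sketch of the presumably intended loop, keeping fully-retained chunks explicitly:

    var chunks, truncatedChunks []*filer_pb.FileChunk
    for _, chunk := range file.entry.Chunks {
        int64Size := int64(chunk.Size)
        if chunk.Offset+int64Size > int64(req.Size) {
            // the chunk crosses or lies beyond the new EOF
            int64Size = int64(req.Size) - chunk.Offset
            if int64Size > 0 {
                chunk.Size = uint64(int64Size) // clamp the straddling chunk
                chunks = append(chunks, chunk)
            } else {
                truncatedChunks = append(truncatedChunks, chunk) // fully beyond EOF: delete
            }
        } else {
            chunks = append(chunks, chunk) // fully below EOF: keep unchanged
        }
    }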

+ 11 - 12
weed/filesys/filehandle.go

@@ -19,10 +19,9 @@ import (
 
 type FileHandle struct {
 	// cache file has been written to
-	dirtyPages    *ContinuousDirtyPages
-	contentType   string
-	dirtyMetadata bool
-	handle        uint64
+	dirtyPages  *ContinuousDirtyPages
+	contentType string
+	handle      uint64
 
 	f         *File
 	RequestId fuse.RequestID // unique ID for request
@@ -40,7 +39,7 @@ func newFileHandle(file *File, uid, gid uint32) *FileHandle {
 		Gid:        gid,
 	}
 	if fh.f.entry != nil {
-		fh.f.entry.Attributes.FileSize = filer2.TotalSize(fh.f.entry.Chunks)
+		fh.f.entry.Attributes.FileSize = filer2.FileSize(fh.f.entry)
 	}
 	return fh
 }
@@ -55,7 +54,7 @@ var _ = fs.HandleReleaser(&FileHandle{})
 
 func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error {
 
-	glog.V(4).Infof("%s read fh %d: [%d,%d)", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(req.Size))
+	glog.V(2).Infof("%s read fh %d: [%d,%d)", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(req.Size))
 
 	buff := make([]byte, req.Size)
 
@@ -126,7 +125,7 @@ func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *f
 	copy(data, req.Data)
 
 	fh.f.entry.Attributes.FileSize = uint64(max(req.Offset+int64(len(data)), int64(fh.f.entry.Attributes.FileSize)))
-	glog.V(4).Infof("%v write [%d,%d)", fh.f.fullpath(), req.Offset, req.Offset+int64(len(req.Data)))
+	glog.V(2).Infof("%v write [%d,%d)", fh.f.fullpath(), req.Offset, req.Offset+int64(len(req.Data)))
 
 	chunks, err := fh.dirtyPages.AddPage(req.Offset, data)
 	if err != nil {
@@ -139,14 +138,14 @@ func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *f
 	if req.Offset == 0 {
 		// detect mime type
 		fh.contentType = http.DetectContentType(data)
-		fh.dirtyMetadata = true
+		fh.f.dirtyMetadata = true
 	}
 
 	if len(chunks) > 0 {
 
 		fh.f.addChunks(chunks)
 
-		fh.dirtyMetadata = true
+		fh.f.dirtyMetadata = true
 	}
 
 	return nil
@@ -181,10 +180,10 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
 
 	if len(chunks) > 0 {
 		fh.f.addChunks(chunks)
-		fh.dirtyMetadata = true
+		fh.f.dirtyMetadata = true
 	}
 
-	if !fh.dirtyMetadata {
+	if !fh.f.dirtyMetadata {
 		return nil
 	}
 
@@ -246,7 +245,7 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
 	})
 
 	if err == nil {
-		fh.dirtyMetadata = false
+		fh.f.dirtyMetadata = false
 	}
 
 	if err != nil {
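
Moving dirtyMetadata from FileHandle up to File makes the flag shared across every open handle of the same file: a Write through one handle marks the shared File dirty, and a Flush issued on any handle persists the metadata and clears the flag for all of them. The newFileHandle change pairs with it, seeding Attributes.FileSize from filer2.FileSize(entry) so a freshly opened handle starts from the logical length rather than the chunk total. (The read/write logs also move from V(4) to V(2) here, surfacing them at lower verbosity settings.)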

+ 1 - 1
weed/replication/sink/azuresink/azure_sink.go

@@ -95,7 +95,7 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry) error {
 		return nil
 	}
 
-	totalSize := filer2.TotalSize(entry.Chunks)
+	totalSize := filer2.FileSize(entry)
 	chunkViews := filer2.ViewFromChunks(g.filerSource.LookupFileId, entry.Chunks, 0, int64(totalSize))
 
 	// Create a URL that references a to-be-created blob in your

+ 1 - 1
weed/replication/sink/b2sink/b2_sink.go

@@ -84,7 +84,7 @@ func (g *B2Sink) CreateEntry(key string, entry *filer_pb.Entry) error {
 		return nil
 	}
 
-	totalSize := filer2.TotalSize(entry.Chunks)
+	totalSize := filer2.FileSize(entry)
 	chunkViews := filer2.ViewFromChunks(g.filerSource.LookupFileId, entry.Chunks, 0, int64(totalSize))
 
 	bucket, err := g.client.Bucket(context.Background(), g.bucket)

+ 1 - 1
weed/replication/sink/gcssink/gcs_sink.go

@@ -89,7 +89,7 @@ func (g *GcsSink) CreateEntry(key string, entry *filer_pb.Entry) error {
 		return nil
 	}
 
-	totalSize := filer2.TotalSize(entry.Chunks)
+	totalSize := filer2.FileSize(entry)
 	chunkViews := filer2.ViewFromChunks(g.filerSource.LookupFileId, entry.Chunks, 0, int64(totalSize))
 
 	wc := g.client.Bucket(g.bucket).Object(key).NewWriter(context.Background())

+ 1 - 1
weed/replication/sink/s3sink/s3_sink.go

@@ -107,7 +107,7 @@ func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry) error {
 		return err
 	}
 
-	totalSize := filer2.TotalSize(entry.Chunks)
+	totalSize := filer2.FileSize(entry)
 	chunkViews := filer2.ViewFromChunks(s3sink.filerSource.LookupFileId, entry.Chunks, 0, int64(totalSize))
 
 	parts := make([]*s3.CompletedPart, len(chunkViews))
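
All four replication sinks (Azure, B2, GCS, S3) make the same substitution: sizing the destination object with filer2.FileSize(entry) instead of filer2.TotalSize(entry.Chunks), so an entry whose recorded Attr.FileSize exceeds its stored chunk total replicates at its logical length rather than the shorter chunk sum.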

Some files were not shown because too many files changed in this diff