123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202 |
- package page_writer
- import (
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/util"
- "github.com/seaweedfs/seaweedfs/weed/util/mem"
- "io"
- "os"
- "sync"
- )
- var (
- _ = PageChunk(&SwapFileChunk{})
- )
// ActualChunkIndex identifies a chunk-sized slot inside the swap file. The
// slot's byte offset in the file is the index multiplied by the chunk size.
type ActualChunkIndex int
- type SwapFile struct {
- dir string
- file *os.File
- chunkSize int64
- chunkTrackingLock sync.Mutex
- activeChunkCount int
- freeActualChunkList []ActualChunkIndex
- }
- type SwapFileChunk struct {
- sync.RWMutex
- swapfile *SwapFile
- usage *ChunkWrittenIntervalList
- logicChunkIndex LogicChunkIndex
- actualChunkIndex ActualChunkIndex
- activityScore *ActivityScore
- //memChunk *MemChunk
- }
- func NewSwapFile(dir string, chunkSize int64) *SwapFile {
- return &SwapFile{
- dir: dir,
- file: nil,
- chunkSize: chunkSize,
- }
- }
- func (sf *SwapFile) FreeResource() {
- if sf.file != nil {
- sf.file.Close()
- os.Remove(sf.file.Name())
- }
- }
- func (sf *SwapFile) NewSwapFileChunk(logicChunkIndex LogicChunkIndex) (tc *SwapFileChunk) {
- if sf.file == nil {
- var err error
- sf.file, err = os.CreateTemp(sf.dir, "")
- if err != nil {
- glog.Errorf("create swap file: %v", err)
- return nil
- }
- }
- sf.chunkTrackingLock.Lock()
- defer sf.chunkTrackingLock.Unlock()
- sf.activeChunkCount++
- // assign a new physical chunk
- var actualChunkIndex ActualChunkIndex
- if len(sf.freeActualChunkList) > 0 {
- actualChunkIndex = sf.freeActualChunkList[0]
- sf.freeActualChunkList = sf.freeActualChunkList[1:]
- } else {
- actualChunkIndex = ActualChunkIndex(sf.activeChunkCount)
- }
- swapFileChunk := &SwapFileChunk{
- swapfile: sf,
- usage: newChunkWrittenIntervalList(),
- logicChunkIndex: logicChunkIndex,
- actualChunkIndex: actualChunkIndex,
- activityScore: NewActivityScore(),
- // memChunk: NewMemChunk(logicChunkIndex, sf.chunkSize),
- }
- // println(logicChunkIndex, "|", "++++", swapFileChunk.actualChunkIndex, swapFileChunk, sf)
- return swapFileChunk
- }
- func (sc *SwapFileChunk) FreeResource() {
- sc.Lock()
- defer sc.Unlock()
- sc.swapfile.chunkTrackingLock.Lock()
- defer sc.swapfile.chunkTrackingLock.Unlock()
- sc.swapfile.freeActualChunkList = append(sc.swapfile.freeActualChunkList, sc.actualChunkIndex)
- sc.swapfile.activeChunkCount--
- // println(sc.logicChunkIndex, "|", "----", sc.actualChunkIndex, sc, sc.swapfile)
- }
- func (sc *SwapFileChunk) WriteDataAt(src []byte, offset int64, tsNs int64) (n int) {
- sc.Lock()
- defer sc.Unlock()
- // println(sc.logicChunkIndex, "|", tsNs, "write at", offset, len(src), sc.actualChunkIndex)
- innerOffset := offset % sc.swapfile.chunkSize
- var err error
- n, err = sc.swapfile.file.WriteAt(src, int64(sc.actualChunkIndex)*sc.swapfile.chunkSize+innerOffset)
- sc.usage.MarkWritten(innerOffset, innerOffset+int64(n), tsNs)
- if err != nil {
- glog.Errorf("failed to write swap file %s: %v", sc.swapfile.file.Name(), err)
- }
- //sc.memChunk.WriteDataAt(src, offset, tsNs)
- sc.activityScore.MarkWrite()
- return
- }
- func (sc *SwapFileChunk) ReadDataAt(p []byte, off int64, tsNs int64) (maxStop int64) {
- sc.RLock()
- defer sc.RUnlock()
- // println(sc.logicChunkIndex, "|", tsNs, "read at", off, len(p), sc.actualChunkIndex)
- //memCopy := make([]byte, len(p))
- //copy(memCopy, p)
- chunkStartOffset := int64(sc.logicChunkIndex) * sc.swapfile.chunkSize
- for t := sc.usage.head.next; t != sc.usage.tail; t = t.next {
- logicStart := max(off, chunkStartOffset+t.StartOffset)
- logicStop := min(off+int64(len(p)), chunkStartOffset+t.stopOffset)
- if logicStart < logicStop {
- actualStart := logicStart - chunkStartOffset + int64(sc.actualChunkIndex)*sc.swapfile.chunkSize
- if n, err := sc.swapfile.file.ReadAt(p[logicStart-off:logicStop-off], actualStart); err != nil {
- if err == io.EOF && n == int(logicStop-logicStart) {
- err = nil
- }
- glog.Errorf("failed to reading swap file %s: %v", sc.swapfile.file.Name(), err)
- break
- }
- maxStop = max(maxStop, logicStop)
- if t.TsNs >= tsNs {
- println("read new data2", t.TsNs-tsNs, "ns")
- }
- }
- }
- //sc.memChunk.ReadDataAt(memCopy, off, tsNs)
- //if bytes.Compare(memCopy, p) != 0 {
- // println("read wrong data from swap file", off, sc.logicChunkIndex)
- //}
- sc.activityScore.MarkRead()
- return
- }
- func (sc *SwapFileChunk) IsComplete() bool {
- sc.RLock()
- defer sc.RUnlock()
- return sc.usage.IsComplete(sc.swapfile.chunkSize)
- }
- func (sc *SwapFileChunk) ActivityScore() int64 {
- return sc.activityScore.ActivityScore()
- }
- func (sc *SwapFileChunk) WrittenSize() int64 {
- sc.RLock()
- defer sc.RUnlock()
- return sc.usage.WrittenSize()
- }
- func (sc *SwapFileChunk) SaveContent(saveFn SaveToStorageFunc) {
- sc.RLock()
- defer sc.RUnlock()
- if saveFn == nil {
- return
- }
- // println(sc.logicChunkIndex, "|", "save")
- for t := sc.usage.head.next; t != sc.usage.tail; t = t.next {
- startOffset := t.StartOffset
- stopOffset := t.stopOffset
- tsNs := t.TsNs
- for t != sc.usage.tail && t.next.StartOffset == stopOffset {
- stopOffset = t.next.stopOffset
- t = t.next
- tsNs = max(tsNs, t.TsNs)
- }
- data := mem.Allocate(int(stopOffset - startOffset))
- n, _ := sc.swapfile.file.ReadAt(data, startOffset+int64(sc.actualChunkIndex)*sc.swapfile.chunkSize)
- if n > 0 {
- reader := util.NewBytesReader(data[:n])
- saveFn(reader, int64(sc.logicChunkIndex)*sc.swapfile.chunkSize+startOffset, int64(n), tsNs, func() {
- })
- }
- mem.Free(data)
- }
- }
|