dirty_pages_chunked.go 2.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. package mount
  2. import (
  3. "fmt"
  4. "io"
  5. "sync"
  6. "github.com/seaweedfs/seaweedfs/weed/glog"
  7. "github.com/seaweedfs/seaweedfs/weed/mount/page_writer"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  9. )
  10. type ChunkedDirtyPages struct {
  11. fh *FileHandle
  12. writeWaitGroup sync.WaitGroup
  13. lastErr error
  14. collection string
  15. replication string
  16. uploadPipeline *page_writer.UploadPipeline
  17. hasWrites bool
  18. }
  19. var (
  20. _ = page_writer.DirtyPages(&ChunkedDirtyPages{})
  21. )
  22. func newMemoryChunkPages(fh *FileHandle, chunkSize int64) *ChunkedDirtyPages {
  23. dirtyPages := &ChunkedDirtyPages{
  24. fh: fh,
  25. }
  26. swapFileDir := fh.wfs.option.getUniqueCacheDirForWrite()
  27. dirtyPages.uploadPipeline = page_writer.NewUploadPipeline(fh.wfs.concurrentWriters, chunkSize,
  28. dirtyPages.saveChunkedFileIntervalToStorage, fh.wfs.option.ConcurrentWriters, swapFileDir)
  29. return dirtyPages
  30. }
  31. func (pages *ChunkedDirtyPages) AddPage(offset int64, data []byte, isSequential bool, tsNs int64) {
  32. pages.hasWrites = true
  33. glog.V(4).Infof("%v memory AddPage [%d, %d)", pages.fh.fh, offset, offset+int64(len(data)))
  34. pages.uploadPipeline.SaveDataAt(data, offset, isSequential, tsNs)
  35. return
  36. }
  37. func (pages *ChunkedDirtyPages) FlushData() error {
  38. if !pages.hasWrites {
  39. return nil
  40. }
  41. pages.uploadPipeline.FlushAll()
  42. if pages.lastErr != nil {
  43. return fmt.Errorf("flush data: %v", pages.lastErr)
  44. }
  45. return nil
  46. }
  47. func (pages *ChunkedDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64, tsNs int64) (maxStop int64) {
  48. if !pages.hasWrites {
  49. return
  50. }
  51. return pages.uploadPipeline.MaybeReadDataAt(data, startOffset, tsNs)
  52. }
  53. func (pages *ChunkedDirtyPages) saveChunkedFileIntervalToStorage(reader io.Reader, offset int64, size int64, modifiedTsNs int64, cleanupFn func()) {
  54. defer cleanupFn()
  55. fileFullPath := pages.fh.FullPath()
  56. fileName := fileFullPath.Name()
  57. chunk, err := pages.fh.wfs.saveDataAsChunk(fileFullPath)(reader, fileName, offset, modifiedTsNs)
  58. if err != nil {
  59. glog.V(0).Infof("%v saveToStorage [%d,%d): %v", fileFullPath, offset, offset+size, err)
  60. pages.lastErr = err
  61. return
  62. }
  63. pages.fh.AddChunks([]*filer_pb.FileChunk{chunk})
  64. pages.fh.entryChunkGroup.AddChunk(chunk)
  65. glog.V(3).Infof("%v saveToStorage %s [%d,%d)", fileFullPath, chunk.FileId, offset, offset+size)
  66. }
  67. func (pages *ChunkedDirtyPages) Destroy() {
  68. pages.uploadPipeline.Shutdown()
  69. }
  70. func (pages *ChunkedDirtyPages) LockForRead(startOffset, stopOffset int64) {
  71. pages.uploadPipeline.LockForRead(startOffset, stopOffset)
  72. }
  73. func (pages *ChunkedDirtyPages) UnlockForRead(startOffset, stopOffset int64) {
  74. pages.uploadPipeline.UnlockForRead(startOffset, stopOffset)
  75. }