volume_read.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. package storage
  2. import (
  3. "fmt"
  4. "io"
  5. "time"
  6. "github.com/chrislusf/seaweedfs/weed/glog"
  7. "github.com/chrislusf/seaweedfs/weed/storage/backend"
  8. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  9. "github.com/chrislusf/seaweedfs/weed/storage/super_block"
  10. . "github.com/chrislusf/seaweedfs/weed/storage/types"
  11. )
  12. // read fills in Needle content by looking up n.Id from NeedleMapper
  13. func (v *Volume) readNeedle(n *needle.Needle, readOption *ReadOption, onReadSizeFn func(size Size)) (int, error) {
  14. v.dataFileAccessLock.RLock()
  15. defer v.dataFileAccessLock.RUnlock()
  16. nv, ok := v.nm.Get(n.Id)
  17. if !ok || nv.Offset.IsZero() {
  18. return -1, ErrorNotFound
  19. }
  20. readSize := nv.Size
  21. if readSize.IsDeleted() {
  22. if readOption != nil && readOption.ReadDeleted && readSize != TombstoneFileSize {
  23. glog.V(3).Infof("reading deleted %s", n.String())
  24. readSize = -readSize
  25. } else {
  26. return -1, ErrorDeleted
  27. }
  28. }
  29. if readSize == 0 {
  30. return 0, nil
  31. }
  32. if onReadSizeFn != nil {
  33. onReadSizeFn(readSize)
  34. }
  35. err := n.ReadData(v.DataBackend, nv.Offset.ToActualOffset(), readSize, v.Version())
  36. if err == needle.ErrorSizeMismatch && OffsetSize == 4 {
  37. err = n.ReadData(v.DataBackend, nv.Offset.ToActualOffset()+int64(MaxPossibleVolumeSize), readSize, v.Version())
  38. }
  39. v.checkReadWriteError(err)
  40. if err != nil {
  41. return 0, err
  42. }
  43. bytesRead := len(n.Data)
  44. if !n.HasTtl() {
  45. return bytesRead, nil
  46. }
  47. ttlMinutes := n.Ttl.Minutes()
  48. if ttlMinutes == 0 {
  49. return bytesRead, nil
  50. }
  51. if !n.HasLastModifiedDate() {
  52. return bytesRead, nil
  53. }
  54. if time.Now().Before(time.Unix(0, int64(n.AppendAtNs)).Add(time.Duration(ttlMinutes) * time.Minute)) {
  55. return bytesRead, nil
  56. }
  57. return -1, ErrorNotFound
  58. }
  59. // read fills in Needle content by looking up n.Id from NeedleMapper
  60. func (v *Volume) ReadNeedleBlob(offset int64, size Size) ([]byte, error) {
  61. v.dataFileAccessLock.RLock()
  62. defer v.dataFileAccessLock.RUnlock()
  63. return needle.ReadNeedleBlob(v.DataBackend, offset, size, v.Version())
  64. }
  65. type VolumeFileScanner interface {
  66. VisitSuperBlock(super_block.SuperBlock) error
  67. ReadNeedleBody() bool
  68. VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error
  69. }
  70. func ScanVolumeFile(dirname string, collection string, id needle.VolumeId,
  71. needleMapKind NeedleMapKind,
  72. volumeFileScanner VolumeFileScanner) (err error) {
  73. var v *Volume
  74. if v, err = loadVolumeWithoutIndex(dirname, collection, id, needleMapKind); err != nil {
  75. return fmt.Errorf("failed to load volume %d: %v", id, err)
  76. }
  77. if err = volumeFileScanner.VisitSuperBlock(v.SuperBlock); err != nil {
  78. return fmt.Errorf("failed to process volume %d super block: %v", id, err)
  79. }
  80. defer v.Close()
  81. version := v.Version()
  82. offset := int64(v.SuperBlock.BlockSize())
  83. return ScanVolumeFileFrom(version, v.DataBackend, offset, volumeFileScanner)
  84. }
  85. func ScanVolumeFileFrom(version needle.Version, datBackend backend.BackendStorageFile, offset int64, volumeFileScanner VolumeFileScanner) (err error) {
  86. n, nh, rest, e := needle.ReadNeedleHeader(datBackend, version, offset)
  87. if e != nil {
  88. if e == io.EOF {
  89. return nil
  90. }
  91. return fmt.Errorf("cannot read %s at offset %d: %v", datBackend.Name(), offset, e)
  92. }
  93. for n != nil {
  94. var needleBody []byte
  95. if volumeFileScanner.ReadNeedleBody() {
  96. // println("needle", n.Id.String(), "offset", offset, "size", n.Size, "rest", rest)
  97. if needleBody, err = n.ReadNeedleBody(datBackend, version, offset+NeedleHeaderSize, rest); err != nil {
  98. glog.V(0).Infof("cannot read needle head [%d, %d) body [%d, %d) body length %d: %v", offset, offset+NeedleHeaderSize, offset+NeedleHeaderSize, offset+NeedleHeaderSize+rest, rest, err)
  99. // err = fmt.Errorf("cannot read needle body: %v", err)
  100. // return
  101. }
  102. }
  103. err := volumeFileScanner.VisitNeedle(n, offset, nh, needleBody)
  104. if err == io.EOF {
  105. return nil
  106. }
  107. if err != nil {
  108. glog.V(0).Infof("visit needle error: %v", err)
  109. return fmt.Errorf("visit needle error: %v", err)
  110. }
  111. offset += NeedleHeaderSize + rest
  112. glog.V(4).Infof("==> new entry offset %d", offset)
  113. if n, nh, rest, err = needle.ReadNeedleHeader(datBackend, version, offset); err != nil {
  114. if err == io.EOF {
  115. return nil
  116. }
  117. return fmt.Errorf("cannot read needle header at offset %d: %v", offset, err)
  118. }
  119. glog.V(4).Infof("new entry needle size:%d rest:%d", n.Size, rest)
  120. }
  121. return nil
  122. }