volume_read.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. package storage
  2. import (
  3. "fmt"
  4. "io"
  5. "time"
  6. "github.com/chrislusf/seaweedfs/weed/glog"
  7. "github.com/chrislusf/seaweedfs/weed/storage/backend"
  8. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  9. "github.com/chrislusf/seaweedfs/weed/storage/super_block"
  10. . "github.com/chrislusf/seaweedfs/weed/storage/types"
  11. )
  12. // read fills in Needle content by looking up n.Id from NeedleMapper
  13. func (v *Volume) readNeedle(n *needle.Needle, readOption *ReadOption) (int, error) {
  14. v.dataFileAccessLock.RLock()
  15. defer v.dataFileAccessLock.RUnlock()
  16. nv, ok := v.nm.Get(n.Id)
  17. if !ok || nv.Offset.IsZero() {
  18. return -1, ErrorNotFound
  19. }
  20. readSize := nv.Size
  21. if readSize.IsDeleted() {
  22. if readOption != nil && readOption.ReadDeleted && readSize != TombstoneFileSize {
  23. glog.V(3).Infof("reading deleted %s", n.String())
  24. readSize = -readSize
  25. } else {
  26. return -1, ErrorDeleted
  27. }
  28. }
  29. if readSize == 0 {
  30. return 0, nil
  31. }
  32. err := n.ReadData(v.DataBackend, nv.Offset.ToActualOffset(), readSize, v.Version())
  33. if err == needle.ErrorSizeMismatch && OffsetSize == 4 {
  34. err = n.ReadData(v.DataBackend, nv.Offset.ToActualOffset()+int64(MaxPossibleVolumeSize), readSize, v.Version())
  35. }
  36. v.checkReadWriteError(err)
  37. if err != nil {
  38. return 0, err
  39. }
  40. bytesRead := len(n.Data)
  41. if !n.HasTtl() {
  42. return bytesRead, nil
  43. }
  44. ttlMinutes := n.Ttl.Minutes()
  45. if ttlMinutes == 0 {
  46. return bytesRead, nil
  47. }
  48. if !n.HasLastModifiedDate() {
  49. return bytesRead, nil
  50. }
  51. if time.Now().Before(time.Unix(0, int64(n.AppendAtNs)).Add(time.Duration(ttlMinutes) * time.Minute)) {
  52. return bytesRead, nil
  53. }
  54. return -1, ErrorNotFound
  55. }
  56. // read fills in Needle content by looking up n.Id from NeedleMapper
  57. func (v *Volume) ReadNeedleBlob(offset int64, size Size) ([]byte, error) {
  58. v.dataFileAccessLock.RLock()
  59. defer v.dataFileAccessLock.RUnlock()
  60. return needle.ReadNeedleBlob(v.DataBackend, offset, size, v.Version())
  61. }
  62. type VolumeFileScanner interface {
  63. VisitSuperBlock(super_block.SuperBlock) error
  64. ReadNeedleBody() bool
  65. VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error
  66. }
  67. func ScanVolumeFile(dirname string, collection string, id needle.VolumeId,
  68. needleMapKind NeedleMapKind,
  69. volumeFileScanner VolumeFileScanner) (err error) {
  70. var v *Volume
  71. if v, err = loadVolumeWithoutIndex(dirname, collection, id, needleMapKind); err != nil {
  72. return fmt.Errorf("failed to load volume %d: %v", id, err)
  73. }
  74. if err = volumeFileScanner.VisitSuperBlock(v.SuperBlock); err != nil {
  75. return fmt.Errorf("failed to process volume %d super block: %v", id, err)
  76. }
  77. defer v.Close()
  78. version := v.Version()
  79. offset := int64(v.SuperBlock.BlockSize())
  80. return ScanVolumeFileFrom(version, v.DataBackend, offset, volumeFileScanner)
  81. }
  82. func ScanVolumeFileFrom(version needle.Version, datBackend backend.BackendStorageFile, offset int64, volumeFileScanner VolumeFileScanner) (err error) {
  83. n, nh, rest, e := needle.ReadNeedleHeader(datBackend, version, offset)
  84. if e != nil {
  85. if e == io.EOF {
  86. return nil
  87. }
  88. return fmt.Errorf("cannot read %s at offset %d: %v", datBackend.Name(), offset, e)
  89. }
  90. for n != nil {
  91. var needleBody []byte
  92. if volumeFileScanner.ReadNeedleBody() {
  93. // println("needle", n.Id.String(), "offset", offset, "size", n.Size, "rest", rest)
  94. if needleBody, err = n.ReadNeedleBody(datBackend, version, offset+NeedleHeaderSize, rest); err != nil {
  95. glog.V(0).Infof("cannot read needle head [%d, %d) body [%d, %d) body length %d: %v", offset, offset+NeedleHeaderSize, offset+NeedleHeaderSize, offset+NeedleHeaderSize+rest, rest, err)
  96. // err = fmt.Errorf("cannot read needle body: %v", err)
  97. // return
  98. }
  99. }
  100. err := volumeFileScanner.VisitNeedle(n, offset, nh, needleBody)
  101. if err == io.EOF {
  102. return nil
  103. }
  104. if err != nil {
  105. glog.V(0).Infof("visit needle error: %v", err)
  106. return fmt.Errorf("visit needle error: %v", err)
  107. }
  108. offset += NeedleHeaderSize + rest
  109. glog.V(4).Infof("==> new entry offset %d", offset)
  110. if n, nh, rest, err = needle.ReadNeedleHeader(datBackend, version, offset); err != nil {
  111. if err == io.EOF {
  112. return nil
  113. }
  114. return fmt.Errorf("cannot read needle header at offset %d: %v", offset, err)
  115. }
  116. glog.V(4).Infof("new entry needle size:%d rest:%d", n.Size, rest)
  117. }
  118. return nil
  119. }