needle_read_page.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778
  1. package needle
  2. import (
  3. "github.com/seaweedfs/seaweedfs/weed/glog"
  4. "github.com/seaweedfs/seaweedfs/weed/storage/backend"
  5. . "github.com/seaweedfs/seaweedfs/weed/storage/types"
  6. "github.com/seaweedfs/seaweedfs/weed/util"
  7. "io"
  8. )
  9. // ReadNeedleData uses a needle without n.Data to read the content
  10. // volumeOffset: the offset within the volume
  11. // needleOffset: the offset within the needle Data
  12. func (n *Needle) ReadNeedleData(r backend.BackendStorageFile, volumeOffset int64, data []byte, needleOffset int64) (count int, err error) {
  13. sizeToRead := min(int64(len(data)), int64(n.DataSize)-needleOffset)
  14. if sizeToRead <= 0 {
  15. return 0, io.EOF
  16. }
  17. startOffset := volumeOffset + NeedleHeaderSize + DataSizeSize + needleOffset
  18. count, err = r.ReadAt(data[:sizeToRead], startOffset)
  19. if err != nil {
  20. fileSize, _, _ := r.GetStat()
  21. glog.Errorf("%s read %d %d size %d at offset %d fileSize %d: %v", r.Name(), n.Id, needleOffset, sizeToRead, volumeOffset, fileSize, err)
  22. }
  23. return
  24. }
  25. // ReadNeedleMeta fills all metadata except the n.Data
  26. func (n *Needle) ReadNeedleMeta(r backend.BackendStorageFile, offset int64, size Size, version Version) (err error) {
  27. bytes := make([]byte, NeedleHeaderSize+DataSizeSize)
  28. count, err := r.ReadAt(bytes, offset)
  29. if count != NeedleHeaderSize+DataSizeSize || err != nil {
  30. return err
  31. }
  32. n.ParseNeedleHeader(bytes)
  33. if n.Size != size {
  34. if OffsetSize == 4 && offset < int64(MaxPossibleVolumeSize) {
  35. return ErrorSizeMismatch
  36. }
  37. }
  38. n.DataSize = util.BytesToUint32(bytes[NeedleHeaderSize : NeedleHeaderSize+DataSizeSize])
  39. startOffset := offset + NeedleHeaderSize + DataSizeSize + int64(n.DataSize)
  40. dataSize := GetActualSize(size, version)
  41. stopOffset := offset + dataSize
  42. metaSize := stopOffset - startOffset
  43. metaSlice := make([]byte, int(metaSize))
  44. count, err = r.ReadAt(metaSlice, startOffset)
  45. if err != nil && int64(count) == metaSize {
  46. err = nil
  47. }
  48. if err != nil {
  49. return err
  50. }
  51. var index int
  52. index, err = n.readNeedleDataVersion2NonData(metaSlice)
  53. n.Checksum = CRC(util.BytesToUint32(metaSlice[index : index+NeedleChecksumSize]))
  54. if version == Version3 {
  55. n.AppendAtNs = util.BytesToUint64(metaSlice[index+NeedleChecksumSize : index+NeedleChecksumSize+TimestampSize])
  56. }
  57. return err
  58. }
  // min returns the smaller of x and y; when they are equal, y is returned.
  // This package-level definition takes precedence over any builtin of the
  // same name within the package.
  func min(x, y int64) int64 {
  	smaller := y
  	if x < y {
  		smaller = x
  	}
  	return smaller
  }