volume_stream_write.go

package storage

import (
	"bufio"
	"fmt"
	"io"
	"time"

	"github.com/chrislusf/seaweedfs/weed/glog"
	"github.com/chrislusf/seaweedfs/weed/storage/backend"
	"github.com/chrislusf/seaweedfs/weed/storage/needle"
	. "github.com/chrislusf/seaweedfs/weed/storage/types"
	"github.com/chrislusf/seaweedfs/weed/util"
)
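// StreamWrite appends a single needle record to the volume's data file
// without buffering the whole payload in memory: it writes the fixed-size
// header, streams the body from the supplied reader through a CRC-computing
// writer, then writes the flags, checksum, timestamp, and alignment padding.
// The resulting record follows needle format version 3:
//
//	cookie | needle id | size                    (NeedleHeaderSize bytes)
//	data size (4 bytes) | data | flags (1 byte)  (counted in n.Size)
//	checksum | append-at-ns timestamp | padding to the needle block boundary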
func (v *Volume) StreamWrite(n *needle.Needle, data io.Reader, dataSize uint32) (err error) {

	v.dataFileAccessLock.Lock()
	defer v.dataFileAccessLock.Unlock()

	df, ok := v.DataBackend.(*backend.DiskFile)
	if !ok {
		return fmt.Errorf("unexpected volume backend")
	}
	offset, _, _ := v.DataBackend.GetStat() // the record is appended at the current end of the file

	header := make([]byte, NeedleHeaderSize+TimestampSize) // adding timestamp to reuse it and avoid extra allocation
	CookieToBytes(header[0:CookieSize], n.Cookie)
	NeedleIdToBytes(header[CookieSize:CookieSize+NeedleIdSize], n.Id)
	n.Size = 4 + Size(dataSize) + 1 // 4 bytes for the data size field, 1 byte for the flags
	SizeToBytes(header[CookieSize+NeedleIdSize:CookieSize+NeedleIdSize+SizeSize], n.Size)

	n.DataSize = dataSize

	// needle header
	df.Write(header[0:NeedleHeaderSize])

	// data size and data
	util.Uint32toBytes(header[0:4], n.DataSize)
	df.Write(header[0:4])
	// stream the body from the reader, calculating the CRC on the way through
	crcWriter := needle.NewCRCwriter(df)
	if _, err = io.Copy(crcWriter, io.LimitReader(data, int64(dataSize))); err != nil {
		return fmt.Errorf("failed to write needle data: %v", err)
	}

	// flags
	util.Uint8toBytes(header[0:1], n.Flags)
	df.Write(header[0:1])

	// data checksum
	util.Uint32toBytes(header[0:needle.NeedleChecksumSize], crcWriter.Sum())
	// write timestamp, padding (padding bytes are not zeroed; they reuse
	// whatever is left in the header buffer)
	n.AppendAtNs = uint64(time.Now().UnixNano())
	util.Uint64toBytes(header[needle.NeedleChecksumSize:needle.NeedleChecksumSize+TimestampSize], n.AppendAtNs)
	padding := needle.PaddingLength(n.Size, needle.Version3)
	df.Write(header[0 : needle.NeedleChecksumSize+TimestampSize+padding])

	// add to needle map
	if err = v.nm.Put(n.Id, ToOffset(int64(offset)), n.Size); err != nil {
		glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err)
	}
	return
}
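// StreamRead copies one needle's body to writer without loading it fully
// into memory. It looks the needle up in the needle map, reads from its
// offset in the data file, skips the fixed header, and streams out the
// 4-byte data size field followed by exactly that many bytes of data.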
func (v *Volume) StreamRead(n *needle.Needle, writer io.Writer) (err error) {

	v.dataFileAccessLock.Lock()
	defer v.dataFileAccessLock.Unlock()

	nv, ok := v.nm.Get(n.Id)
	if !ok || nv.Offset.IsZero() {
		return ErrorNotFound
	}

	sr := &StreamReader{
		readerAt: v.DataBackend,
		offset:   nv.Offset.ToActualOffset(),
	}
	bufReader := bufio.NewReader(sr)
	// skip the cookie/id/size header to reach the data size field
	if _, err = bufReader.Discard(NeedleHeaderSize); err != nil {
		return err
	}
	sizeBuf := make([]byte, 4)
	// io.ReadFull guards against a short read of the 4-byte data size field
	if _, err = io.ReadFull(bufReader, sizeBuf); err != nil {
		return err
	}
	if _, err = writer.Write(sizeBuf); err != nil {
		return err
	}

	dataSize := util.BytesToUint32(sizeBuf)

	_, err = io.Copy(writer, io.LimitReader(bufReader, int64(dataSize)))

	return
}
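// StreamReader adapts an io.ReaderAt to the io.Reader interface by keeping
// track of the current offset, so the needle bytes can be consumed
// sequentially through a bufio.Reader.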
type StreamReader struct {
	offset   int64
	readerAt io.ReaderAt
}
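// Read implements io.Reader by delegating to ReadAt at the tracked offset.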
func (sr *StreamReader) Read(p []byte) (n int, err error) {
	n, err = sr.readerAt.ReadAt(p, sr.offset)
	// advance the offset even on error: ReadAt may return io.EOF together
	// with the final bytes, and those bytes must not be re-read
	if n > 0 {
		sr.offset += int64(n)
	}
	return
}
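// Usage (a minimal sketch, not part of the original file): streaming a
// payload into a volume and reading it back. It assumes a loaded *Volume v
// obtained elsewhere; the id and cookie values are hypothetical and error
// handling is elided.
//
//	payload := strings.NewReader("hello, volume")
//	n := &needle.Needle{Id: someNeedleId, Cookie: someCookie}
//	if err := v.StreamWrite(n, payload, uint32(payload.Len())); err != nil {
//		// handle write error
//	}
//
//	var buf bytes.Buffer
//	if err := v.StreamRead(n, &buf); err != nil {
//		// handle read error
//	}
//	// buf now holds the 4-byte data size prefix followed by the payload bytes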