sealed_buffer.go 1.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465
  1. package log_buffer
  2. import (
  3. "fmt"
  4. "time"
  5. )
  // MemBuffer pairs a byte slice with the bookkeeping recorded when it is
  // sealed: a byte count, a start/stop time pair, and a batch index.
  //
  // NOTE(review): size is filled from the `pos` argument of SealBuffer and is
  // printed as "bytes" by String, so it presumably counts the valid bytes at
  // the front of buf — confirm against the writer.
  type MemBuffer struct {
  	buf                 []byte
  	size                int
  	startTime, stopTime time.Time
  	batchIndex          int64
  }
  13. type SealedBuffers struct {
  14. buffers []*MemBuffer
  15. }
  16. func newSealedBuffers(size int) *SealedBuffers {
  17. sbs := &SealedBuffers{}
  18. sbs.buffers = make([]*MemBuffer, size)
  19. for i := 0; i < size; i++ {
  20. sbs.buffers[i] = &MemBuffer{
  21. buf: make([]byte, BufferSize),
  22. }
  23. }
  24. return sbs
  25. }
  26. func (sbs *SealedBuffers) SealBuffer(startTime, stopTime time.Time, buf []byte, pos int, batchIndex int64) (newBuf []byte) {
  27. oldMemBuffer := sbs.buffers[0]
  28. size := len(sbs.buffers)
  29. for i := 0; i < size-1; i++ {
  30. sbs.buffers[i].buf = sbs.buffers[i+1].buf
  31. sbs.buffers[i].size = sbs.buffers[i+1].size
  32. sbs.buffers[i].startTime = sbs.buffers[i+1].startTime
  33. sbs.buffers[i].stopTime = sbs.buffers[i+1].stopTime
  34. sbs.buffers[i].batchIndex = sbs.buffers[i+1].batchIndex
  35. }
  36. sbs.buffers[size-1].buf = buf
  37. sbs.buffers[size-1].size = pos
  38. sbs.buffers[size-1].startTime = startTime
  39. sbs.buffers[size-1].stopTime = stopTime
  40. sbs.buffers[size-1].batchIndex = batchIndex
  41. return oldMemBuffer.buf
  42. }
  43. func (mb *MemBuffer) locateByTs(lastReadTime time.Time) (pos int) {
  44. lastReadTs := lastReadTime.UnixNano()
  45. for pos < len(mb.buf) {
  46. size, t := readTs(mb.buf, pos)
  47. if t > lastReadTs {
  48. return
  49. }
  50. pos += size + 4
  51. }
  52. return len(mb.buf)
  53. }
  54. func (mb *MemBuffer) String() string {
  55. return fmt.Sprintf("[%v,%v] bytes:%d", mb.startTime, mb.stopTime, mb.size)
  56. }