sealed_buffer.go 1.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162
  1. package log_buffer
  2. import (
  3. "fmt"
  4. "time"
  5. )
  6. type MemBuffer struct {
  7. buf []byte
  8. size int
  9. startTime time.Time
  10. stopTime time.Time
  11. }
  12. type SealedBuffers struct {
  13. buffers []*MemBuffer
  14. }
  15. func newSealedBuffers(size int) *SealedBuffers {
  16. sbs := &SealedBuffers{}
  17. sbs.buffers = make([]*MemBuffer, size)
  18. for i := 0; i < size; i++ {
  19. sbs.buffers[i] = &MemBuffer{
  20. buf: make([]byte, BufferSize),
  21. }
  22. }
  23. return sbs
  24. }
  25. func (sbs *SealedBuffers) SealBuffer(startTime, stopTime time.Time, buf []byte, pos int) (newBuf []byte) {
  26. oldMemBuffer := sbs.buffers[0]
  27. size := len(sbs.buffers)
  28. for i := 0; i < size-1; i++ {
  29. sbs.buffers[i].buf = sbs.buffers[i+1].buf
  30. sbs.buffers[i].size = sbs.buffers[i+1].size
  31. sbs.buffers[i].startTime = sbs.buffers[i+1].startTime
  32. sbs.buffers[i].stopTime = sbs.buffers[i+1].stopTime
  33. }
  34. sbs.buffers[size-1].buf = buf
  35. sbs.buffers[size-1].size = pos
  36. sbs.buffers[size-1].startTime = startTime
  37. sbs.buffers[size-1].stopTime = stopTime
  38. return oldMemBuffer.buf
  39. }
  40. func (mb *MemBuffer) locateByTs(lastReadTime time.Time) (pos int) {
  41. lastReadTs := lastReadTime.UnixNano()
  42. for pos < len(mb.buf) {
  43. size, t := readTs(mb.buf, pos)
  44. if t > lastReadTs {
  45. return
  46. }
  47. pos += size + 4
  48. }
  49. return len(mb.buf)
  50. }
  51. func (mb *MemBuffer) String() string {
  52. return fmt.Sprintf("[%v,%v] bytes:%d", mb.startTime, mb.stopTime, mb.size)
  53. }