max_latency_writer.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. package buffered_writer
  2. import (
  3. "io"
  4. "sync"
  5. "time"
  6. )
  7. type TimedWriteBuffer struct {
  8. maxLatencyWriterAt *maxLatencyWriterAt
  9. bufWriterAt *bufferedWriterAt
  10. }
  11. func (t *TimedWriteBuffer) ReadAt(p []byte, off int64) (n int, err error) {
  12. bufStart := t.bufWriterAt.nextOffset - int64(t.bufWriterAt.dataSize)
  13. start := max(bufStart, off)
  14. stop := min(t.bufWriterAt.nextOffset, off+int64(len(p)))
  15. if start <= stop {
  16. n = copy(p, t.bufWriterAt.data[start-bufStart:stop-bufStart])
  17. }
  18. return
  19. }
  20. func (t *TimedWriteBuffer) WriteAt(p []byte, offset int64) (n int, err error) {
  21. return t.maxLatencyWriterAt.WriteAt(p, offset)
  22. }
  23. func (t *TimedWriteBuffer) Flush() {
  24. t.maxLatencyWriterAt.Flush()
  25. }
  26. func (t *TimedWriteBuffer) Close() {
  27. t.maxLatencyWriterAt.Close()
  28. }
  29. func NewTimedWriteBuffer(writerAt io.WriterAt, size int, latency time.Duration, currentOffset int64) *TimedWriteBuffer {
  30. bufWriterAt := newBufferedWriterAt(writerAt, size, currentOffset)
  31. maxLatencyWriterAt := newMaxLatencyWriterAt(bufWriterAt, latency)
  32. return &TimedWriteBuffer{
  33. bufWriterAt: bufWriterAt,
  34. maxLatencyWriterAt: maxLatencyWriterAt,
  35. }
  36. }
  37. type bufferedWriterAt struct {
  38. data []byte
  39. dataSize int
  40. nextOffset int64
  41. writerAt io.WriterAt
  42. counter int
  43. }
  44. func newBufferedWriterAt(writerAt io.WriterAt, bufferSize int, currentOffset int64) *bufferedWriterAt {
  45. return &bufferedWriterAt{
  46. data: make([]byte, bufferSize),
  47. nextOffset: currentOffset,
  48. dataSize: 0,
  49. writerAt: writerAt,
  50. }
  51. }
  52. func (b *bufferedWriterAt) WriteAt(p []byte, offset int64) (n int, err error) {
  53. if b.nextOffset != offset {
  54. println("nextOffset", b.nextOffset, "bufSize", b.dataSize, "offset", offset, "data", len(p))
  55. }
  56. if b.nextOffset != offset || len(p)+b.dataSize > len(b.data) {
  57. if err = b.Flush(); err != nil {
  58. return 0, err
  59. }
  60. }
  61. if len(p)+b.dataSize > len(b.data) {
  62. n, err = b.writerAt.WriteAt(p, offset)
  63. if err == nil {
  64. b.nextOffset = offset + int64(n)
  65. }
  66. } else {
  67. n = copy(b.data[b.dataSize:len(p)+b.dataSize], p)
  68. b.dataSize += n
  69. b.nextOffset += int64(n)
  70. b.counter++
  71. }
  72. return
  73. }
  74. func (b *bufferedWriterAt) Flush() (err error) {
  75. if b.dataSize == 0 {
  76. return nil
  77. }
  78. // println("flush", b.counter)
  79. b.counter = 0
  80. _, err = b.writerAt.WriteAt(b.data[0:b.dataSize], b.nextOffset-int64(b.dataSize))
  81. if err == nil {
  82. b.dataSize = 0
  83. }
  84. return
  85. }
  86. // adapted from https://golang.org/src/net/http/httputil/reverseproxy.go
  87. type writeFlusher interface {
  88. io.WriterAt
  89. Flush() error
  90. }
  91. type maxLatencyWriterAt struct {
  92. dst writeFlusher
  93. latency time.Duration // non-zero; negative means to flush immediately
  94. mu sync.Mutex // protects t, flushPending, and dst.Flush
  95. t *time.Timer
  96. flushPending bool
  97. }
  98. func newMaxLatencyWriterAt(dst writeFlusher, latency time.Duration) *maxLatencyWriterAt {
  99. return &maxLatencyWriterAt{
  100. dst: dst,
  101. latency: latency,
  102. }
  103. }
  104. func (m *maxLatencyWriterAt) WriteAt(p []byte, offset int64) (n int, err error) {
  105. m.mu.Lock()
  106. defer m.mu.Unlock()
  107. n, err = m.dst.WriteAt(p, offset)
  108. if m.latency < 0 {
  109. m.dst.Flush()
  110. return
  111. }
  112. if m.flushPending {
  113. return
  114. }
  115. if m.t == nil {
  116. m.t = time.AfterFunc(m.latency, m.Flush)
  117. } else {
  118. m.t.Reset(m.latency)
  119. }
  120. m.flushPending = true
  121. return
  122. }
  123. func (m *maxLatencyWriterAt) Flush() {
  124. m.mu.Lock()
  125. defer m.mu.Unlock()
  126. if !m.flushPending { // if stop was called but AfterFunc already started this goroutine
  127. return
  128. }
  129. m.dst.Flush()
  130. m.flushPending = false
  131. }
  132. func (m *maxLatencyWriterAt) Close() {
  133. m.mu.Lock()
  134. defer m.mu.Unlock()
  135. m.flushPending = false
  136. if m.t != nil {
  137. m.t.Stop()
  138. }
  139. }
  140. func min(x, y int64) int64 {
  141. if x <= y {
  142. return x
  143. }
  144. return y
  145. }
  146. func max(x, y int64) int64 {
  147. if x <= y {
  148. return y
  149. }
  150. return x
  151. }