databuffer.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. // Copyright 2014 The Go Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package http2
  5. import (
  6. "errors"
  7. "fmt"
  8. "sync"
  9. )
  10. // Buffer chunks are allocated from a pool to reduce pressure on GC.
  11. // The maximum wasted space per dataBuffer is 2x the largest size class,
  12. // which happens when the dataBuffer has multiple chunks and there is
  13. // one unread byte in both the first and last chunks. We use a few size
  14. // classes to minimize overheads for servers that typically receive very
  15. // small request bodies.
  16. //
  17. // TODO: Benchmark to determine if the pools are necessary. The GC may have
  18. // improved enough that we can instead allocate chunks like this:
  19. // make([]byte, max(16<<10, expectedBytesRemaining))
  20. var dataChunkPools = [...]sync.Pool{
  21. {New: func() interface{} { return new([1 << 10]byte) }},
  22. {New: func() interface{} { return new([2 << 10]byte) }},
  23. {New: func() interface{} { return new([4 << 10]byte) }},
  24. {New: func() interface{} { return new([8 << 10]byte) }},
  25. {New: func() interface{} { return new([16 << 10]byte) }},
  26. }
  27. func getDataBufferChunk(size int64) []byte {
  28. switch {
  29. case size <= 1<<10:
  30. return dataChunkPools[0].Get().(*[1 << 10]byte)[:]
  31. case size <= 2<<10:
  32. return dataChunkPools[1].Get().(*[2 << 10]byte)[:]
  33. case size <= 4<<10:
  34. return dataChunkPools[2].Get().(*[4 << 10]byte)[:]
  35. case size <= 8<<10:
  36. return dataChunkPools[3].Get().(*[8 << 10]byte)[:]
  37. default:
  38. return dataChunkPools[4].Get().(*[16 << 10]byte)[:]
  39. }
  40. }
  41. func putDataBufferChunk(p []byte) {
  42. switch len(p) {
  43. case 1 << 10:
  44. dataChunkPools[0].Put((*[1 << 10]byte)(p))
  45. case 2 << 10:
  46. dataChunkPools[1].Put((*[2 << 10]byte)(p))
  47. case 4 << 10:
  48. dataChunkPools[2].Put((*[4 << 10]byte)(p))
  49. case 8 << 10:
  50. dataChunkPools[3].Put((*[8 << 10]byte)(p))
  51. case 16 << 10:
  52. dataChunkPools[4].Put((*[16 << 10]byte)(p))
  53. default:
  54. panic(fmt.Sprintf("unexpected buffer len=%v", len(p)))
  55. }
  56. }
  57. // dataBuffer is an io.ReadWriter backed by a list of data chunks.
  58. // Each dataBuffer is used to read DATA frames on a single stream.
  59. // The buffer is divided into chunks so the server can limit the
  60. // total memory used by a single connection without limiting the
  61. // request body size on any single stream.
  62. type dataBuffer struct {
  63. chunks [][]byte
  64. r int // next byte to read is chunks[0][r]
  65. w int // next byte to write is chunks[len(chunks)-1][w]
  66. size int // total buffered bytes
  67. expected int64 // we expect at least this many bytes in future Write calls (ignored if <= 0)
  68. }
  69. var errReadEmpty = errors.New("read from empty dataBuffer")
  70. // Read copies bytes from the buffer into p.
  71. // It is an error to read when no data is available.
  72. func (b *dataBuffer) Read(p []byte) (int, error) {
  73. if b.size == 0 {
  74. return 0, errReadEmpty
  75. }
  76. var ntotal int
  77. for len(p) > 0 && b.size > 0 {
  78. readFrom := b.bytesFromFirstChunk()
  79. n := copy(p, readFrom)
  80. p = p[n:]
  81. ntotal += n
  82. b.r += n
  83. b.size -= n
  84. // If the first chunk has been consumed, advance to the next chunk.
  85. if b.r == len(b.chunks[0]) {
  86. putDataBufferChunk(b.chunks[0])
  87. end := len(b.chunks) - 1
  88. copy(b.chunks[:end], b.chunks[1:])
  89. b.chunks[end] = nil
  90. b.chunks = b.chunks[:end]
  91. b.r = 0
  92. }
  93. }
  94. return ntotal, nil
  95. }
  96. func (b *dataBuffer) bytesFromFirstChunk() []byte {
  97. if len(b.chunks) == 1 {
  98. return b.chunks[0][b.r:b.w]
  99. }
  100. return b.chunks[0][b.r:]
  101. }
  102. // Len returns the number of bytes of the unread portion of the buffer.
  103. func (b *dataBuffer) Len() int {
  104. return b.size
  105. }
  106. // Write appends p to the buffer.
  107. func (b *dataBuffer) Write(p []byte) (int, error) {
  108. ntotal := len(p)
  109. for len(p) > 0 {
  110. // If the last chunk is empty, allocate a new chunk. Try to allocate
  111. // enough to fully copy p plus any additional bytes we expect to
  112. // receive. However, this may allocate less than len(p).
  113. want := int64(len(p))
  114. if b.expected > want {
  115. want = b.expected
  116. }
  117. chunk := b.lastChunkOrAlloc(want)
  118. n := copy(chunk[b.w:], p)
  119. p = p[n:]
  120. b.w += n
  121. b.size += n
  122. b.expected -= int64(n)
  123. }
  124. return ntotal, nil
  125. }
  126. func (b *dataBuffer) lastChunkOrAlloc(want int64) []byte {
  127. if len(b.chunks) != 0 {
  128. last := b.chunks[len(b.chunks)-1]
  129. if b.w < len(last) {
  130. return last
  131. }
  132. }
  133. chunk := getDataBufferChunk(want)
  134. b.chunks = append(b.chunks, chunk)
  135. b.w = 0
  136. return chunk
  137. }