writesched_roundrobin.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. // Copyright 2023 The Go Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package http2
  5. import (
  6. "fmt"
  7. "math"
  8. )
  9. type roundRobinWriteScheduler struct {
  10. // control contains control frames (SETTINGS, PING, etc.).
  11. control writeQueue
  12. // streams maps stream ID to a queue.
  13. streams map[uint32]*writeQueue
  14. // stream queues are stored in a circular linked list.
  15. // head is the next stream to write, or nil if there are no streams open.
  16. head *writeQueue
  17. // pool of empty queues for reuse.
  18. queuePool writeQueuePool
  19. }
  20. // newRoundRobinWriteScheduler constructs a new write scheduler.
  21. // The round robin scheduler priorizes control frames
  22. // like SETTINGS and PING over DATA frames.
  23. // When there are no control frames to send, it performs a round-robin
  24. // selection from the ready streams.
  25. func newRoundRobinWriteScheduler() WriteScheduler {
  26. ws := &roundRobinWriteScheduler{
  27. streams: make(map[uint32]*writeQueue),
  28. }
  29. return ws
  30. }
  31. func (ws *roundRobinWriteScheduler) OpenStream(streamID uint32, options OpenStreamOptions) {
  32. if ws.streams[streamID] != nil {
  33. panic(fmt.Errorf("stream %d already opened", streamID))
  34. }
  35. q := ws.queuePool.get()
  36. ws.streams[streamID] = q
  37. if ws.head == nil {
  38. ws.head = q
  39. q.next = q
  40. q.prev = q
  41. } else {
  42. // Queues are stored in a ring.
  43. // Insert the new stream before ws.head, putting it at the end of the list.
  44. q.prev = ws.head.prev
  45. q.next = ws.head
  46. q.prev.next = q
  47. q.next.prev = q
  48. }
  49. }
  50. func (ws *roundRobinWriteScheduler) CloseStream(streamID uint32) {
  51. q := ws.streams[streamID]
  52. if q == nil {
  53. return
  54. }
  55. if q.next == q {
  56. // This was the only open stream.
  57. ws.head = nil
  58. } else {
  59. q.prev.next = q.next
  60. q.next.prev = q.prev
  61. if ws.head == q {
  62. ws.head = q.next
  63. }
  64. }
  65. delete(ws.streams, streamID)
  66. ws.queuePool.put(q)
  67. }
  68. func (ws *roundRobinWriteScheduler) AdjustStream(streamID uint32, priority PriorityParam) {}
  69. func (ws *roundRobinWriteScheduler) Push(wr FrameWriteRequest) {
  70. if wr.isControl() {
  71. ws.control.push(wr)
  72. return
  73. }
  74. q := ws.streams[wr.StreamID()]
  75. if q == nil {
  76. // This is a closed stream.
  77. // wr should not be a HEADERS or DATA frame.
  78. // We push the request onto the control queue.
  79. if wr.DataSize() > 0 {
  80. panic("add DATA on non-open stream")
  81. }
  82. ws.control.push(wr)
  83. return
  84. }
  85. q.push(wr)
  86. }
  87. func (ws *roundRobinWriteScheduler) Pop() (FrameWriteRequest, bool) {
  88. // Control and RST_STREAM frames first.
  89. if !ws.control.empty() {
  90. return ws.control.shift(), true
  91. }
  92. if ws.head == nil {
  93. return FrameWriteRequest{}, false
  94. }
  95. q := ws.head
  96. for {
  97. if wr, ok := q.consume(math.MaxInt32); ok {
  98. ws.head = q.next
  99. return wr, true
  100. }
  101. q = q.next
  102. if q == ws.head {
  103. break
  104. }
  105. }
  106. return FrameWriteRequest{}, false
  107. }