client_conn_pool.go 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311
// Copyright 2015 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Transport code's client connection pooling.
  5. package http2
  6. import (
  7. "context"
  8. "crypto/tls"
  9. "errors"
  10. "net/http"
  11. "sync"
  12. )
  13. // ClientConnPool manages a pool of HTTP/2 client connections.
  14. type ClientConnPool interface {
  15. // GetClientConn returns a specific HTTP/2 connection (usually
  16. // a TLS-TCP connection) to an HTTP/2 server. On success, the
  17. // returned ClientConn accounts for the upcoming RoundTrip
  18. // call, so the caller should not omit it. If the caller needs
  19. // to, ClientConn.RoundTrip can be called with a bogus
  20. // new(http.Request) to release the stream reservation.
  21. GetClientConn(req *http.Request, addr string) (*ClientConn, error)
  22. MarkDead(*ClientConn)
  23. }
  24. // clientConnPoolIdleCloser is the interface implemented by ClientConnPool
  25. // implementations which can close their idle connections.
  26. type clientConnPoolIdleCloser interface {
  27. ClientConnPool
  28. closeIdleConnections()
  29. }
  30. var (
  31. _ clientConnPoolIdleCloser = (*clientConnPool)(nil)
  32. _ clientConnPoolIdleCloser = noDialClientConnPool{}
  33. )
  34. // TODO: use singleflight for dialing and addConnCalls?
  35. type clientConnPool struct {
  36. t *Transport
  37. mu sync.Mutex // TODO: maybe switch to RWMutex
  38. // TODO: add support for sharing conns based on cert names
  39. // (e.g. share conn for googleapis.com and appspot.com)
  40. conns map[string][]*ClientConn // key is host:port
  41. dialing map[string]*dialCall // currently in-flight dials
  42. keys map[*ClientConn][]string
  43. addConnCalls map[string]*addConnCall // in-flight addConnIfNeeded calls
  44. }
  45. func (p *clientConnPool) GetClientConn(req *http.Request, addr string) (*ClientConn, error) {
  46. return p.getClientConn(req, addr, dialOnMiss)
  47. }
  48. const (
  49. dialOnMiss = true
  50. noDialOnMiss = false
  51. )
  52. func (p *clientConnPool) getClientConn(req *http.Request, addr string, dialOnMiss bool) (*ClientConn, error) {
  53. // TODO(dneil): Dial a new connection when t.DisableKeepAlives is set?
  54. if isConnectionCloseRequest(req) && dialOnMiss {
  55. // It gets its own connection.
  56. traceGetConn(req, addr)
  57. const singleUse = true
  58. cc, err := p.t.dialClientConn(req.Context(), addr, singleUse)
  59. if err != nil {
  60. return nil, err
  61. }
  62. return cc, nil
  63. }
  64. for {
  65. p.mu.Lock()
  66. for _, cc := range p.conns[addr] {
  67. if cc.ReserveNewRequest() {
  68. // When a connection is presented to us by the net/http package,
  69. // the GetConn hook has already been called.
  70. // Don't call it a second time here.
  71. if !cc.getConnCalled {
  72. traceGetConn(req, addr)
  73. }
  74. cc.getConnCalled = false
  75. p.mu.Unlock()
  76. return cc, nil
  77. }
  78. }
  79. if !dialOnMiss {
  80. p.mu.Unlock()
  81. return nil, ErrNoCachedConn
  82. }
  83. traceGetConn(req, addr)
  84. call := p.getStartDialLocked(req.Context(), addr)
  85. p.mu.Unlock()
  86. <-call.done
  87. if shouldRetryDial(call, req) {
  88. continue
  89. }
  90. cc, err := call.res, call.err
  91. if err != nil {
  92. return nil, err
  93. }
  94. if cc.ReserveNewRequest() {
  95. return cc, nil
  96. }
  97. }
  98. }
  99. // dialCall is an in-flight Transport dial call to a host.
  100. type dialCall struct {
  101. _ incomparable
  102. p *clientConnPool
  103. // the context associated with the request
  104. // that created this dialCall
  105. ctx context.Context
  106. done chan struct{} // closed when done
  107. res *ClientConn // valid after done is closed
  108. err error // valid after done is closed
  109. }
  110. // requires p.mu is held.
  111. func (p *clientConnPool) getStartDialLocked(ctx context.Context, addr string) *dialCall {
  112. if call, ok := p.dialing[addr]; ok {
  113. // A dial is already in-flight. Don't start another.
  114. return call
  115. }
  116. call := &dialCall{p: p, done: make(chan struct{}), ctx: ctx}
  117. if p.dialing == nil {
  118. p.dialing = make(map[string]*dialCall)
  119. }
  120. p.dialing[addr] = call
  121. go call.dial(call.ctx, addr)
  122. return call
  123. }
  124. // run in its own goroutine.
  125. func (c *dialCall) dial(ctx context.Context, addr string) {
  126. const singleUse = false // shared conn
  127. c.res, c.err = c.p.t.dialClientConn(ctx, addr, singleUse)
  128. c.p.mu.Lock()
  129. delete(c.p.dialing, addr)
  130. if c.err == nil {
  131. c.p.addConnLocked(addr, c.res)
  132. }
  133. c.p.mu.Unlock()
  134. close(c.done)
  135. }
  136. // addConnIfNeeded makes a NewClientConn out of c if a connection for key doesn't
  137. // already exist. It coalesces concurrent calls with the same key.
  138. // This is used by the http1 Transport code when it creates a new connection. Because
  139. // the http1 Transport doesn't de-dup TCP dials to outbound hosts (because it doesn't know
  140. // the protocol), it can get into a situation where it has multiple TLS connections.
  141. // This code decides which ones live or die.
  142. // The return value used is whether c was used.
  143. // c is never closed.
  144. func (p *clientConnPool) addConnIfNeeded(key string, t *Transport, c *tls.Conn) (used bool, err error) {
  145. p.mu.Lock()
  146. for _, cc := range p.conns[key] {
  147. if cc.CanTakeNewRequest() {
  148. p.mu.Unlock()
  149. return false, nil
  150. }
  151. }
  152. call, dup := p.addConnCalls[key]
  153. if !dup {
  154. if p.addConnCalls == nil {
  155. p.addConnCalls = make(map[string]*addConnCall)
  156. }
  157. call = &addConnCall{
  158. p: p,
  159. done: make(chan struct{}),
  160. }
  161. p.addConnCalls[key] = call
  162. go call.run(t, key, c)
  163. }
  164. p.mu.Unlock()
  165. <-call.done
  166. if call.err != nil {
  167. return false, call.err
  168. }
  169. return !dup, nil
  170. }
  171. type addConnCall struct {
  172. _ incomparable
  173. p *clientConnPool
  174. done chan struct{} // closed when done
  175. err error
  176. }
  177. func (c *addConnCall) run(t *Transport, key string, tc *tls.Conn) {
  178. cc, err := t.NewClientConn(tc)
  179. p := c.p
  180. p.mu.Lock()
  181. if err != nil {
  182. c.err = err
  183. } else {
  184. cc.getConnCalled = true // already called by the net/http package
  185. p.addConnLocked(key, cc)
  186. }
  187. delete(p.addConnCalls, key)
  188. p.mu.Unlock()
  189. close(c.done)
  190. }
  191. // p.mu must be held
  192. func (p *clientConnPool) addConnLocked(key string, cc *ClientConn) {
  193. for _, v := range p.conns[key] {
  194. if v == cc {
  195. return
  196. }
  197. }
  198. if p.conns == nil {
  199. p.conns = make(map[string][]*ClientConn)
  200. }
  201. if p.keys == nil {
  202. p.keys = make(map[*ClientConn][]string)
  203. }
  204. p.conns[key] = append(p.conns[key], cc)
  205. p.keys[cc] = append(p.keys[cc], key)
  206. }
  207. func (p *clientConnPool) MarkDead(cc *ClientConn) {
  208. p.mu.Lock()
  209. defer p.mu.Unlock()
  210. for _, key := range p.keys[cc] {
  211. vv, ok := p.conns[key]
  212. if !ok {
  213. continue
  214. }
  215. newList := filterOutClientConn(vv, cc)
  216. if len(newList) > 0 {
  217. p.conns[key] = newList
  218. } else {
  219. delete(p.conns, key)
  220. }
  221. }
  222. delete(p.keys, cc)
  223. }
  224. func (p *clientConnPool) closeIdleConnections() {
  225. p.mu.Lock()
  226. defer p.mu.Unlock()
  227. // TODO: don't close a cc if it was just added to the pool
  228. // milliseconds ago and has never been used. There's currently
  229. // a small race window with the HTTP/1 Transport's integration
  230. // where it can add an idle conn just before using it, and
  231. // somebody else can concurrently call CloseIdleConns and
  232. // break some caller's RoundTrip.
  233. for _, vv := range p.conns {
  234. for _, cc := range vv {
  235. cc.closeIfIdle()
  236. }
  237. }
  238. }
  239. func filterOutClientConn(in []*ClientConn, exclude *ClientConn) []*ClientConn {
  240. out := in[:0]
  241. for _, v := range in {
  242. if v != exclude {
  243. out = append(out, v)
  244. }
  245. }
  246. // If we filtered it out, zero out the last item to prevent
  247. // the GC from seeing it.
  248. if len(in) != len(out) {
  249. in[len(in)-1] = nil
  250. }
  251. return out
  252. }
  253. // noDialClientConnPool is an implementation of http2.ClientConnPool
  254. // which never dials. We let the HTTP/1.1 client dial and use its TLS
  255. // connection instead.
  256. type noDialClientConnPool struct{ *clientConnPool }
  257. func (p noDialClientConnPool) GetClientConn(req *http.Request, addr string) (*ClientConn, error) {
  258. return p.getClientConn(req, addr, noDialOnMiss)
  259. }
  260. // shouldRetryDial reports whether the current request should
  261. // retry dialing after the call finished unsuccessfully, for example
  262. // if the dial was canceled because of a context cancellation or
  263. // deadline expiry.
  264. func shouldRetryDial(call *dialCall, req *http.Request) bool {
  265. if call.err == nil {
  266. // No error, no need to retry
  267. return false
  268. }
  269. if call.ctx == req.Context() {
  270. // If the call has the same context as the request, the dial
  271. // should not be retried, since any cancellation will have come
  272. // from this request.
  273. return false
  274. }
  275. if !errors.Is(call.err, context.Canceled) && !errors.Is(call.err, context.DeadlineExceeded) {
  276. // If the call error is not because of a context cancellation or a deadline expiry,
  277. // the dial should not be retried.
  278. return false
  279. }
  280. // Only retry if the error is a context cancellation error or deadline expiry
  281. // and the context associated with the call was canceled or expired.
  282. return call.ctx.Err() != nil
  283. }