123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311 |
- // Copyright 2015 The Go Authors. All rights reserved.
- // Use of this source code is governed by a BSD-style
- // license that can be found in the LICENSE file.
- // Transport code's client connection pooling.
- package http2
- import (
- "context"
- "crypto/tls"
- "errors"
- "net/http"
- "sync"
- )
- // ClientConnPool manages a pool of HTTP/2 client connections.
- type ClientConnPool interface {
- // GetClientConn returns a specific HTTP/2 connection (usually
- // a TLS-TCP connection) to an HTTP/2 server. On success, the
- // returned ClientConn accounts for the upcoming RoundTrip
- // call, so the caller should not omit it. If the caller needs
- // to, ClientConn.RoundTrip can be called with a bogus
- // new(http.Request) to release the stream reservation.
- GetClientConn(req *http.Request, addr string) (*ClientConn, error)
- MarkDead(*ClientConn)
- }
- // clientConnPoolIdleCloser is the interface implemented by ClientConnPool
- // implementations which can close their idle connections.
- type clientConnPoolIdleCloser interface {
- ClientConnPool
- closeIdleConnections()
- }
- var (
- _ clientConnPoolIdleCloser = (*clientConnPool)(nil)
- _ clientConnPoolIdleCloser = noDialClientConnPool{}
- )
- // TODO: use singleflight for dialing and addConnCalls?
- type clientConnPool struct {
- t *Transport
- mu sync.Mutex // TODO: maybe switch to RWMutex
- // TODO: add support for sharing conns based on cert names
- // (e.g. share conn for googleapis.com and appspot.com)
- conns map[string][]*ClientConn // key is host:port
- dialing map[string]*dialCall // currently in-flight dials
- keys map[*ClientConn][]string
- addConnCalls map[string]*addConnCall // in-flight addConnIfNeeded calls
- }
- func (p *clientConnPool) GetClientConn(req *http.Request, addr string) (*ClientConn, error) {
- return p.getClientConn(req, addr, dialOnMiss)
- }
// Values for the dialOnMiss argument of (*clientConnPool).getClientConn.

// dialOnMiss asks for a new connection to be dialed when no pooled
// connection can take the request.
const dialOnMiss = true

// noDialOnMiss makes the lookup fail with ErrNoCachedConn instead of
// dialing.
const noDialOnMiss = false
- func (p *clientConnPool) getClientConn(req *http.Request, addr string, dialOnMiss bool) (*ClientConn, error) {
- // TODO(dneil): Dial a new connection when t.DisableKeepAlives is set?
- if isConnectionCloseRequest(req) && dialOnMiss {
- // It gets its own connection.
- traceGetConn(req, addr)
- const singleUse = true
- cc, err := p.t.dialClientConn(req.Context(), addr, singleUse)
- if err != nil {
- return nil, err
- }
- return cc, nil
- }
- for {
- p.mu.Lock()
- for _, cc := range p.conns[addr] {
- if cc.ReserveNewRequest() {
- // When a connection is presented to us by the net/http package,
- // the GetConn hook has already been called.
- // Don't call it a second time here.
- if !cc.getConnCalled {
- traceGetConn(req, addr)
- }
- cc.getConnCalled = false
- p.mu.Unlock()
- return cc, nil
- }
- }
- if !dialOnMiss {
- p.mu.Unlock()
- return nil, ErrNoCachedConn
- }
- traceGetConn(req, addr)
- call := p.getStartDialLocked(req.Context(), addr)
- p.mu.Unlock()
- <-call.done
- if shouldRetryDial(call, req) {
- continue
- }
- cc, err := call.res, call.err
- if err != nil {
- return nil, err
- }
- if cc.ReserveNewRequest() {
- return cc, nil
- }
- }
- }
- // dialCall is an in-flight Transport dial call to a host.
- type dialCall struct {
- _ incomparable
- p *clientConnPool
- // the context associated with the request
- // that created this dialCall
- ctx context.Context
- done chan struct{} // closed when done
- res *ClientConn // valid after done is closed
- err error // valid after done is closed
- }
- // requires p.mu is held.
- func (p *clientConnPool) getStartDialLocked(ctx context.Context, addr string) *dialCall {
- if call, ok := p.dialing[addr]; ok {
- // A dial is already in-flight. Don't start another.
- return call
- }
- call := &dialCall{p: p, done: make(chan struct{}), ctx: ctx}
- if p.dialing == nil {
- p.dialing = make(map[string]*dialCall)
- }
- p.dialing[addr] = call
- go call.dial(call.ctx, addr)
- return call
- }
- // run in its own goroutine.
- func (c *dialCall) dial(ctx context.Context, addr string) {
- const singleUse = false // shared conn
- c.res, c.err = c.p.t.dialClientConn(ctx, addr, singleUse)
- c.p.mu.Lock()
- delete(c.p.dialing, addr)
- if c.err == nil {
- c.p.addConnLocked(addr, c.res)
- }
- c.p.mu.Unlock()
- close(c.done)
- }
- // addConnIfNeeded makes a NewClientConn out of c if a connection for key doesn't
- // already exist. It coalesces concurrent calls with the same key.
- // This is used by the http1 Transport code when it creates a new connection. Because
- // the http1 Transport doesn't de-dup TCP dials to outbound hosts (because it doesn't know
- // the protocol), it can get into a situation where it has multiple TLS connections.
- // This code decides which ones live or die.
- // The return value used is whether c was used.
- // c is never closed.
- func (p *clientConnPool) addConnIfNeeded(key string, t *Transport, c *tls.Conn) (used bool, err error) {
- p.mu.Lock()
- for _, cc := range p.conns[key] {
- if cc.CanTakeNewRequest() {
- p.mu.Unlock()
- return false, nil
- }
- }
- call, dup := p.addConnCalls[key]
- if !dup {
- if p.addConnCalls == nil {
- p.addConnCalls = make(map[string]*addConnCall)
- }
- call = &addConnCall{
- p: p,
- done: make(chan struct{}),
- }
- p.addConnCalls[key] = call
- go call.run(t, key, c)
- }
- p.mu.Unlock()
- <-call.done
- if call.err != nil {
- return false, call.err
- }
- return !dup, nil
- }
- type addConnCall struct {
- _ incomparable
- p *clientConnPool
- done chan struct{} // closed when done
- err error
- }
- func (c *addConnCall) run(t *Transport, key string, tc *tls.Conn) {
- cc, err := t.NewClientConn(tc)
- p := c.p
- p.mu.Lock()
- if err != nil {
- c.err = err
- } else {
- cc.getConnCalled = true // already called by the net/http package
- p.addConnLocked(key, cc)
- }
- delete(p.addConnCalls, key)
- p.mu.Unlock()
- close(c.done)
- }
- // p.mu must be held
- func (p *clientConnPool) addConnLocked(key string, cc *ClientConn) {
- for _, v := range p.conns[key] {
- if v == cc {
- return
- }
- }
- if p.conns == nil {
- p.conns = make(map[string][]*ClientConn)
- }
- if p.keys == nil {
- p.keys = make(map[*ClientConn][]string)
- }
- p.conns[key] = append(p.conns[key], cc)
- p.keys[cc] = append(p.keys[cc], key)
- }
- func (p *clientConnPool) MarkDead(cc *ClientConn) {
- p.mu.Lock()
- defer p.mu.Unlock()
- for _, key := range p.keys[cc] {
- vv, ok := p.conns[key]
- if !ok {
- continue
- }
- newList := filterOutClientConn(vv, cc)
- if len(newList) > 0 {
- p.conns[key] = newList
- } else {
- delete(p.conns, key)
- }
- }
- delete(p.keys, cc)
- }
- func (p *clientConnPool) closeIdleConnections() {
- p.mu.Lock()
- defer p.mu.Unlock()
- // TODO: don't close a cc if it was just added to the pool
- // milliseconds ago and has never been used. There's currently
- // a small race window with the HTTP/1 Transport's integration
- // where it can add an idle conn just before using it, and
- // somebody else can concurrently call CloseIdleConns and
- // break some caller's RoundTrip.
- for _, vv := range p.conns {
- for _, cc := range vv {
- cc.closeIfIdle()
- }
- }
- }
- func filterOutClientConn(in []*ClientConn, exclude *ClientConn) []*ClientConn {
- out := in[:0]
- for _, v := range in {
- if v != exclude {
- out = append(out, v)
- }
- }
- // If we filtered it out, zero out the last item to prevent
- // the GC from seeing it.
- if len(in) != len(out) {
- in[len(in)-1] = nil
- }
- return out
- }
- // noDialClientConnPool is an implementation of http2.ClientConnPool
- // which never dials. We let the HTTP/1.1 client dial and use its TLS
- // connection instead.
- type noDialClientConnPool struct{ *clientConnPool }
- func (p noDialClientConnPool) GetClientConn(req *http.Request, addr string) (*ClientConn, error) {
- return p.getClientConn(req, addr, noDialOnMiss)
- }
- // shouldRetryDial reports whether the current request should
- // retry dialing after the call finished unsuccessfully, for example
- // if the dial was canceled because of a context cancellation or
- // deadline expiry.
- func shouldRetryDial(call *dialCall, req *http.Request) bool {
- if call.err == nil {
- // No error, no need to retry
- return false
- }
- if call.ctx == req.Context() {
- // If the call has the same context as the request, the dial
- // should not be retried, since any cancellation will have come
- // from this request.
- return false
- }
- if !errors.Is(call.err, context.Canceled) && !errors.Is(call.err, context.DeadlineExceeded) {
- // If the call error is not because of a context cancellation or a deadline expiry,
- // the dial should not be retried.
- return false
- }
- // Only retry if the error is a context cancellation error or deadline expiry
- // and the context associated with the call was canceled or expired.
- return call.ctx.Err() != nil
- }
|