123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395 |
- // Package httpdown provides http.ConnState enabled graceful termination of
- // http.Server.
- // It is based on github.com/facebookarchive/httpdown, which is MIT-licensed,
- // and adds support for serving HTTP over TLS.
- package httpdown
- import (
- "crypto/tls"
- "fmt"
- "net"
- "net/http"
- "os"
- "os/signal"
- "sync"
- "syscall"
- "time"
- "github.com/facebookgo/clock"
- "github.com/facebookgo/stats"
- )
// Fallback durations applied by HTTP.Serve when the matching HTTP field is
// left at its zero value.
const (
	// defaultStopTimeout replaces a zero HTTP.StopTimeout.
	defaultStopTimeout time.Duration = time.Minute
	// defaultKillTimeout replaces a zero HTTP.KillTimeout.
	defaultKillTimeout time.Duration = time.Minute
)
// Server is the handle for a running serving loop. It encapsulates accepting
// and serving connections, and lets the caller shut the listener down
// gracefully without dropping active connections.
type Server interface {
	// Wait blocks until the serving loop has finished. That happens either
	// because Stop was called, in which case no error is returned, or because
	// the serving loop itself failed, in which case that error is returned.
	// Wait must be called after Serve or ListenAndServe.
	Wait() error

	// Stop shuts the listener down and blocks until all connections have been
	// closed.
	Stop() error
}
- // HTTP defines the configuration for serving a http.Server. Multiple calls to
- // Serve or ListenAndServe can be made on the same HTTP instance. The default
- // timeouts of 1 minute each result in a maximum of 2 minutes before a Stop()
- // returns.
- type HTTP struct {
- // StopTimeout is the duration before we begin force closing connections.
- // Defaults to 1 minute.
- StopTimeout time.Duration
- // KillTimeout is the duration before which we completely give up and abort
- // even though we still have connected clients. This is useful when a large
- // number of client connections exist and closing them can take a long time.
- // Note, this is in addition to the StopTimeout. Defaults to 1 minute.
- KillTimeout time.Duration
- // Stats is optional. If provided, it will be used to record various metrics.
- Stats stats.Client
- // Clock allows for testing timing related functionality. Do not specify this
- // in production code.
- Clock clock.Clock
- // when set CertFile and KeyFile, the httpDown will start a http with TLS.
- // Files containing a certificate and matching private key for the
- // server must be provided if neither the Server's
- // TLSConfig.Certificates nor TLSConfig.GetCertificate are populated.
- // If the certificate is signed by a certificate authority, the
- // certFile should be the concatenation of the server's certificate,
- // any intermediates, and the CA's certificate.
- CertFile, KeyFile string
- }
- // Serve provides the low-level API which is useful if you're creating your own
- // net.Listener.
- func (h HTTP) Serve(s *http.Server, l net.Listener) Server {
- stopTimeout := h.StopTimeout
- if stopTimeout == 0 {
- stopTimeout = defaultStopTimeout
- }
- killTimeout := h.KillTimeout
- if killTimeout == 0 {
- killTimeout = defaultKillTimeout
- }
- klock := h.Clock
- if klock == nil {
- klock = clock.New()
- }
- ss := &server{
- stopTimeout: stopTimeout,
- killTimeout: killTimeout,
- stats: h.Stats,
- clock: klock,
- oldConnState: s.ConnState,
- listener: l,
- server: s,
- serveDone: make(chan struct{}),
- serveErr: make(chan error, 1),
- new: make(chan net.Conn),
- active: make(chan net.Conn),
- idle: make(chan net.Conn),
- closed: make(chan net.Conn),
- stop: make(chan chan struct{}),
- kill: make(chan chan struct{}),
- certFile: h.CertFile,
- keyFile: h.KeyFile,
- }
- s.ConnState = ss.connState
- go ss.manage()
- go ss.serve()
- return ss
- }
- // ListenAndServe returns a Server for the given http.Server. It is equivalent
- // to ListenAndServe from the standard library, but returns immediately.
- // Requests will be accepted in a background goroutine. If the http.Server has
- // a non-nil TLSConfig, a TLS enabled listener will be setup.
- func (h HTTP) ListenAndServe(s *http.Server) (Server, error) {
- addr := s.Addr
- if addr == "" {
- if s.TLSConfig == nil {
- addr = ":http"
- } else {
- addr = ":https"
- }
- }
- l, err := net.Listen("tcp", addr)
- if err != nil {
- stats.BumpSum(h.Stats, "listen.error", 1)
- return nil, err
- }
- if s.TLSConfig != nil {
- l = tls.NewListener(l, s.TLSConfig)
- }
- return h.Serve(s, l), nil
- }
- // server manages the serving process and allows for gracefully stopping it.
- type server struct {
- stopTimeout time.Duration
- killTimeout time.Duration
- stats stats.Client
- clock clock.Clock
- oldConnState func(net.Conn, http.ConnState)
- server *http.Server
- serveDone chan struct{}
- serveErr chan error
- listener net.Listener
- new chan net.Conn
- active chan net.Conn
- idle chan net.Conn
- closed chan net.Conn
- stop chan chan struct{}
- kill chan chan struct{}
- stopOnce sync.Once
- stopErr error
- certFile, keyFile string
- }
- func (s *server) connState(c net.Conn, cs http.ConnState) {
- if s.oldConnState != nil {
- s.oldConnState(c, cs)
- }
- switch cs {
- case http.StateNew:
- s.new <- c
- case http.StateActive:
- s.active <- c
- case http.StateIdle:
- s.idle <- c
- case http.StateHijacked, http.StateClosed:
- s.closed <- c
- }
- }
- func (s *server) manage() {
- defer func() {
- close(s.new)
- close(s.active)
- close(s.idle)
- close(s.closed)
- close(s.stop)
- close(s.kill)
- }()
- var stopDone chan struct{}
- conns := map[net.Conn]http.ConnState{}
- var countNew, countActive, countIdle float64
- // decConn decrements the count associated with the current state of the
- // given connection.
- decConn := func(c net.Conn) {
- switch conns[c] {
- default:
- panic(fmt.Errorf("unknown existing connection: %s", c))
- case http.StateNew:
- countNew--
- case http.StateActive:
- countActive--
- case http.StateIdle:
- countIdle--
- }
- }
- // setup a ticker to report various values every minute. if we don't have a
- // Stats implementation provided, we Stop it so it never ticks.
- statsTicker := s.clock.Ticker(time.Minute)
- if s.stats == nil {
- statsTicker.Stop()
- }
- for {
- select {
- case <-statsTicker.C:
- // we'll only get here when s.stats is not nil
- s.stats.BumpAvg("http-state.new", countNew)
- s.stats.BumpAvg("http-state.active", countActive)
- s.stats.BumpAvg("http-state.idle", countIdle)
- s.stats.BumpAvg("http-state.total", countNew+countActive+countIdle)
- case c := <-s.new:
- conns[c] = http.StateNew
- countNew++
- case c := <-s.active:
- decConn(c)
- countActive++
- conns[c] = http.StateActive
- case c := <-s.idle:
- decConn(c)
- countIdle++
- conns[c] = http.StateIdle
- // if we're already stopping, close it
- if stopDone != nil {
- c.Close()
- }
- case c := <-s.closed:
- stats.BumpSum(s.stats, "conn.closed", 1)
- decConn(c)
- delete(conns, c)
- // if we're waiting to stop and are all empty, we just closed the last
- // connection and we're done.
- if stopDone != nil && len(conns) == 0 {
- close(stopDone)
- return
- }
- case stopDone = <-s.stop:
- // if we're already all empty, we're already done
- if len(conns) == 0 {
- close(stopDone)
- return
- }
- // close current idle connections right away
- for c, cs := range conns {
- if cs == http.StateIdle {
- c.Close()
- }
- }
- // continue the loop and wait for all the ConnState updates which will
- // eventually close(stopDone) and return from this goroutine.
- case killDone := <-s.kill:
- // force close all connections
- stats.BumpSum(s.stats, "kill.conn.count", float64(len(conns)))
- for c := range conns {
- c.Close()
- }
- // don't block the kill.
- close(killDone)
- // continue the loop and we wait for all the ConnState updates and will
- // return from this goroutine when we're all done. otherwise we'll try to
- // send those ConnState updates on closed channels.
- }
- }
- }
- func (s *server) serve() {
- stats.BumpSum(s.stats, "serve", 1)
- if s.certFile == "" && s.keyFile == "" {
- s.serveErr <- s.server.Serve(s.listener)
- } else {
- s.serveErr <- s.server.ServeTLS(s.listener, s.certFile, s.keyFile)
- }
- close(s.serveDone)
- close(s.serveErr)
- }
- func (s *server) Wait() error {
- if err := <-s.serveErr; !isUseOfClosedError(err) {
- return err
- }
- return nil
- }
- func (s *server) Stop() error {
- s.stopOnce.Do(func() {
- defer stats.BumpTime(s.stats, "stop.time").End()
- stats.BumpSum(s.stats, "stop", 1)
- // first disable keep-alive for new connections
- s.server.SetKeepAlivesEnabled(false)
- // then close the listener so new connections can't connect come thru
- closeErr := s.listener.Close()
- <-s.serveDone
- // then trigger the background goroutine to stop and wait for it
- stopDone := make(chan struct{})
- s.stop <- stopDone
- // wait for stop
- select {
- case <-stopDone:
- case <-s.clock.After(s.stopTimeout):
- defer stats.BumpTime(s.stats, "kill.time").End()
- stats.BumpSum(s.stats, "kill", 1)
- // stop timed out, wait for kill
- killDone := make(chan struct{})
- s.kill <- killDone
- select {
- case <-killDone:
- case <-s.clock.After(s.killTimeout):
- // kill timed out, give up
- stats.BumpSum(s.stats, "kill.timeout", 1)
- }
- }
- if closeErr != nil && !isUseOfClosedError(closeErr) {
- stats.BumpSum(s.stats, "listener.close.error", 1)
- s.stopErr = closeErr
- }
- })
- return s.stopErr
- }
// isUseOfClosedError reports whether err carries the message
// "use of closed network connection". For a *net.OpError the message of the
// wrapped Err is examined; for any other error its own message is used. A
// nil error yields false.
func isUseOfClosedError(err error) bool {
	const closedMsg = "use of closed network connection"

	switch e := err.(type) {
	case nil:
		return false
	case *net.OpError:
		return e.Err.Error() == closedMsg
	default:
		return e.Error() == closedMsg
	}
}
- // ListenAndServe is a convenience function to serve and wait for a SIGTERM
- // or SIGINT before shutting down.
- func ListenAndServe(s *http.Server, hd *HTTP) error {
- if hd == nil {
- hd = &HTTP{}
- }
- hs, err := hd.ListenAndServe(s)
- if err != nil {
- return err
- }
- waiterr := make(chan error, 1)
- go func() {
- defer close(waiterr)
- waiterr <- hs.Wait()
- }()
- signals := make(chan os.Signal, 10)
- signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT)
- select {
- case err := <-waiterr:
- if err != nil {
- return err
- }
- case <-signals:
- signal.Stop(signals)
- if err := hs.Stop(); err != nil {
- return err
- }
- if err := <-waiterr; err != nil {
- return err
- }
- }
- return nil
- }
|