123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159 |
- package net2
- import (
- "net"
- "strings"
- "time"
- rp "github.com/seaweedfs/seaweedfs/weed/wdclient/resource_pool"
- )
- const defaultDialTimeout = 1 * time.Second
- func defaultDialFunc(network string, address string) (net.Conn, error) {
- return net.DialTimeout(network, address, defaultDialTimeout)
- }
- func parseResourceLocation(resourceLocation string) (
- network string,
- address string) {
- idx := strings.Index(resourceLocation, " ")
- if idx >= 0 {
- return resourceLocation[:idx], resourceLocation[idx+1:]
- }
- return "", resourceLocation
- }
- // A thin wrapper around the underlying resource pool.
- type connectionPoolImpl struct {
- options ConnectionOptions
- pool rp.ResourcePool
- }
- // This returns a connection pool where all connections are connected
- // to the same (network, address)
- func newBaseConnectionPool(
- options ConnectionOptions,
- createPool func(rp.Options) rp.ResourcePool) ConnectionPool {
- dial := options.Dial
- if dial == nil {
- dial = defaultDialFunc
- }
- openFunc := func(loc string) (interface{}, error) {
- network, address := parseResourceLocation(loc)
- return dial(network, address)
- }
- closeFunc := func(handle interface{}) error {
- return handle.(net.Conn).Close()
- }
- poolOptions := rp.Options{
- MaxActiveHandles: options.MaxActiveConnections,
- MaxIdleHandles: options.MaxIdleConnections,
- MaxIdleTime: options.MaxIdleTime,
- OpenMaxConcurrency: options.DialMaxConcurrency,
- Open: openFunc,
- Close: closeFunc,
- NowFunc: options.NowFunc,
- }
- return &connectionPoolImpl{
- options: options,
- pool: createPool(poolOptions),
- }
- }
- // This returns a connection pool where all connections are connected
- // to the same (network, address)
- func NewSimpleConnectionPool(options ConnectionOptions) ConnectionPool {
- return newBaseConnectionPool(options, rp.NewSimpleResourcePool)
- }
- // This returns a connection pool that manages multiple (network, address)
- // entries. The connections to each (network, address) entry acts
- // independently. For example ("tcp", "localhost:11211") could act as memcache
- // shard 0 and ("tcp", "localhost:11212") could act as memcache shard 1.
- func NewMultiConnectionPool(options ConnectionOptions) ConnectionPool {
- return newBaseConnectionPool(
- options,
- func(poolOptions rp.Options) rp.ResourcePool {
- return rp.NewMultiResourcePool(poolOptions, nil)
- })
- }
- // See ConnectionPool for documentation.
- func (p *connectionPoolImpl) NumActive() int32 {
- return p.pool.NumActive()
- }
- // See ConnectionPool for documentation.
- func (p *connectionPoolImpl) ActiveHighWaterMark() int32 {
- return p.pool.ActiveHighWaterMark()
- }
- // This returns the number of alive idle connections. This method is not part
- // of ConnectionPool's API. It is used only for testing.
- func (p *connectionPoolImpl) NumIdle() int {
- return p.pool.NumIdle()
- }
- // BaseConnectionPool can only register a single (network, address) entry.
- // Register should be call before any Get calls.
- func (p *connectionPoolImpl) Register(network string, address string) error {
- return p.pool.Register(network + " " + address)
- }
- // BaseConnectionPool has nothing to do on Unregister.
- func (p *connectionPoolImpl) Unregister(network string, address string) error {
- return nil
- }
- func (p *connectionPoolImpl) ListRegistered() []NetworkAddress {
- result := make([]NetworkAddress, 0, 1)
- for _, location := range p.pool.ListRegistered() {
- network, address := parseResourceLocation(location)
- result = append(
- result,
- NetworkAddress{
- Network: network,
- Address: address,
- })
- }
- return result
- }
- // This gets an active connection from the connection pool. Note that network
- // and address arguments are ignored (The connections with point to the
- // network/address provided by the first Register call).
- func (p *connectionPoolImpl) Get(
- network string,
- address string) (ManagedConn, error) {
- handle, err := p.pool.Get(network + " " + address)
- if err != nil {
- return nil, err
- }
- return NewManagedConn(network, address, handle, p, p.options), nil
- }
- // See ConnectionPool for documentation.
- func (p *connectionPoolImpl) Release(conn ManagedConn) error {
- return conn.ReleaseConnection()
- }
- // See ConnectionPool for documentation.
- func (p *connectionPoolImpl) Discard(conn ManagedConn) error {
- return conn.DiscardConnection()
- }
- // See ConnectionPool for documentation.
- func (p *connectionPoolImpl) EnterLameDuckMode() {
- p.pool.EnterLameDuckMode()
- }
|