Browse Source

feat(go.d/sd/nl): make timeout and interval configurable (#18847)

Ilya Mashchenko 4 months ago
parent
commit
732b561145

+ 62 - 0
src/go/plugin/go.d/agent/discovery/sd/discoverer/netlisteners/ll.go

@@ -0,0 +1,62 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package netlisteners
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"os/exec"
+	"path/filepath"
+	"time"
+
+	"github.com/netdata/netdata/go/plugins/pkg/executable"
+)
+
+type localListeners interface {
+	discover(ctx context.Context) ([]byte, error)
+}
+
+func newLocalListeners(timeout time.Duration) localListeners {
+	dir := os.Getenv("NETDATA_PLUGINS_DIR")
+	if dir == "" {
+		dir = executable.Directory
+	}
+	if dir == "" {
+		dir, _ = os.Getwd()
+	}
+
+	return &localListenersExec{
+		binPath: filepath.Join(dir, "local-listeners"),
+		timeout: timeout,
+	}
+}
+
+type localListenersExec struct {
+	binPath string
+	timeout time.Duration
+}
+
+func (e *localListenersExec) discover(ctx context.Context) ([]byte, error) {
+	execCtx, cancel := context.WithTimeout(ctx, e.timeout)
+	defer cancel()
+
+	// TCPv4/6 and UPDv4 sockets in LISTEN state
+	// https://github.com/netdata/netdata/blob/master/src/collectors/utils/local_listeners.c
+	args := []string{
+		"no-udp6",
+		"no-local",
+		"no-inbound",
+		"no-outbound",
+		"no-namespaces",
+	}
+
+	cmd := exec.CommandContext(execCtx, e.binPath, args...)
+
+	bs, err := cmd.Output()
+	if err != nil {
+		return nil, fmt.Errorf("error on executing '%s': %v", cmd, err)
+	}
+
+	return bs, nil
+}

+ 25 - 51
src/go/plugin/go.d/agent/discovery/sd/discoverer/netlisteners/netlisteners.go

@@ -10,8 +10,6 @@ import (
 	"fmt"
 	"log/slog"
 	"net"
-	"os"
-	"os/exec"
 	"path/filepath"
 	"sort"
 	"strconv"
@@ -19,8 +17,8 @@ import (
 	"time"
 
 	"github.com/netdata/netdata/go/plugins/logger"
-	"github.com/netdata/netdata/go/plugins/pkg/executable"
 	"github.com/netdata/netdata/go/plugins/plugin/go.d/agent/discovery/sd/model"
+	"github.com/netdata/netdata/go/plugins/plugin/go.d/pkg/confopt"
 
 	"github.com/ilyam8/hashstructure"
 )
@@ -30,18 +28,27 @@ var (
 	fullName  = fmt.Sprintf("sd:%s", shortName)
 )
 
+type Config struct {
+	Source string `yaml:"-"`
+	Tags   string `yaml:"tags"`
+
+	Interval *confopt.Duration `yaml:"interval"`
+	Timeout  confopt.Duration  `yaml:"timeout"`
+}
+
 func NewDiscoverer(cfg Config) (*Discoverer, error) {
 	tags, err := model.ParseTags(cfg.Tags)
 	if err != nil {
 		return nil, fmt.Errorf("parse tags: %v", err)
 	}
 
-	dir := os.Getenv("NETDATA_PLUGINS_DIR")
-	if dir == "" {
-		dir = executable.Directory
+	interval := time.Minute * 2
+	if cfg.Interval != nil {
+		interval = cfg.Interval.Duration()
 	}
-	if dir == "" {
-		dir, _ = os.Getwd()
+	timeout := time.Second * 5
+	if cfg.Timeout.Duration() != 0 {
+		timeout = cfg.Timeout.Duration()
 	}
 
 	d := &Discoverer{
@@ -49,12 +56,10 @@ func NewDiscoverer(cfg Config) (*Discoverer, error) {
 			slog.String("component", "service discovery"),
 			slog.String("discoverer", shortName),
 		),
-		cfgSource: cfg.Source,
-		ll: &localListenersExec{
-			binPath: filepath.Join(dir, "local-listeners"),
-			timeout: time.Second * 5,
-		},
-		interval:   time.Minute * 2,
+		cfgSource:  cfg.Source,
+		ll:         newLocalListeners(timeout),
+		interval:   interval,
+		timeout:    timeout,
 		expiryTime: time.Minute * 10,
 		cache:      make(map[uint64]*cacheItem),
 		started:    make(chan struct{}),
@@ -65,11 +70,6 @@ func NewDiscoverer(cfg Config) (*Discoverer, error) {
 	return d, nil
 }
 
-type Config struct {
-	Source string `yaml:"-"`
-	Tags   string `yaml:"tags"`
-}
-
 type (
 	Discoverer struct {
 		*logger.Logger
@@ -78,6 +78,7 @@ type (
 		cfgSource string
 
 		interval time.Duration
+		timeout  time.Duration
 		ll       localListeners
 
 		expiryTime time.Duration
@@ -92,9 +93,6 @@ type (
 		lastSeenTime time.Time
 		tgt          model.Target
 	}
-	localListeners interface {
-		discover(ctx context.Context) ([]byte, error)
-	}
 )
 
 func (d *Discoverer) String() string {
@@ -103,6 +101,7 @@ func (d *Discoverer) String() string {
 
 func (d *Discoverer) Discover(ctx context.Context, in chan<- []model.TargetGroup) {
 	d.Info("instance is started")
+	d.Debugf("used config: interval: %s, timeout: %s, cache expiration time: %s", d.interval, d.timeout, d.expiryTime)
 	defer func() { d.Info("instance is stopped") }()
 
 	close(d.started)
@@ -112,6 +111,10 @@ func (d *Discoverer) Discover(ctx context.Context, in chan<- []model.TargetGroup
 		return
 	}
 
+	if d.interval == 0 {
+		return
+	}
+
 	tk := time.NewTicker(d.interval)
 	defer tk.Stop()
 
@@ -295,35 +298,6 @@ func (d *Discoverer) parseLocalListeners(bs []byte) ([]model.Target, error) {
 	return tgts[:n], nil
 }
 
-type localListenersExec struct {
-	binPath string
-	timeout time.Duration
-}
-
-func (e *localListenersExec) discover(ctx context.Context) ([]byte, error) {
-	execCtx, cancel := context.WithTimeout(ctx, e.timeout)
-	defer cancel()
-
-	// TCPv4/6 and UPDv4 sockets in LISTEN state
-	// https://github.com/netdata/netdata/blob/master/src/collectors/utils/local_listeners.c
-	args := []string{
-		"no-udp6",
-		"no-local",
-		"no-inbound",
-		"no-outbound",
-		"no-namespaces",
-	}
-
-	cmd := exec.CommandContext(execCtx, e.binPath, args...)
-
-	bs, err := cmd.Output()
-	if err != nil {
-		return nil, fmt.Errorf("error on executing '%s': %v", cmd, err)
-	}
-
-	return bs, nil
-}
-
 func extractComm(cmdLine string) string {
 	if i := strings.IndexByte(cmdLine, ' '); i != -1 {
 		cmdLine = cmdLine[:i]