Browse Source

Adds autoscale tests.

Alliballibaba 2 months ago
parent
commit
031424784a
6 changed files with 216 additions and 73 deletions
  1. 110 0
      caddy/admin_test.go
  2. 0 23
      frankenphp.c
  3. 1 0
      frankenphp.go
  4. 0 2
      frankenphp.h
  5. 82 48
      scaling.go
  6. 23 0
      testdata/sleep.php

+ 110 - 0
caddy/admin_test.go

@@ -5,6 +5,8 @@ import (
 	"io"
 	"net/http"
 	"path/filepath"
+	"strings"
+	"sync"
 	"testing"
 
 	"github.com/caddyserver/caddy/v2/caddytest"
@@ -176,6 +178,114 @@ func TestShowTheCorrectThreadDebugStatus(t *testing.T) {
 	assert.Contains(t, threadInfo, "7 additional threads can be started at runtime")
 }
 
+func TestAutoScaleWorkerThreads(t *testing.T) {
+	wg := sync.WaitGroup{}
+	maxTries := 100
+	requestsPerTry := 200
+	tester := caddytest.NewTester(t)
+	tester.InitServer(`
+		{
+			skip_install_trust
+			admin localhost:2999
+			http_port `+testPort+`
+
+			frankenphp {
+				max_threads 10
+				num_threads 2
+				worker ../testdata/sleep.php 1
+			}
+		}
+
+		localhost:`+testPort+` {
+			route {
+				root ../testdata
+				rewrite sleep.php
+				php
+			}
+		}
+		`, "caddyfile")
+
+	// spam an endpoint that simulates IO
+	endpoint := "http://localhost:" + testPort + "/?sleep=5&work=1000"
+	autoScaledThread := "Thread 2"
+
+	// first assert that the thread is not already present
+	threadInfo := getAdminResponseBody(tester, "GET", "threads")
+	assert.NotContains(t, threadInfo, autoScaledThread)
+
+	// try to spawn the additional threads by spamming the server
+	for tries := 0; tries < maxTries; tries++ {
+		wg.Add(requestsPerTry)
+		for i := 0; i < requestsPerTry; i++ {
+			go func() {
+				tester.AssertGetResponse(endpoint, http.StatusOK, "slept for 5 ms and worked for 1000 iterations")
+				wg.Done()
+			}()
+		}
+		wg.Wait()
+		threadInfo = getAdminResponseBody(tester, "GET", "threads")
+		if strings.Contains(threadInfo, autoScaledThread) {
+			break
+		}
+	}
+
+	// assert that the autoscaled thread is present in the threadInfo
+	assert.Contains(t, threadInfo, autoScaledThread)
+}
+
+func TestAutoScaleRegularThreads(t *testing.T) {
+	wg := sync.WaitGroup{}
+	maxTries := 100
+	requestsPerTry := 200
+	tester := caddytest.NewTester(t)
+	tester.InitServer(`
+		{
+			skip_install_trust
+			admin localhost:2999
+			http_port `+testPort+`
+
+			frankenphp {
+				max_threads 10
+				num_threads 1
+			}
+		}
+
+		localhost:`+testPort+` {
+			route {
+				root ../testdata
+				php
+			}
+		}
+		`, "caddyfile")
+
+	// spam an endpoint that simulates IO
+	endpoint := "http://localhost:" + testPort + "/sleep.php?sleep=5&work=1000"
+	autoScaledThread := "Thread 1"
+
+	// first assert that the thread is not already present
+	threadInfo := getAdminResponseBody(tester, "GET", "threads")
+	assert.NotContains(t, threadInfo, autoScaledThread)
+
+	// try to spawn the additional threads by spamming the server
+	for tries := 0; tries < maxTries; tries++ {
+		wg.Add(requestsPerTry)
+		for i := 0; i < requestsPerTry; i++ {
+			go func() {
+				tester.AssertGetResponse(endpoint, http.StatusOK, "slept for 5 ms and worked for 1000 iterations")
+				wg.Done()
+			}()
+		}
+		wg.Wait()
+		threadInfo = getAdminResponseBody(tester, "GET", "threads")
+		if strings.Contains(threadInfo, autoScaledThread) {
+			break
+		}
+	}
+
+	// assert that the autoscaled thread is present in the threadInfo
+	assert.Contains(t, threadInfo, autoScaledThread)
+}
+
 func assertAdminResponse(tester *caddytest.Tester, method string, path string, expectedStatus int, expectedBody string) {
 	adminUrl := "http://localhost:2999/frankenphp/"
 	r, err := http.NewRequest(method, adminUrl+path, nil)

+ 0 - 23
frankenphp.c

@@ -1166,26 +1166,3 @@ int frankenphp_reset_opcache(void) {
   }
   return 0;
 }
-
-/*
- * Probe the CPU usage of the entire process fo x milliseconds
- * Uses clock_gettime to compare cpu time with real time
- * Returns the % of CPUs used by the process in the timeframe
- */
-float frankenphp_probe_cpu(int cpu_count, int milliseconds) {
-  struct timespec sleep_time, cpu_start, cpu_end, probe_start, probe_end;
-  clock_gettime(CLOCK_PROCESS_CPUTIME_ID, &cpu_start);
-  clock_gettime(CLOCK_MONOTONIC, &probe_start);
-
-  sleep_time.tv_sec = 0;
-  sleep_time.tv_nsec = 1000 * 1000 * milliseconds;
-  nanosleep(&sleep_time, &sleep_time);
-
-  clock_gettime(CLOCK_PROCESS_CPUTIME_ID, &cpu_end);
-  clock_gettime(CLOCK_MONOTONIC, &probe_end);
-  float cpu_diff = (cpu_end.tv_nsec / 1000000000.0 + cpu_end.tv_sec) -
-                   (cpu_start.tv_nsec / 1000000000.0 + cpu_start.tv_sec);
-  float req_diff = (probe_end.tv_nsec / 1000000000.0 + probe_end.tv_sec) -
-                   (probe_start.tv_nsec / 1000000000.0 + probe_start.tv_sec);
-  return cpu_diff / req_diff / cpu_count;
-}

+ 1 - 0
frankenphp.go

@@ -367,6 +367,7 @@ func Init(options ...Option) error {
 func Shutdown() {
 	drainWorkers()
 	drainPHPThreads()
+	drainAutoScaling()
 	metrics.Shutdown()
 
 	// Remove the installed app

+ 0 - 2
frankenphp.h

@@ -69,6 +69,4 @@ zend_string *frankenphp_init_persistent_string(const char *string, size_t len);
 void frankenphp_release_zend_string(zend_string *z_string);
 int frankenphp_reset_opcache(void);
 
-float frankenphp_probe_cpu(int cpu_count, int milliseconds);
-
 #endif

+ 82 - 48
scaling.go

@@ -4,7 +4,6 @@ package frankenphp
 import "C"
 import (
 	"errors"
-	"fmt"
 	"runtime"
 	"sync"
 	"sync/atomic"
@@ -18,7 +17,7 @@ const (
 	// only allow scaling threads if requests were stalled for longer than this time
 	allowedStallTime = 10 * time.Millisecond
 	// the amount of time to check for CPU usage before scaling
-	cpuProbeTime = 100 * time.Millisecond
+	cpuProbeTime = 50 * time.Millisecond
 	// if PHP threads are using more than this ratio of the CPU, do not scale
 	maxCpuUsageForScaling = 0.8
 	// check if threads should be stopped every x seconds
@@ -40,66 +39,84 @@ var (
 func AddRegularThread() (int, error) {
 	scalingMu.Lock()
 	defer scalingMu.Unlock()
+	_, err := addRegularThread()
+	return countRegularThreads(), err
+}
+
+func addRegularThread() (*phpThread, error) {
 	thread := getInactivePHPThread()
 	if thread == nil {
-		return countRegularThreads(), fmt.Errorf("max amount of overall threads reached: %d", len(phpThreads))
+		return nil, errors.New("max amount of overall threads reached")
 	}
 	convertToRegularThread(thread)
-	return countRegularThreads(), nil
+	return thread, nil
 }
 
-// remove the last regular thread
 func RemoveRegularThread() (int, error) {
 	scalingMu.Lock()
 	defer scalingMu.Unlock()
+	err := removeRegularThread()
+	return countRegularThreads(), err
+}
+
+// remove the last regular thread
+func removeRegularThread() error {
 	regularThreadMu.RLock()
 	if len(regularThreads) <= 1 {
 		regularThreadMu.RUnlock()
-		return 1, errors.New("cannot remove last thread")
+		return errors.New("cannot remove last thread")
 	}
 	thread := regularThreads[len(regularThreads)-1]
 	regularThreadMu.RUnlock()
 	thread.shutdown()
-	return countRegularThreads(), nil
+	return nil
 }
 
-// turn the first inactive/reserved thread into a worker thread
 func AddWorkerThread(workerFileName string) (int, error) {
-	scalingMu.Lock()
-	defer scalingMu.Unlock()
 	worker, ok := workers[workerFileName]
 	if !ok {
 		return 0, errors.New("worker not found")
 	}
+	scalingMu.Lock()
+	defer scalingMu.Unlock()
+	_, err := addWorkerThread(worker)
+	return worker.countThreads(), err
+}
 
+// turn the first inactive/reserved thread into a worker thread
+func addWorkerThread(worker *worker) (*phpThread, error) {
 	thread := getInactivePHPThread()
 	if thread == nil {
-		count := worker.countThreads()
-		return count, fmt.Errorf("max amount of threads reached: %d", count)
+		return nil, errors.New("max amount of overall threads reached")
 	}
 	convertToWorkerThread(thread, worker)
-	return worker.countThreads(), nil
+	return thread, nil
 }
 
-// remove the last worker thread
 func RemoveWorkerThread(workerFileName string) (int, error) {
-	scalingMu.Lock()
-	defer scalingMu.Unlock()
 	worker, ok := workers[workerFileName]
 	if !ok {
 		return 0, errors.New("worker not found")
 	}
+	scalingMu.Lock()
+	defer scalingMu.Unlock()
+	err := removeWorkerThread(worker)
+
+	return worker.countThreads(), err
+}
 
+// remove the last worker thread
+func removeWorkerThread(worker *worker) error {
 	worker.threadMutex.RLock()
 	if len(worker.threads) <= 1 {
 		worker.threadMutex.RUnlock()
-		return 1, errors.New("cannot remove last thread")
+		return errors.New("cannot remove last thread")
 	}
 	thread := worker.threads[len(worker.threads)-1]
 	worker.threadMutex.RUnlock()
 	thread.shutdown()
 
-	return worker.countThreads(), nil
+	return nil
 }
 
 func initAutoScaling(numThreads int, maxThreads int) {
@@ -107,8 +124,8 @@ func initAutoScaling(numThreads int, maxThreads int) {
 		blockAutoScaling.Store(true)
 		return
 	}
-	autoScaledThreads = make([]*phpThread, 0, maxThreads-numThreads)
 	blockAutoScaling.Store(false)
+	autoScaledThreads = make([]*phpThread, 0, maxThreads-numThreads)
 	timer := time.NewTimer(downScaleCheckTime)
 	doneChan := mainThread.done
 	go func() {
@@ -124,52 +141,60 @@ func initAutoScaling(numThreads int, maxThreads int) {
 	}()
 }
 
+func drainAutoScaling() {
+	scalingMu.Lock()
+	blockAutoScaling.Store(true)
+	scalingMu.Unlock()
+}
+
 // Add worker PHP threads automatically
 func autoscaleWorkerThreads(worker *worker, timeSpentStalling time.Duration) {
-
 	// first check if time spent waiting for a thread was above the allowed threshold
 	if timeSpentStalling < allowedStallTime || !blockAutoScaling.CompareAndSwap(false, true) {
 		return
 	}
+	scalingMu.Lock()
+	defer scalingMu.Unlock()
 	defer blockAutoScaling.Store(false)
 
 	// TODO: is there an easy way to check if we are reaching memory limits?
 
-	if probeIfCpusAreBusy(cpuProbeTime) {
+	if !probeCPUs(cpuProbeTime) {
 		logger.Debug("cpu is busy, not autoscaling", zap.String("worker", worker.fileName))
 		return
 	}
 
-	count, err := AddWorkerThread(worker.fileName)
+	thread, err := addWorkerThread(worker)
 	if err != nil {
-		logger.Debug("could not add worker thread", zap.String("worker", worker.fileName), zap.Int("count", count), zap.Error(err))
+		logger.Debug("could not add worker thread", zap.String("worker", worker.fileName), zap.Error(err))
+		return
 	}
 
-	scalingMu.Lock()
-	autoScaledThreads = append(autoScaledThreads, worker.threads[len(worker.threads)-1])
-	scalingMu.Unlock()
+	autoScaledThreads = append(autoScaledThreads, thread)
 }
 
 // Add regular PHP threads automatically
 func autoscaleRegularThreads(timeSpentStalling time.Duration) {
-
 	// first check if time spent waiting for a thread was above the allowed threshold
 	if timeSpentStalling < allowedStallTime || !blockAutoScaling.CompareAndSwap(false, true) {
 		return
 	}
+	scalingMu.Lock()
+	defer scalingMu.Unlock()
 	defer blockAutoScaling.Store(false)
 
-	if probeIfCpusAreBusy(cpuProbeTime) {
+	if !probeCPUs(cpuProbeTime) {
 		logger.Debug("cpu is busy, not autoscaling")
 		return
 	}
 
-	count, err := AddRegularThread()
-	scalingMu.Lock()
-	autoScaledThreads = append(autoScaledThreads, regularThreads[len(regularThreads)-1])
-	scalingMu.Unlock()
+	thread, err := addRegularThread()
+	if err != nil {
+		logger.Debug("could not add regular thread", zap.Error(err))
+		return
+	}
 
-	logger.Debug("regular thread autoscaling", zap.Int("count", count), zap.Error(err))
+	autoScaledThreads = append(autoScaledThreads, thread)
 }
 
 func downScaleThreads() {
@@ -210,23 +235,32 @@ func downScaleThreads() {
 	}
 }
 
-func readMemory() {
-	return
-	var mem runtime.MemStats
-	runtime.ReadMemStats(&mem)
-
-	fmt.Printf("Total allocated memory: %d bytes\n", mem.TotalAlloc)
-	fmt.Printf("Number of memory allocations: %d\n", mem.Mallocs)
-}
-
-// probe the CPU usage of the process
+// probe the CPU usage of all PHP Threads
 // if CPUs are not busy, most threads are likely waiting for I/O, so we should scale
 // if CPUs are already busy we won't gain much by scaling and want to avoid the overhead of doing so
-// keep in mind that this will only probe CPU usage by PHP Threads
 // time spent by the go runtime or other processes is not considered
-func probeIfCpusAreBusy(sleepTime time.Duration) bool {
-	cpuUsage := float64(C.frankenphp_probe_cpu(C.int(cpuCount), C.int(sleepTime.Milliseconds())))
+func probeCPUs(probeTime time.Duration) bool {
+	var startTime, endTime, cpuTime, cpuEndTime C.struct_timespec
+
+	C.clock_gettime(C.CLOCK_MONOTONIC, &startTime)
+	C.clock_gettime(C.CLOCK_PROCESS_CPUTIME_ID, &cpuTime)
+
+	timer := time.NewTimer(probeTime)
+	select {
+	case <-mainThread.done:
+		return false
+	case <-timer.C:
+	}
+
+	C.clock_gettime(C.CLOCK_MONOTONIC, &endTime)
+	C.clock_gettime(C.CLOCK_PROCESS_CPUTIME_ID, &cpuEndTime)
+
+	elapsedTime := float64((endTime.tv_sec-startTime.tv_sec)*1e9 + (endTime.tv_nsec - startTime.tv_nsec))
+	elapsedCpuTime := float64((cpuEndTime.tv_sec-cpuTime.tv_sec)*1e9 + (cpuEndTime.tv_nsec - cpuTime.tv_nsec))
+	cpuUsage := elapsedCpuTime / elapsedTime / float64(cpuCount)
+
+	// TODO: remove unnecessary debug messages
+	logger.Debug("CPU usage", zap.Float64("cpuUsage", cpuUsage))
 
-	logger.Warn("CPU usage", zap.Float64("usage", cpuUsage))
-	return cpuUsage > maxCpuUsageForScaling
+	return cpuUsage < maxCpuUsageForScaling
 }

+ 23 - 0
testdata/sleep.php

@@ -0,0 +1,23 @@
+<?php
+
+require_once __DIR__ . '/_executor.php';
+
+return function () {
+    $sleep = $_GET['sleep'] ?? 0;
+    $work = $_GET['work'] ?? 0;
+
+    // simulate work
+    // 50_000 iterations are approximately the weight of a simple Laravel request
+    for ($i = 0; $i < $work; $i++) {
+        $a = +$i;
+    }
+
+    // simulate IO, some examples:
+    // SSDs: 0.1ms - 1ms
+    // HDDs: 5ms - 10ms
+    // modern databases: usually 1ms - 10ms (for simple queries)
+    // external APIs: can take up to multiple seconds
+    usleep($sleep * 1000);
+
+    echo "slept for $sleep ms and worked for $work iterations";
+};