phpmainthread.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. package frankenphp
  2. // #include "frankenphp.h"
  3. import "C"
import (
	"fmt"
	"strings"
	"sync"

	"github.com/dunglas/frankenphp/internal/memory"
	"go.uber.org/zap"
)
  10. // represents the main PHP thread
  11. // the thread needs to keep running as long as all other threads are running
  12. type phpMainThread struct {
  13. state *threadState
  14. done chan struct{}
  15. numThreads int
  16. maxThreads int
  17. phpIni map[string]string
  18. }
  19. var (
  20. phpThreads []*phpThread
  21. mainThread *phpMainThread
  22. )
  23. // start the main PHP thread
  24. // start a fixed number of inactive PHP threads
  25. // reserve a fixed number of possible PHP threads
  26. func initPHPThreads(numThreads int, numMaxThreads int, phpIni map[string]string) (*phpMainThread, error) {
  27. mainThread = &phpMainThread{
  28. state: newThreadState(),
  29. done: make(chan struct{}),
  30. numThreads: numThreads,
  31. maxThreads: numMaxThreads,
  32. phpIni: phpIni,
  33. }
  34. // initialize the first thread
  35. // this needs to happen before starting the main thread
  36. // since some extensions access environment variables on startup
  37. // the threadIndex on the main thread defaults to 0 -> phpThreads[0].Pin(...)
  38. initialThread := newPHPThread(0)
  39. phpThreads = []*phpThread{initialThread}
  40. if err := mainThread.start(); err != nil {
  41. return nil, err
  42. }
  43. // initialize all other threads
  44. phpThreads = make([]*phpThread, mainThread.maxThreads)
  45. phpThreads[0] = initialThread
  46. for i := 1; i < mainThread.maxThreads; i++ {
  47. phpThreads[i] = newPHPThread(i)
  48. }
  49. // start the underlying C threads
  50. ready := sync.WaitGroup{}
  51. ready.Add(numThreads)
  52. for i := 0; i < numThreads; i++ {
  53. thread := phpThreads[i]
  54. go func() {
  55. thread.boot()
  56. ready.Done()
  57. }()
  58. }
  59. ready.Wait()
  60. return mainThread, nil
  61. }
  62. // ThreadDebugStatus prints the state of all PHP threads - debugging purposes only
  63. func ThreadDebugStatus() string {
  64. statusMessage := ""
  65. reservedThreadCount := 0
  66. for _, thread := range phpThreads {
  67. if thread.state.is(stateReserved) {
  68. reservedThreadCount++
  69. continue
  70. }
  71. statusMessage += thread.debugStatus() + "\n"
  72. }
  73. statusMessage += fmt.Sprintf("%d additional threads can be started at runtime\n", reservedThreadCount)
  74. return statusMessage
  75. }
  76. func drainPHPThreads() {
  77. doneWG := sync.WaitGroup{}
  78. doneWG.Add(len(phpThreads))
  79. mainThread.state.set(stateShuttingDown)
  80. close(mainThread.done)
  81. for _, thread := range phpThreads {
  82. // shut down all reserved threads
  83. if thread.state.compareAndSwap(stateReserved, stateDone) {
  84. doneWG.Done()
  85. continue
  86. }
  87. // shut down all active threads
  88. go func(thread *phpThread) {
  89. thread.shutdown()
  90. doneWG.Done()
  91. }(thread)
  92. }
  93. doneWG.Wait()
  94. mainThread.state.set(stateDone)
  95. mainThread.state.waitFor(stateReserved)
  96. phpThreads = nil
  97. }
  98. func (mainThread *phpMainThread) start() error {
  99. if C.frankenphp_new_main_thread(C.int(mainThread.numThreads)) != 0 {
  100. return MainThreadCreationError
  101. }
  102. mainThread.state.waitFor(stateReady)
  103. return nil
  104. }
  105. func getInactivePHPThread() *phpThread {
  106. thread := getPHPThreadAtState(stateInactive)
  107. if thread != nil {
  108. return thread
  109. }
  110. thread = getPHPThreadAtState(stateReserved)
  111. if thread == nil {
  112. return nil
  113. }
  114. thread.boot()
  115. return thread
  116. }
  117. func getPHPThreadAtState(state stateID) *phpThread {
  118. for _, thread := range phpThreads {
  119. if thread.state.is(state) {
  120. return thread
  121. }
  122. }
  123. return nil
  124. }
  125. //export go_frankenphp_main_thread_is_ready
  126. func go_frankenphp_main_thread_is_ready() {
  127. mainThread.setAutomaticMaxThreads()
  128. if mainThread.maxThreads < mainThread.numThreads {
  129. mainThread.maxThreads = mainThread.numThreads
  130. }
  131. mainThread.state.set(stateReady)
  132. mainThread.state.waitFor(stateDone)
  133. }
  134. // max_threads = auto
  135. // Estimate the amount of threads based on php.ini and system memory_limit
  136. // If unable to get the system's memory limit, simply double num_threads
  137. func (mainThread *phpMainThread) setAutomaticMaxThreads() {
  138. if mainThread.maxThreads >= 0 {
  139. return
  140. }
  141. perThreadMemoryLimit := int64(C.frankenphp_get_current_memory_limit())
  142. totalSysMemory := memory.TotalSysMemory()
  143. if perThreadMemoryLimit <= 0 || totalSysMemory == 0 {
  144. mainThread.maxThreads = mainThread.numThreads * 2
  145. return
  146. }
  147. maxAllowedThreads := totalSysMemory / uint64(perThreadMemoryLimit)
  148. mainThread.maxThreads = int(maxAllowedThreads)
  149. logger.Debug("Automatic thread limit", zap.Int("perThreadMemoryLimitMB", int(perThreadMemoryLimit/1024/1024)), zap.Int("maxThreads", mainThread.maxThreads))
  150. }
  151. //export go_frankenphp_shutdown_main_thread
  152. func go_frankenphp_shutdown_main_thread() {
  153. mainThread.state.set(stateReserved)
  154. }
  155. //export go_get_custom_php_ini
  156. func go_get_custom_php_ini() *C.char {
  157. if mainThread.phpIni == nil {
  158. return nil
  159. }
  160. // pass the php.ini overrides to PHP before startup
  161. // TODO: if needed this would also be possible on a per-thread basis
  162. overrides := ""
  163. for k, v := range mainThread.phpIni {
  164. overrides += fmt.Sprintf("%s=%s\n", k, v)
  165. }
  166. return C.CString(overrides)
  167. }