phpmainthread.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. package frankenphp
  2. // #include "frankenphp.h"
  3. import "C"
  4. import (
  5. "sync"
  6. "go.uber.org/zap"
  7. )
  8. // represents the main PHP thread
  9. // the thread needs to keep running as long as all other threads are running
  10. type phpMainThread struct {
  11. state *threadState
  12. done chan struct{}
  13. numThreads int
  14. }
  15. var (
  16. phpThreads []*phpThread
  17. mainThread *phpMainThread
  18. )
  19. // reserve a fixed number of PHP threads on the Go side
  20. func initPHPThreads(numThreads int) error {
  21. mainThread = &phpMainThread{
  22. state: newThreadState(),
  23. done: make(chan struct{}),
  24. numThreads: numThreads,
  25. }
  26. phpThreads = make([]*phpThread, numThreads)
  27. // initialize all threads as inactive
  28. // this needs to happen before starting the main thread
  29. // since some extensions access environment variables on startup
  30. for i := 0; i < numThreads; i++ {
  31. phpThreads[i] = newPHPThread(i)
  32. convertToInactiveThread(phpThreads[i])
  33. }
  34. if err := mainThread.start(); err != nil {
  35. return err
  36. }
  37. // start the underlying C threads
  38. ready := sync.WaitGroup{}
  39. ready.Add(numThreads)
  40. for _, thread := range phpThreads {
  41. go func() {
  42. if !C.frankenphp_new_php_thread(C.uintptr_t(thread.threadIndex)) {
  43. logger.Panic("unable to create thread", zap.Int("threadIndex", thread.threadIndex))
  44. }
  45. thread.state.waitFor(stateInactive)
  46. ready.Done()
  47. }()
  48. }
  49. ready.Wait()
  50. return nil
  51. }
  52. func drainPHPThreads() {
  53. doneWG := sync.WaitGroup{}
  54. doneWG.Add(len(phpThreads))
  55. for _, thread := range phpThreads {
  56. thread.handlerMu.Lock()
  57. _ = thread.state.requestSafeStateChange(stateShuttingDown)
  58. close(thread.drainChan)
  59. }
  60. close(mainThread.done)
  61. for _, thread := range phpThreads {
  62. go func(thread *phpThread) {
  63. thread.state.waitFor(stateDone)
  64. thread.handlerMu.Unlock()
  65. doneWG.Done()
  66. }(thread)
  67. }
  68. doneWG.Wait()
  69. mainThread.state.set(stateShuttingDown)
  70. mainThread.state.waitFor(stateDone)
  71. phpThreads = nil
  72. }
  73. func (mainThread *phpMainThread) start() error {
  74. if C.frankenphp_new_main_thread(C.int(mainThread.numThreads)) != 0 {
  75. return MainThreadCreationError
  76. }
  77. mainThread.state.waitFor(stateReady)
  78. return nil
  79. }
  80. func getInactivePHPThread() *phpThread {
  81. for _, thread := range phpThreads {
  82. if thread.state.is(stateInactive) {
  83. return thread
  84. }
  85. }
  86. panic("not enough threads reserved")
  87. }
  88. //export go_frankenphp_main_thread_is_ready
  89. func go_frankenphp_main_thread_is_ready() {
  90. mainThread.state.set(stateReady)
  91. mainThread.state.waitFor(stateShuttingDown)
  92. }
  93. //export go_frankenphp_shutdown_main_thread
  94. func go_frankenphp_shutdown_main_thread() {
  95. mainThread.state.set(stateDone)
  96. }