idle.go 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287
  1. /*
  2. *
  3. * Copyright 2023 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package grpc
  19. import (
  20. "fmt"
  21. "math"
  22. "sync"
  23. "sync/atomic"
  24. "time"
  25. )
  26. // For overriding in unit tests.
  27. var timeAfterFunc = func(d time.Duration, f func()) *time.Timer {
  28. return time.AfterFunc(d, f)
  29. }
  30. // idlenessEnforcer is the functionality provided by grpc.ClientConn to enter
  31. // and exit from idle mode.
  32. type idlenessEnforcer interface {
  33. exitIdleMode() error
  34. enterIdleMode() error
  35. }
  36. // idlenessManager defines the functionality required to track RPC activity on a
  37. // channel.
  38. type idlenessManager interface {
  39. onCallBegin() error
  40. onCallEnd()
  41. close()
  42. }
  43. type noopIdlenessManager struct{}
  44. func (noopIdlenessManager) onCallBegin() error { return nil }
  45. func (noopIdlenessManager) onCallEnd() {}
  46. func (noopIdlenessManager) close() {}
  47. // idlenessManagerImpl implements the idlenessManager interface. It uses atomic
  48. // operations to synchronize access to shared state and a mutex to guarantee
  49. // mutual exclusion in a critical section.
  50. type idlenessManagerImpl struct {
  51. // State accessed atomically.
  52. lastCallEndTime int64 // Unix timestamp in nanos; time when the most recent RPC completed.
  53. activeCallsCount int32 // Count of active RPCs; -math.MaxInt32 means channel is idle or is trying to get there.
  54. activeSinceLastTimerCheck int32 // Boolean; True if there was an RPC since the last timer callback.
  55. closed int32 // Boolean; True when the manager is closed.
  56. // Can be accessed without atomics or mutex since these are set at creation
  57. // time and read-only after that.
  58. enforcer idlenessEnforcer // Functionality provided by grpc.ClientConn.
  59. timeout int64 // Idle timeout duration nanos stored as an int64.
  60. // idleMu is used to guarantee mutual exclusion in two scenarios:
  61. // - Opposing intentions:
  62. // - a: Idle timeout has fired and handleIdleTimeout() is trying to put
  63. // the channel in idle mode because the channel has been inactive.
  64. // - b: At the same time an RPC is made on the channel, and onCallBegin()
  65. // is trying to prevent the channel from going idle.
  66. // - Competing intentions:
  67. // - The channel is in idle mode and there are multiple RPCs starting at
  68. // the same time, all trying to move the channel out of idle. Only one
  69. // of them should succeed in doing so, while the other RPCs should
  70. // piggyback on the first one and be successfully handled.
  71. idleMu sync.RWMutex
  72. actuallyIdle bool
  73. timer *time.Timer
  74. }
  75. // newIdlenessManager creates a new idleness manager implementation for the
  76. // given idle timeout.
  77. func newIdlenessManager(enforcer idlenessEnforcer, idleTimeout time.Duration) idlenessManager {
  78. if idleTimeout == 0 {
  79. return noopIdlenessManager{}
  80. }
  81. i := &idlenessManagerImpl{
  82. enforcer: enforcer,
  83. timeout: int64(idleTimeout),
  84. }
  85. i.timer = timeAfterFunc(idleTimeout, i.handleIdleTimeout)
  86. return i
  87. }
  88. // resetIdleTimer resets the idle timer to the given duration. This method
  89. // should only be called from the timer callback.
  90. func (i *idlenessManagerImpl) resetIdleTimer(d time.Duration) {
  91. i.idleMu.Lock()
  92. defer i.idleMu.Unlock()
  93. if i.timer == nil {
  94. // Only close sets timer to nil. We are done.
  95. return
  96. }
  97. // It is safe to ignore the return value from Reset() because this method is
  98. // only ever called from the timer callback, which means the timer has
  99. // already fired.
  100. i.timer.Reset(d)
  101. }
  102. // handleIdleTimeout is the timer callback that is invoked upon expiry of the
  103. // configured idle timeout. The channel is considered inactive if there are no
  104. // ongoing calls and no RPC activity since the last time the timer fired.
  105. func (i *idlenessManagerImpl) handleIdleTimeout() {
  106. if i.isClosed() {
  107. return
  108. }
  109. if atomic.LoadInt32(&i.activeCallsCount) > 0 {
  110. i.resetIdleTimer(time.Duration(i.timeout))
  111. return
  112. }
  113. // There has been activity on the channel since we last got here. Reset the
  114. // timer and return.
  115. if atomic.LoadInt32(&i.activeSinceLastTimerCheck) == 1 {
  116. // Set the timer to fire after a duration of idle timeout, calculated
  117. // from the time the most recent RPC completed.
  118. atomic.StoreInt32(&i.activeSinceLastTimerCheck, 0)
  119. i.resetIdleTimer(time.Duration(atomic.LoadInt64(&i.lastCallEndTime) + i.timeout - time.Now().UnixNano()))
  120. return
  121. }
  122. // This CAS operation is extremely likely to succeed given that there has
  123. // been no activity since the last time we were here. Setting the
  124. // activeCallsCount to -math.MaxInt32 indicates to onCallBegin() that the
  125. // channel is either in idle mode or is trying to get there.
  126. if !atomic.CompareAndSwapInt32(&i.activeCallsCount, 0, -math.MaxInt32) {
  127. // This CAS operation can fail if an RPC started after we checked for
  128. // activity at the top of this method, or one was ongoing from before
  129. // the last time we were here. In both case, reset the timer and return.
  130. i.resetIdleTimer(time.Duration(i.timeout))
  131. return
  132. }
  133. // Now that we've set the active calls count to -math.MaxInt32, it's time to
  134. // actually move to idle mode.
  135. if i.tryEnterIdleMode() {
  136. // Successfully entered idle mode. No timer needed until we exit idle.
  137. return
  138. }
  139. // Failed to enter idle mode due to a concurrent RPC that kept the channel
  140. // active, or because of an error from the channel. Undo the attempt to
  141. // enter idle, and reset the timer to try again later.
  142. atomic.AddInt32(&i.activeCallsCount, math.MaxInt32)
  143. i.resetIdleTimer(time.Duration(i.timeout))
  144. }
  145. // tryEnterIdleMode instructs the channel to enter idle mode. But before
  146. // that, it performs a last minute check to ensure that no new RPC has come in,
  147. // making the channel active.
  148. //
  149. // Return value indicates whether or not the channel moved to idle mode.
  150. //
  151. // Holds idleMu which ensures mutual exclusion with exitIdleMode.
  152. func (i *idlenessManagerImpl) tryEnterIdleMode() bool {
  153. i.idleMu.Lock()
  154. defer i.idleMu.Unlock()
  155. if atomic.LoadInt32(&i.activeCallsCount) != -math.MaxInt32 {
  156. // We raced and lost to a new RPC. Very rare, but stop entering idle.
  157. return false
  158. }
  159. if atomic.LoadInt32(&i.activeSinceLastTimerCheck) == 1 {
  160. // An very short RPC could have come in (and also finished) after we
  161. // checked for calls count and activity in handleIdleTimeout(), but
  162. // before the CAS operation. So, we need to check for activity again.
  163. return false
  164. }
  165. // No new RPCs have come in since we last set the active calls count value
  166. // -math.MaxInt32 in the timer callback. And since we have the lock, it is
  167. // safe to enter idle mode now.
  168. if err := i.enforcer.enterIdleMode(); err != nil {
  169. logger.Errorf("Failed to enter idle mode: %v", err)
  170. return false
  171. }
  172. // Successfully entered idle mode.
  173. i.actuallyIdle = true
  174. return true
  175. }
  176. // onCallBegin is invoked at the start of every RPC.
  177. func (i *idlenessManagerImpl) onCallBegin() error {
  178. if i.isClosed() {
  179. return nil
  180. }
  181. if atomic.AddInt32(&i.activeCallsCount, 1) > 0 {
  182. // Channel is not idle now. Set the activity bit and allow the call.
  183. atomic.StoreInt32(&i.activeSinceLastTimerCheck, 1)
  184. return nil
  185. }
  186. // Channel is either in idle mode or is in the process of moving to idle
  187. // mode. Attempt to exit idle mode to allow this RPC.
  188. if err := i.exitIdleMode(); err != nil {
  189. // Undo the increment to calls count, and return an error causing the
  190. // RPC to fail.
  191. atomic.AddInt32(&i.activeCallsCount, -1)
  192. return err
  193. }
  194. atomic.StoreInt32(&i.activeSinceLastTimerCheck, 1)
  195. return nil
  196. }
  197. // exitIdleMode instructs the channel to exit idle mode.
  198. //
  199. // Holds idleMu which ensures mutual exclusion with tryEnterIdleMode.
  200. func (i *idlenessManagerImpl) exitIdleMode() error {
  201. i.idleMu.Lock()
  202. defer i.idleMu.Unlock()
  203. if !i.actuallyIdle {
  204. // This can happen in two scenarios:
  205. // - handleIdleTimeout() set the calls count to -math.MaxInt32 and called
  206. // tryEnterIdleMode(). But before the latter could grab the lock, an RPC
  207. // came in and onCallBegin() noticed that the calls count is negative.
  208. // - Channel is in idle mode, and multiple new RPCs come in at the same
  209. // time, all of them notice a negative calls count in onCallBegin and get
  210. // here. The first one to get the lock would got the channel to exit idle.
  211. //
  212. // Either way, nothing to do here.
  213. return nil
  214. }
  215. if err := i.enforcer.exitIdleMode(); err != nil {
  216. return fmt.Errorf("channel failed to exit idle mode: %v", err)
  217. }
  218. // Undo the idle entry process. This also respects any new RPC attempts.
  219. atomic.AddInt32(&i.activeCallsCount, math.MaxInt32)
  220. i.actuallyIdle = false
  221. // Start a new timer to fire after the configured idle timeout.
  222. i.timer = timeAfterFunc(time.Duration(i.timeout), i.handleIdleTimeout)
  223. return nil
  224. }
  225. // onCallEnd is invoked at the end of every RPC.
  226. func (i *idlenessManagerImpl) onCallEnd() {
  227. if i.isClosed() {
  228. return
  229. }
  230. // Record the time at which the most recent call finished.
  231. atomic.StoreInt64(&i.lastCallEndTime, time.Now().UnixNano())
  232. // Decrement the active calls count. This count can temporarily go negative
  233. // when the timer callback is in the process of moving the channel to idle
  234. // mode, but one or more RPCs come in and complete before the timer callback
  235. // can get done with the process of moving to idle mode.
  236. atomic.AddInt32(&i.activeCallsCount, -1)
  237. }
  238. func (i *idlenessManagerImpl) isClosed() bool {
  239. return atomic.LoadInt32(&i.closed) == 1
  240. }
  241. func (i *idlenessManagerImpl) close() {
  242. atomic.StoreInt32(&i.closed, 1)
  243. i.idleMu.Lock()
  244. i.timer.Stop()
  245. i.timer = nil
  246. i.idleMu.Unlock()
  247. }