gracefulswitch.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384
  1. /*
  2. *
  3. * Copyright 2022 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. // Package gracefulswitch implements a graceful switch load balancer.
  19. package gracefulswitch
  20. import (
  21. "errors"
  22. "fmt"
  23. "sync"
  24. "google.golang.org/grpc/balancer"
  25. "google.golang.org/grpc/balancer/base"
  26. "google.golang.org/grpc/connectivity"
  27. "google.golang.org/grpc/resolver"
  28. )
  29. var errBalancerClosed = errors.New("gracefulSwitchBalancer is closed")
  30. var _ balancer.Balancer = (*Balancer)(nil)
  31. // NewBalancer returns a graceful switch Balancer.
  32. func NewBalancer(cc balancer.ClientConn, opts balancer.BuildOptions) *Balancer {
  33. return &Balancer{
  34. cc: cc,
  35. bOpts: opts,
  36. }
  37. }
  38. // Balancer is a utility to gracefully switch from one balancer to
  39. // a new balancer. It implements the balancer.Balancer interface.
  40. type Balancer struct {
  41. bOpts balancer.BuildOptions
  42. cc balancer.ClientConn
  43. // mu protects the following fields and all fields within balancerCurrent
  44. // and balancerPending. mu does not need to be held when calling into the
  45. // child balancers, as all calls into these children happen only as a direct
  46. // result of a call into the gracefulSwitchBalancer, which are also
  47. // guaranteed to be synchronous. There is one exception: an UpdateState call
  48. // from a child balancer when current and pending are populated can lead to
  49. // calling Close() on the current. To prevent that racing with an
  50. // UpdateSubConnState from the channel, we hold currentMu during Close and
  51. // UpdateSubConnState calls.
  52. mu sync.Mutex
  53. balancerCurrent *balancerWrapper
  54. balancerPending *balancerWrapper
  55. closed bool // set to true when this balancer is closed
  56. // currentMu must be locked before mu. This mutex guards against this
  57. // sequence of events: UpdateSubConnState() called, finds the
  58. // balancerCurrent, gives up lock, updateState comes in, causes Close() on
  59. // balancerCurrent before the UpdateSubConnState is called on the
  60. // balancerCurrent.
  61. currentMu sync.Mutex
  62. }
  63. // swap swaps out the current lb with the pending lb and updates the ClientConn.
  64. // The caller must hold gsb.mu.
  65. func (gsb *Balancer) swap() {
  66. gsb.cc.UpdateState(gsb.balancerPending.lastState)
  67. cur := gsb.balancerCurrent
  68. gsb.balancerCurrent = gsb.balancerPending
  69. gsb.balancerPending = nil
  70. go func() {
  71. gsb.currentMu.Lock()
  72. defer gsb.currentMu.Unlock()
  73. cur.Close()
  74. }()
  75. }
  76. // Helper function that checks if the balancer passed in is current or pending.
  77. // The caller must hold gsb.mu.
  78. func (gsb *Balancer) balancerCurrentOrPending(bw *balancerWrapper) bool {
  79. return bw == gsb.balancerCurrent || bw == gsb.balancerPending
  80. }
  81. // SwitchTo initializes the graceful switch process, which completes based on
  82. // connectivity state changes on the current/pending balancer. Thus, the switch
  83. // process is not complete when this method returns. This method must be called
  84. // synchronously alongside the rest of the balancer.Balancer methods this
  85. // Graceful Switch Balancer implements.
  86. func (gsb *Balancer) SwitchTo(builder balancer.Builder) error {
  87. gsb.mu.Lock()
  88. if gsb.closed {
  89. gsb.mu.Unlock()
  90. return errBalancerClosed
  91. }
  92. bw := &balancerWrapper{
  93. gsb: gsb,
  94. lastState: balancer.State{
  95. ConnectivityState: connectivity.Connecting,
  96. Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable),
  97. },
  98. subconns: make(map[balancer.SubConn]bool),
  99. }
  100. balToClose := gsb.balancerPending // nil if there is no pending balancer
  101. if gsb.balancerCurrent == nil {
  102. gsb.balancerCurrent = bw
  103. } else {
  104. gsb.balancerPending = bw
  105. }
  106. gsb.mu.Unlock()
  107. balToClose.Close()
  108. // This function takes a builder instead of a balancer because builder.Build
  109. // can call back inline, and this utility needs to handle the callbacks.
  110. newBalancer := builder.Build(bw, gsb.bOpts)
  111. if newBalancer == nil {
  112. // This is illegal and should never happen; we clear the balancerWrapper
  113. // we were constructing if it happens to avoid a potential panic.
  114. gsb.mu.Lock()
  115. if gsb.balancerPending != nil {
  116. gsb.balancerPending = nil
  117. } else {
  118. gsb.balancerCurrent = nil
  119. }
  120. gsb.mu.Unlock()
  121. return balancer.ErrBadResolverState
  122. }
  123. // This write doesn't need to take gsb.mu because this field never gets read
  124. // or written to on any calls from the current or pending. Calls from grpc
  125. // to this balancer are guaranteed to be called synchronously, so this
  126. // bw.Balancer field will never be forwarded to until this SwitchTo()
  127. // function returns.
  128. bw.Balancer = newBalancer
  129. return nil
  130. }
  131. // Returns nil if the graceful switch balancer is closed.
  132. func (gsb *Balancer) latestBalancer() *balancerWrapper {
  133. gsb.mu.Lock()
  134. defer gsb.mu.Unlock()
  135. if gsb.balancerPending != nil {
  136. return gsb.balancerPending
  137. }
  138. return gsb.balancerCurrent
  139. }
  140. // UpdateClientConnState forwards the update to the latest balancer created.
  141. func (gsb *Balancer) UpdateClientConnState(state balancer.ClientConnState) error {
  142. // The resolver data is only relevant to the most recent LB Policy.
  143. balToUpdate := gsb.latestBalancer()
  144. if balToUpdate == nil {
  145. return errBalancerClosed
  146. }
  147. // Perform this call without gsb.mu to prevent deadlocks if the child calls
  148. // back into the channel. The latest balancer can never be closed during a
  149. // call from the channel, even without gsb.mu held.
  150. return balToUpdate.UpdateClientConnState(state)
  151. }
  152. // ResolverError forwards the error to the latest balancer created.
  153. func (gsb *Balancer) ResolverError(err error) {
  154. // The resolver data is only relevant to the most recent LB Policy.
  155. balToUpdate := gsb.latestBalancer()
  156. if balToUpdate == nil {
  157. return
  158. }
  159. // Perform this call without gsb.mu to prevent deadlocks if the child calls
  160. // back into the channel. The latest balancer can never be closed during a
  161. // call from the channel, even without gsb.mu held.
  162. balToUpdate.ResolverError(err)
  163. }
  164. // ExitIdle forwards the call to the latest balancer created.
  165. //
  166. // If the latest balancer does not support ExitIdle, the subConns are
  167. // re-connected to manually.
  168. func (gsb *Balancer) ExitIdle() {
  169. balToUpdate := gsb.latestBalancer()
  170. if balToUpdate == nil {
  171. return
  172. }
  173. // There is no need to protect this read with a mutex, as the write to the
  174. // Balancer field happens in SwitchTo, which completes before this can be
  175. // called.
  176. if ei, ok := balToUpdate.Balancer.(balancer.ExitIdler); ok {
  177. ei.ExitIdle()
  178. return
  179. }
  180. gsb.mu.Lock()
  181. defer gsb.mu.Unlock()
  182. for sc := range balToUpdate.subconns {
  183. sc.Connect()
  184. }
  185. }
  186. // UpdateSubConnState forwards the update to the appropriate child.
  187. func (gsb *Balancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
  188. gsb.currentMu.Lock()
  189. defer gsb.currentMu.Unlock()
  190. gsb.mu.Lock()
  191. // Forward update to the appropriate child. Even if there is a pending
  192. // balancer, the current balancer should continue to get SubConn updates to
  193. // maintain the proper state while the pending is still connecting.
  194. var balToUpdate *balancerWrapper
  195. if gsb.balancerCurrent != nil && gsb.balancerCurrent.subconns[sc] {
  196. balToUpdate = gsb.balancerCurrent
  197. } else if gsb.balancerPending != nil && gsb.balancerPending.subconns[sc] {
  198. balToUpdate = gsb.balancerPending
  199. }
  200. gsb.mu.Unlock()
  201. if balToUpdate == nil {
  202. // SubConn belonged to a stale lb policy that has not yet fully closed,
  203. // or the balancer was already closed.
  204. return
  205. }
  206. balToUpdate.UpdateSubConnState(sc, state)
  207. }
  208. // Close closes any active child balancers.
  209. func (gsb *Balancer) Close() {
  210. gsb.mu.Lock()
  211. gsb.closed = true
  212. currentBalancerToClose := gsb.balancerCurrent
  213. gsb.balancerCurrent = nil
  214. pendingBalancerToClose := gsb.balancerPending
  215. gsb.balancerPending = nil
  216. gsb.mu.Unlock()
  217. currentBalancerToClose.Close()
  218. pendingBalancerToClose.Close()
  219. }
  220. // balancerWrapper wraps a balancer.Balancer, and overrides some Balancer
  221. // methods to help cleanup SubConns created by the wrapped balancer.
  222. //
  223. // It implements the balancer.ClientConn interface and is passed down in that
  224. // capacity to the wrapped balancer. It maintains a set of subConns created by
  225. // the wrapped balancer and calls from the latter to create/update/remove
  226. // SubConns update this set before being forwarded to the parent ClientConn.
  227. // State updates from the wrapped balancer can result in invocation of the
  228. // graceful switch logic.
  229. type balancerWrapper struct {
  230. balancer.Balancer
  231. gsb *Balancer
  232. lastState balancer.State
  233. subconns map[balancer.SubConn]bool // subconns created by this balancer
  234. }
  235. func (bw *balancerWrapper) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
  236. if state.ConnectivityState == connectivity.Shutdown {
  237. bw.gsb.mu.Lock()
  238. delete(bw.subconns, sc)
  239. bw.gsb.mu.Unlock()
  240. }
  241. // There is no need to protect this read with a mutex, as the write to the
  242. // Balancer field happens in SwitchTo, which completes before this can be
  243. // called.
  244. bw.Balancer.UpdateSubConnState(sc, state)
  245. }
  246. // Close closes the underlying LB policy and removes the subconns it created. bw
  247. // must not be referenced via balancerCurrent or balancerPending in gsb when
  248. // called. gsb.mu must not be held. Does not panic with a nil receiver.
  249. func (bw *balancerWrapper) Close() {
  250. // before Close is called.
  251. if bw == nil {
  252. return
  253. }
  254. // There is no need to protect this read with a mutex, as Close() is
  255. // impossible to be called concurrently with the write in SwitchTo(). The
  256. // callsites of Close() for this balancer in Graceful Switch Balancer will
  257. // never be called until SwitchTo() returns.
  258. bw.Balancer.Close()
  259. bw.gsb.mu.Lock()
  260. for sc := range bw.subconns {
  261. bw.gsb.cc.RemoveSubConn(sc)
  262. }
  263. bw.gsb.mu.Unlock()
  264. }
  265. func (bw *balancerWrapper) UpdateState(state balancer.State) {
  266. // Hold the mutex for this entire call to ensure it cannot occur
  267. // concurrently with other updateState() calls. This causes updates to
  268. // lastState and calls to cc.UpdateState to happen atomically.
  269. bw.gsb.mu.Lock()
  270. defer bw.gsb.mu.Unlock()
  271. bw.lastState = state
  272. if !bw.gsb.balancerCurrentOrPending(bw) {
  273. return
  274. }
  275. if bw == bw.gsb.balancerCurrent {
  276. // In the case that the current balancer exits READY, and there is a pending
  277. // balancer, you can forward the pending balancer's cached State up to
  278. // ClientConn and swap the pending into the current. This is because there
  279. // is no reason to gracefully switch from and keep using the old policy as
  280. // the ClientConn is not connected to any backends.
  281. if state.ConnectivityState != connectivity.Ready && bw.gsb.balancerPending != nil {
  282. bw.gsb.swap()
  283. return
  284. }
  285. // Even if there is a pending balancer waiting to be gracefully switched to,
  286. // continue to forward current balancer updates to the Client Conn. Ignoring
  287. // state + picker from the current would cause undefined behavior/cause the
  288. // system to behave incorrectly from the current LB policies perspective.
  289. // Also, the current LB is still being used by grpc to choose SubConns per
  290. // RPC, and thus should use the most updated form of the current balancer.
  291. bw.gsb.cc.UpdateState(state)
  292. return
  293. }
  294. // This method is now dealing with a state update from the pending balancer.
  295. // If the current balancer is currently in a state other than READY, the new
  296. // policy can be swapped into place immediately. This is because there is no
  297. // reason to gracefully switch from and keep using the old policy as the
  298. // ClientConn is not connected to any backends.
  299. if state.ConnectivityState != connectivity.Connecting || bw.gsb.balancerCurrent.lastState.ConnectivityState != connectivity.Ready {
  300. bw.gsb.swap()
  301. }
  302. }
  303. func (bw *balancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
  304. bw.gsb.mu.Lock()
  305. if !bw.gsb.balancerCurrentOrPending(bw) {
  306. bw.gsb.mu.Unlock()
  307. return nil, fmt.Errorf("%T at address %p that called NewSubConn is deleted", bw, bw)
  308. }
  309. bw.gsb.mu.Unlock()
  310. sc, err := bw.gsb.cc.NewSubConn(addrs, opts)
  311. if err != nil {
  312. return nil, err
  313. }
  314. bw.gsb.mu.Lock()
  315. if !bw.gsb.balancerCurrentOrPending(bw) { // balancer was closed during this call
  316. bw.gsb.cc.RemoveSubConn(sc)
  317. bw.gsb.mu.Unlock()
  318. return nil, fmt.Errorf("%T at address %p that called NewSubConn is deleted", bw, bw)
  319. }
  320. bw.subconns[sc] = true
  321. bw.gsb.mu.Unlock()
  322. return sc, nil
  323. }
  324. func (bw *balancerWrapper) ResolveNow(opts resolver.ResolveNowOptions) {
  325. // Ignore ResolveNow requests from anything other than the most recent
  326. // balancer, because older balancers were already removed from the config.
  327. if bw != bw.gsb.latestBalancer() {
  328. return
  329. }
  330. bw.gsb.cc.ResolveNow(opts)
  331. }
  332. func (bw *balancerWrapper) RemoveSubConn(sc balancer.SubConn) {
  333. bw.gsb.mu.Lock()
  334. if !bw.gsb.balancerCurrentOrPending(bw) {
  335. bw.gsb.mu.Unlock()
  336. return
  337. }
  338. bw.gsb.mu.Unlock()
  339. bw.gsb.cc.RemoveSubConn(sc)
  340. }
  341. func (bw *balancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {
  342. bw.gsb.mu.Lock()
  343. if !bw.gsb.balancerCurrentOrPending(bw) {
  344. bw.gsb.mu.Unlock()
  345. return
  346. }
  347. bw.gsb.mu.Unlock()
  348. bw.gsb.cc.UpdateAddresses(sc, addrs)
  349. }
  350. func (bw *balancerWrapper) Target() string {
  351. return bw.gsb.cc.Target()
  352. }