picker_wrapper.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216
/*
 *
 * Copyright 2017 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
  18. package grpc
  19. import (
  20. "context"
  21. "io"
  22. "sync"
  23. "google.golang.org/grpc/balancer"
  24. "google.golang.org/grpc/codes"
  25. "google.golang.org/grpc/internal/channelz"
  26. istatus "google.golang.org/grpc/internal/status"
  27. "google.golang.org/grpc/internal/transport"
  28. "google.golang.org/grpc/status"
  29. )
  30. // pickerWrapper is a wrapper of balancer.Picker. It blocks on certain pick
  31. // actions and unblock when there's a picker update.
  32. type pickerWrapper struct {
  33. mu sync.Mutex
  34. done bool
  35. idle bool
  36. blockingCh chan struct{}
  37. picker balancer.Picker
  38. }
  39. func newPickerWrapper() *pickerWrapper {
  40. return &pickerWrapper{blockingCh: make(chan struct{})}
  41. }
  42. // updatePicker is called by UpdateBalancerState. It unblocks all blocked pick.
  43. func (pw *pickerWrapper) updatePicker(p balancer.Picker) {
  44. pw.mu.Lock()
  45. if pw.done || pw.idle {
  46. // There is a small window where a picker update from the LB policy can
  47. // race with the channel going to idle mode. If the picker is idle here,
  48. // it is because the channel asked it to do so, and therefore it is sage
  49. // to ignore the update from the LB policy.
  50. pw.mu.Unlock()
  51. return
  52. }
  53. pw.picker = p
  54. // pw.blockingCh should never be nil.
  55. close(pw.blockingCh)
  56. pw.blockingCh = make(chan struct{})
  57. pw.mu.Unlock()
  58. }
  59. // doneChannelzWrapper performs the following:
  60. // - increments the calls started channelz counter
  61. // - wraps the done function in the passed in result to increment the calls
  62. // failed or calls succeeded channelz counter before invoking the actual
  63. // done function.
  64. func doneChannelzWrapper(acbw *acBalancerWrapper, result *balancer.PickResult) {
  65. ac := acbw.ac
  66. ac.incrCallsStarted()
  67. done := result.Done
  68. result.Done = func(b balancer.DoneInfo) {
  69. if b.Err != nil && b.Err != io.EOF {
  70. ac.incrCallsFailed()
  71. } else {
  72. ac.incrCallsSucceeded()
  73. }
  74. if done != nil {
  75. done(b)
  76. }
  77. }
  78. }
  79. // pick returns the transport that will be used for the RPC.
  80. // It may block in the following cases:
  81. // - there's no picker
  82. // - the current picker returns ErrNoSubConnAvailable
  83. // - the current picker returns other errors and failfast is false.
  84. // - the subConn returned by the current picker is not READY
  85. // When one of these situations happens, pick blocks until the picker gets updated.
  86. func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.PickInfo) (transport.ClientTransport, balancer.PickResult, error) {
  87. var ch chan struct{}
  88. var lastPickErr error
  89. for {
  90. pw.mu.Lock()
  91. if pw.done {
  92. pw.mu.Unlock()
  93. return nil, balancer.PickResult{}, ErrClientConnClosing
  94. }
  95. if pw.picker == nil {
  96. ch = pw.blockingCh
  97. }
  98. if ch == pw.blockingCh {
  99. // This could happen when either:
  100. // - pw.picker is nil (the previous if condition), or
  101. // - has called pick on the current picker.
  102. pw.mu.Unlock()
  103. select {
  104. case <-ctx.Done():
  105. var errStr string
  106. if lastPickErr != nil {
  107. errStr = "latest balancer error: " + lastPickErr.Error()
  108. } else {
  109. errStr = ctx.Err().Error()
  110. }
  111. switch ctx.Err() {
  112. case context.DeadlineExceeded:
  113. return nil, balancer.PickResult{}, status.Error(codes.DeadlineExceeded, errStr)
  114. case context.Canceled:
  115. return nil, balancer.PickResult{}, status.Error(codes.Canceled, errStr)
  116. }
  117. case <-ch:
  118. }
  119. continue
  120. }
  121. ch = pw.blockingCh
  122. p := pw.picker
  123. pw.mu.Unlock()
  124. pickResult, err := p.Pick(info)
  125. if err != nil {
  126. if err == balancer.ErrNoSubConnAvailable {
  127. continue
  128. }
  129. if st, ok := status.FromError(err); ok {
  130. // Status error: end the RPC unconditionally with this status.
  131. // First restrict the code to the list allowed by gRFC A54.
  132. if istatus.IsRestrictedControlPlaneCode(st) {
  133. err = status.Errorf(codes.Internal, "received picker error with illegal status: %v", err)
  134. }
  135. return nil, balancer.PickResult{}, dropError{error: err}
  136. }
  137. // For all other errors, wait for ready RPCs should block and other
  138. // RPCs should fail with unavailable.
  139. if !failfast {
  140. lastPickErr = err
  141. continue
  142. }
  143. return nil, balancer.PickResult{}, status.Error(codes.Unavailable, err.Error())
  144. }
  145. acbw, ok := pickResult.SubConn.(*acBalancerWrapper)
  146. if !ok {
  147. logger.Errorf("subconn returned from pick is type %T, not *acBalancerWrapper", pickResult.SubConn)
  148. continue
  149. }
  150. if t := acbw.ac.getReadyTransport(); t != nil {
  151. if channelz.IsOn() {
  152. doneChannelzWrapper(acbw, &pickResult)
  153. return t, pickResult, nil
  154. }
  155. return t, pickResult, nil
  156. }
  157. if pickResult.Done != nil {
  158. // Calling done with nil error, no bytes sent and no bytes received.
  159. // DoneInfo with default value works.
  160. pickResult.Done(balancer.DoneInfo{})
  161. }
  162. logger.Infof("blockingPicker: the picked transport is not ready, loop back to repick")
  163. // If ok == false, ac.state is not READY.
  164. // A valid picker always returns READY subConn. This means the state of ac
  165. // just changed, and picker will be updated shortly.
  166. // continue back to the beginning of the for loop to repick.
  167. }
  168. }
  169. func (pw *pickerWrapper) close() {
  170. pw.mu.Lock()
  171. defer pw.mu.Unlock()
  172. if pw.done {
  173. return
  174. }
  175. pw.done = true
  176. close(pw.blockingCh)
  177. }
  178. func (pw *pickerWrapper) enterIdleMode() {
  179. pw.mu.Lock()
  180. defer pw.mu.Unlock()
  181. if pw.done {
  182. return
  183. }
  184. pw.idle = true
  185. }
  186. func (pw *pickerWrapper) exitIdleMode() {
  187. pw.mu.Lock()
  188. defer pw.mu.Unlock()
  189. if pw.done {
  190. return
  191. }
  192. pw.blockingCh = make(chan struct{})
  193. pw.idle = false
  194. }
  195. // dropError is a wrapper error that indicates the LB policy wishes to drop the
  196. // RPC and not retry it.
  197. type dropError struct {
  198. error
  199. }