resolver_conn_wrapper.go 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239
  1. /*
  2. *
  3. * Copyright 2017 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package grpc
  19. import (
  20. "context"
  21. "strings"
  22. "sync"
  23. "google.golang.org/grpc/balancer"
  24. "google.golang.org/grpc/internal/channelz"
  25. "google.golang.org/grpc/internal/grpcsync"
  26. "google.golang.org/grpc/internal/pretty"
  27. "google.golang.org/grpc/resolver"
  28. "google.golang.org/grpc/serviceconfig"
  29. )
  30. // resolverStateUpdater wraps the single method used by ccResolverWrapper to
  31. // report a state update from the actual resolver implementation.
  32. type resolverStateUpdater interface {
  33. updateResolverState(s resolver.State, err error) error
  34. }
  35. // ccResolverWrapper is a wrapper on top of cc for resolvers.
  36. // It implements resolver.ClientConn interface.
  37. type ccResolverWrapper struct {
  38. // The following fields are initialized when the wrapper is created and are
  39. // read-only afterwards, and therefore can be accessed without a mutex.
  40. cc resolverStateUpdater
  41. channelzID *channelz.Identifier
  42. ignoreServiceConfig bool
  43. opts ccResolverWrapperOpts
  44. serializer *grpcsync.CallbackSerializer // To serialize all incoming calls.
  45. serializerCancel context.CancelFunc // To close the serializer, accessed only from close().
  46. // All incoming (resolver --> gRPC) calls are guaranteed to execute in a
  47. // mutually exclusive manner as they are scheduled on the serializer.
  48. // Fields accessed *only* in these serializer callbacks, can therefore be
  49. // accessed without a mutex.
  50. curState resolver.State
  51. // mu guards access to the below fields.
  52. mu sync.Mutex
  53. closed bool
  54. resolver resolver.Resolver // Accessed only from outgoing calls.
  55. }
  56. // ccResolverWrapperOpts wraps the arguments to be passed when creating a new
  57. // ccResolverWrapper.
  58. type ccResolverWrapperOpts struct {
  59. target resolver.Target // User specified dial target to resolve.
  60. builder resolver.Builder // Resolver builder to use.
  61. bOpts resolver.BuildOptions // Resolver build options to use.
  62. channelzID *channelz.Identifier // Channelz identifier for the channel.
  63. }
  64. // newCCResolverWrapper uses the resolver.Builder to build a Resolver and
  65. // returns a ccResolverWrapper object which wraps the newly built resolver.
  66. func newCCResolverWrapper(cc resolverStateUpdater, opts ccResolverWrapperOpts) (*ccResolverWrapper, error) {
  67. ctx, cancel := context.WithCancel(context.Background())
  68. ccr := &ccResolverWrapper{
  69. cc: cc,
  70. channelzID: opts.channelzID,
  71. ignoreServiceConfig: opts.bOpts.DisableServiceConfig,
  72. opts: opts,
  73. serializer: grpcsync.NewCallbackSerializer(ctx),
  74. serializerCancel: cancel,
  75. }
  76. // Cannot hold the lock at build time because the resolver can send an
  77. // update or error inline and these incoming calls grab the lock to schedule
  78. // a callback in the serializer.
  79. r, err := opts.builder.Build(opts.target, ccr, opts.bOpts)
  80. if err != nil {
  81. cancel()
  82. return nil, err
  83. }
  84. // Any error reported by the resolver at build time that leads to a
  85. // re-resolution request from the balancer is dropped by grpc until we
  86. // return from this function. So, we don't have to handle pending resolveNow
  87. // requests here.
  88. ccr.mu.Lock()
  89. ccr.resolver = r
  90. ccr.mu.Unlock()
  91. return ccr, nil
  92. }
  93. func (ccr *ccResolverWrapper) resolveNow(o resolver.ResolveNowOptions) {
  94. ccr.mu.Lock()
  95. defer ccr.mu.Unlock()
  96. // ccr.resolver field is set only after the call to Build() returns. But in
  97. // the process of building, the resolver may send an error update which when
  98. // propagated to the balancer may result in a re-resolution request.
  99. if ccr.closed || ccr.resolver == nil {
  100. return
  101. }
  102. ccr.resolver.ResolveNow(o)
  103. }
  104. func (ccr *ccResolverWrapper) close() {
  105. ccr.mu.Lock()
  106. if ccr.closed {
  107. ccr.mu.Unlock()
  108. return
  109. }
  110. channelz.Info(logger, ccr.channelzID, "Closing the name resolver")
  111. // Close the serializer to ensure that no more calls from the resolver are
  112. // handled, before actually closing the resolver.
  113. ccr.serializerCancel()
  114. ccr.closed = true
  115. r := ccr.resolver
  116. ccr.mu.Unlock()
  117. // Give enqueued callbacks a chance to finish.
  118. <-ccr.serializer.Done
  119. // Spawn a goroutine to close the resolver (since it may block trying to
  120. // cleanup all allocated resources) and return early.
  121. go r.Close()
  122. }
  123. // serializerScheduleLocked is a convenience method to schedule a function to be
  124. // run on the serializer while holding ccr.mu.
  125. func (ccr *ccResolverWrapper) serializerScheduleLocked(f func(context.Context)) {
  126. ccr.mu.Lock()
  127. ccr.serializer.Schedule(f)
  128. ccr.mu.Unlock()
  129. }
  130. // UpdateState is called by resolver implementations to report new state to gRPC
  131. // which includes addresses and service config.
  132. func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error {
  133. errCh := make(chan error, 1)
  134. ok := ccr.serializer.Schedule(func(context.Context) {
  135. ccr.addChannelzTraceEvent(s)
  136. ccr.curState = s
  137. if err := ccr.cc.updateResolverState(ccr.curState, nil); err == balancer.ErrBadResolverState {
  138. errCh <- balancer.ErrBadResolverState
  139. return
  140. }
  141. errCh <- nil
  142. })
  143. if !ok {
  144. // The only time when Schedule() fail to add the callback to the
  145. // serializer is when the serializer is closed, and this happens only
  146. // when the resolver wrapper is closed.
  147. return nil
  148. }
  149. return <-errCh
  150. }
  151. // ReportError is called by resolver implementations to report errors
  152. // encountered during name resolution to gRPC.
  153. func (ccr *ccResolverWrapper) ReportError(err error) {
  154. ccr.serializerScheduleLocked(func(_ context.Context) {
  155. channelz.Warningf(logger, ccr.channelzID, "ccResolverWrapper: reporting error to cc: %v", err)
  156. ccr.cc.updateResolverState(resolver.State{}, err)
  157. })
  158. }
  159. // NewAddress is called by the resolver implementation to send addresses to
  160. // gRPC.
  161. func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) {
  162. ccr.serializerScheduleLocked(func(_ context.Context) {
  163. ccr.addChannelzTraceEvent(resolver.State{Addresses: addrs, ServiceConfig: ccr.curState.ServiceConfig})
  164. ccr.curState.Addresses = addrs
  165. ccr.cc.updateResolverState(ccr.curState, nil)
  166. })
  167. }
  168. // NewServiceConfig is called by the resolver implementation to send service
  169. // configs to gRPC.
  170. func (ccr *ccResolverWrapper) NewServiceConfig(sc string) {
  171. ccr.serializerScheduleLocked(func(_ context.Context) {
  172. channelz.Infof(logger, ccr.channelzID, "ccResolverWrapper: got new service config: %s", sc)
  173. if ccr.ignoreServiceConfig {
  174. channelz.Info(logger, ccr.channelzID, "Service config lookups disabled; ignoring config")
  175. return
  176. }
  177. scpr := parseServiceConfig(sc)
  178. if scpr.Err != nil {
  179. channelz.Warningf(logger, ccr.channelzID, "ccResolverWrapper: error parsing service config: %v", scpr.Err)
  180. return
  181. }
  182. ccr.addChannelzTraceEvent(resolver.State{Addresses: ccr.curState.Addresses, ServiceConfig: scpr})
  183. ccr.curState.ServiceConfig = scpr
  184. ccr.cc.updateResolverState(ccr.curState, nil)
  185. })
  186. }
  187. // ParseServiceConfig is called by resolver implementations to parse a JSON
  188. // representation of the service config.
  189. func (ccr *ccResolverWrapper) ParseServiceConfig(scJSON string) *serviceconfig.ParseResult {
  190. return parseServiceConfig(scJSON)
  191. }
  192. // addChannelzTraceEvent adds a channelz trace event containing the new
  193. // state received from resolver implementations.
  194. func (ccr *ccResolverWrapper) addChannelzTraceEvent(s resolver.State) {
  195. var updates []string
  196. var oldSC, newSC *ServiceConfig
  197. var oldOK, newOK bool
  198. if ccr.curState.ServiceConfig != nil {
  199. oldSC, oldOK = ccr.curState.ServiceConfig.Config.(*ServiceConfig)
  200. }
  201. if s.ServiceConfig != nil {
  202. newSC, newOK = s.ServiceConfig.Config.(*ServiceConfig)
  203. }
  204. if oldOK != newOK || (oldOK && newOK && oldSC.rawJSONString != newSC.rawJSONString) {
  205. updates = append(updates, "service config updated")
  206. }
  207. if len(ccr.curState.Addresses) > 0 && len(s.Addresses) == 0 {
  208. updates = append(updates, "resolver returned an empty address list")
  209. } else if len(ccr.curState.Addresses) == 0 && len(s.Addresses) > 0 {
  210. updates = append(updates, "resolver returned new addresses")
  211. }
  212. channelz.Infof(logger, ccr.channelzID, "Resolver state updated: %s (%v)", pretty.ToJSON(s), strings.Join(updates, "; "))
  213. }