/*
 * Copyright 2019 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package balancergroup

import (
	"context"
	"fmt"
	"testing"
	"time"

	"github.com/google/go-cmp/cmp"
	"google.golang.org/grpc"
	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/balancer/roundrobin"
	"google.golang.org/grpc/balancer/weightedtarget/weightedaggregator"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/internal/balancer/stub"
	"google.golang.org/grpc/internal/channelz"
	"google.golang.org/grpc/internal/grpctest"
	"google.golang.org/grpc/internal/testutils"
	"google.golang.org/grpc/resolver"
)

const (
	defaultTestTimeout      = 5 * time.Second
	defaultTestShortTimeout = 10 * time.Millisecond
)

var (
	rrBuilder        = balancer.Get(roundrobin.Name)
	testBalancerIDs  = []string{"b1", "b2", "b3"}
	testBackendAddrs []resolver.Address
)

const testBackendAddrsCount = 12

func init() {
	for i := 0; i < testBackendAddrsCount; i++ {
		testBackendAddrs = append(testBackendAddrs, resolver.Address{Addr: fmt.Sprintf("%d.%d.%d.%d:%d", i, i, i, i, i)})
	}

	// Disable caching for all tests. It will be re-enabled in caching specific
	// tests.
	DefaultSubBalancerCloseTimeout = time.Millisecond
}

type s struct {
	grpctest.Tester
}

func Test(t *testing.T) {
	grpctest.RunSubTests(t, s{})
}

// Create a new balancer group, add balancers and backends, but do not start
// it.
// - b1, weight 2, backends [0,1]
// - b2, weight 1, backends [2,3]
// Start the balancer group and check behavior.
//
// Close the balancer group, call add/remove/change weight/change address.
// - b2, weight 3, backends [0,3]
// - b3, weight 1, backends [1,2]
// Start the balancer group again and check behavior.
func (s) TestBalancerGroup_start_close(t *testing.T) {
	cc := testutils.NewTestClientConn(t)
	gator := weightedaggregator.New(cc, nil, testutils.NewTestWRR)
	gator.Start()
	bg := New(cc, balancer.BuildOptions{}, gator, nil)

	// Add two balancers to group and send two resolved addresses to both
	// balancers.
	gator.Add(testBalancerIDs[0], 2)
	bg.Add(testBalancerIDs[0], rrBuilder)
	bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:2]}})
	gator.Add(testBalancerIDs[1], 1)
	bg.Add(testBalancerIDs[1], rrBuilder)
	bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[2:4]}})
	bg.Start()
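
	// Starting the group builds the two sub-balancers, which create SubConns
	// for their resolved addresses. The test ClientConn surfaces those
	// creations on channels; drain them and simulate the CONNECTING and READY
	// transitions so that pickers are generated.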
	m1 := make(map[resolver.Address]balancer.SubConn)
	for i := 0; i < 4; i++ {
		addrs := <-cc.NewSubConnAddrsCh
		sc := <-cc.NewSubConnCh
		m1[addrs[0]] = sc
		bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
		bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready})
	}

	// Test roundrobin on the last picker.
	p1 := <-cc.NewPickerCh
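	// b1 has weight 2 and b2 has weight 1, so within one weighted round-robin
	// pass each of b1's two backends is picked twice and each of b2's two
	// backends is picked once.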
	want := []balancer.SubConn{
		m1[testBackendAddrs[0]], m1[testBackendAddrs[0]],
		m1[testBackendAddrs[1]], m1[testBackendAddrs[1]],
		m1[testBackendAddrs[2]], m1[testBackendAddrs[3]],
	}
	if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p1)); err != nil {
		t.Fatalf("want %v, got %v", want, err)
	}

	gator.Stop()
	bg.Close()
	for i := 0; i < 4; i++ {
		bg.UpdateSubConnState(<-cc.RemoveSubConnCh, balancer.SubConnState{ConnectivityState: connectivity.Shutdown})
	}

	// Add b3, weight 1, backends [1,2].
	gator.Add(testBalancerIDs[2], 1)
	bg.Add(testBalancerIDs[2], rrBuilder)
	bg.UpdateClientConnState(testBalancerIDs[2], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[1:3]}})

	// Remove b1.
	gator.Remove(testBalancerIDs[0])
	bg.Remove(testBalancerIDs[0])

	// Update b2 to weight 3, backends [0,3].
	gator.UpdateWeight(testBalancerIDs[1], 3)
	bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: append([]resolver.Address(nil), testBackendAddrs[0], testBackendAddrs[3])}})

	gator.Start()
	bg.Start()

	m2 := make(map[resolver.Address]balancer.SubConn)
	for i := 0; i < 4; i++ {
		addrs := <-cc.NewSubConnAddrsCh
		sc := <-cc.NewSubConnCh
		m2[addrs[0]] = sc
		bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
		bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready})
	}

	// Test roundrobin on the last picker.
	p2 := <-cc.NewPickerCh
	want = []balancer.SubConn{
		m2[testBackendAddrs[0]], m2[testBackendAddrs[0]], m2[testBackendAddrs[0]],
		m2[testBackendAddrs[3]], m2[testBackendAddrs[3]], m2[testBackendAddrs[3]],
		m2[testBackendAddrs[1]], m2[testBackendAddrs[2]],
	}
	if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p2)); err != nil {
		t.Fatalf("want %v, got %v", want, err)
	}
}

// Test that balancer group start() doesn't deadlock if the balancer calls back
// into balancer group inline when it gets an update.
//
// The potential deadlock can happen if we
// - hold a lock and send updates to balancer (e.g. update resolved addresses)
// - the balancer calls back (NewSubConn or update picker) inline
//
// The callback will try to hold the same lock again, which will cause a
// deadlock.
//
// This test starts the balancer group with a test balancer that updates the
// picker whenever it gets an address update. It's expected that start()
// doesn't block because of a deadlock.
func (s) TestBalancerGroup_start_close_deadlock(t *testing.T) {
	const balancerName = "stub-TestBalancerGroup_start_close_deadlock"
	stub.Register(balancerName, stub.BalancerFuncs{})
	builder := balancer.Get(balancerName)

	cc := testutils.NewTestClientConn(t)
	gator := weightedaggregator.New(cc, nil, testutils.NewTestWRR)
	gator.Start()
	bg := New(cc, balancer.BuildOptions{}, gator, nil)

	gator.Add(testBalancerIDs[0], 2)
	bg.Add(testBalancerIDs[0], builder)
	bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:2]}})
	gator.Add(testBalancerIDs[1], 1)
	bg.Add(testBalancerIDs[1], builder)
	bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[2:4]}})

	bg.Start()
}
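
// The sketch below is illustrative only and is not used by the tests in this
// file: it shows, using the stub balancer helper, what an inline-callback
// child could look like. The balancer name and the constErrPicker type are
// hypothetical. The child calls UpdateState on its parent ClientConn
// synchronously from within UpdateClientConnState, which is exactly the
// re-entrant pattern the deadlock test above guards against.
type constErrPicker struct{}

func (constErrPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
	return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
}

func init() {
	stub.Register("stub-inline-callback-example", stub.BalancerFuncs{
		UpdateClientConnState: func(bd *stub.BalancerData, _ balancer.ClientConnState) error {
			// Call back into the parent inline. If the balancer group held its
			// own lock across this UpdateClientConnState call, this would
			// deadlock.
			bd.ClientConn.UpdateState(balancer.State{
				ConnectivityState: connectivity.Connecting,
				Picker:            constErrPicker{},
			})
			return nil
		},
	})
}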

// replaceDefaultSubBalancerCloseTimeout overrides the sub-balancer cache
// timeout and returns a function that restores the old value. Intended to be
// used as: defer replaceDefaultSubBalancerCloseTimeout(d)().
func replaceDefaultSubBalancerCloseTimeout(n time.Duration) func() {
	old := DefaultSubBalancerCloseTimeout
	DefaultSubBalancerCloseTimeout = n
	return func() { DefaultSubBalancerCloseTimeout = old }
}

// initBalancerGroupForCachingTest creates a balancer group and initializes it
// to be ready for caching tests.
//
// Two rr balancers are added to bg, each with 2 ready subConns. A sub-balancer
// is removed later, so the balancer group returned has one sub-balancer in its
// own map, and one sub-balancer in cache.
func initBalancerGroupForCachingTest(t *testing.T) (*weightedaggregator.Aggregator, *BalancerGroup, *testutils.TestClientConn, map[resolver.Address]balancer.SubConn) {
	cc := testutils.NewTestClientConn(t)
	gator := weightedaggregator.New(cc, nil, testutils.NewTestWRR)
	gator.Start()
	bg := New(cc, balancer.BuildOptions{}, gator, nil)

	// Add two balancers to group and send two resolved addresses to both
	// balancers.
	gator.Add(testBalancerIDs[0], 2)
	bg.Add(testBalancerIDs[0], rrBuilder)
	bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:2]}})
	gator.Add(testBalancerIDs[1], 1)
	bg.Add(testBalancerIDs[1], rrBuilder)
	bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[2:4]}})
	bg.Start()

	m1 := make(map[resolver.Address]balancer.SubConn)
	for i := 0; i < 4; i++ {
		addrs := <-cc.NewSubConnAddrsCh
		sc := <-cc.NewSubConnCh
		m1[addrs[0]] = sc
		bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
		bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready})
	}

	// Test roundrobin on the last picker.
	p1 := <-cc.NewPickerCh
	want := []balancer.SubConn{
		m1[testBackendAddrs[0]], m1[testBackendAddrs[0]],
		m1[testBackendAddrs[1]], m1[testBackendAddrs[1]],
		m1[testBackendAddrs[2]], m1[testBackendAddrs[3]],
	}
	if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p1)); err != nil {
		t.Fatalf("want %v, got %v", want, err)
	}

	gator.Remove(testBalancerIDs[1])
	bg.Remove(testBalancerIDs[1])
	// Don't wait for SubConns to be removed after close, because they are only
	// removed after close timeout.
	for i := 0; i < 10; i++ {
		select {
		case <-cc.RemoveSubConnCh:
			t.Fatalf("Got request to remove subconn, want no remove subconn (because subconns were still in cache)")
		default:
		}
		time.Sleep(time.Millisecond)
	}

	// Test roundrobin with only sub-balancer0.
	p2 := <-cc.NewPickerCh
	want = []balancer.SubConn{
		m1[testBackendAddrs[0]], m1[testBackendAddrs[1]],
	}
	if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p2)); err != nil {
		t.Fatalf("want %v, got %v", want, err)
	}

	return gator, bg, cc, m1
}

// Test that if a sub-balancer is removed, and re-added within close timeout,
// the subConns won't be re-created.
func (s) TestBalancerGroup_locality_caching(t *testing.T) {
	defer replaceDefaultSubBalancerCloseTimeout(10 * time.Second)()
	gator, bg, cc, addrToSC := initBalancerGroupForCachingTest(t)

	// Turn down subconn for addr2, shouldn't get picker update because
	// sub-balancer1 was removed.
	bg.UpdateSubConnState(addrToSC[testBackendAddrs[2]], balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
	for i := 0; i < 10; i++ {
		select {
		case <-cc.NewPickerCh:
			t.Fatalf("Got new picker, want no new picker (because the sub-balancer was removed)")
		default:
		}
		time.Sleep(time.Millisecond)
	}

	// Sleep, but sleep less than close timeout.
	time.Sleep(time.Millisecond * 100)

	// Re-add sub-balancer-1. Because its subconns were in cache, no new
	// subconns should be created. But a new picker will still be generated,
	// with subconn states up to date.
	gator.Add(testBalancerIDs[1], 1)
	bg.Add(testBalancerIDs[1], rrBuilder)

	p3 := <-cc.NewPickerCh
	want := []balancer.SubConn{
		addrToSC[testBackendAddrs[0]], addrToSC[testBackendAddrs[0]],
		addrToSC[testBackendAddrs[1]], addrToSC[testBackendAddrs[1]],
		// addr2 is down, b2 only has addr3 in READY state.
		addrToSC[testBackendAddrs[3]], addrToSC[testBackendAddrs[3]],
	}
	if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p3)); err != nil {
		t.Fatalf("want %v, got %v", want, err)
	}

	for i := 0; i < 10; i++ {
		select {
		case <-cc.NewSubConnAddrsCh:
			t.Fatalf("Got new subconn, want no new subconn (because subconns were still in cache)")
		default:
		}
		time.Sleep(time.Millisecond * 10)
	}
}

// Sub-balancers are put in cache when they are removed. If the balancer group
// is closed within the close timeout, all subconns should still be removed
// immediately.
func (s) TestBalancerGroup_locality_caching_close_group(t *testing.T) {
	defer replaceDefaultSubBalancerCloseTimeout(10 * time.Second)()
	_, bg, cc, addrToSC := initBalancerGroupForCachingTest(t)

	bg.Close()
	// The balancer group is closed. The subconns should be removed immediately.
	removeTimeout := time.After(time.Millisecond * 500)
	scToRemove := map[balancer.SubConn]int{
		addrToSC[testBackendAddrs[0]]: 1,
		addrToSC[testBackendAddrs[1]]: 1,
		addrToSC[testBackendAddrs[2]]: 1,
		addrToSC[testBackendAddrs[3]]: 1,
	}
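	// Each expected SubConn maps to a removal count of 1; the count is
	// decremented as removals arrive so that a duplicate or unexpected removal
	// fails the test.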
	for i := 0; i < len(scToRemove); i++ {
		select {
		case sc := <-cc.RemoveSubConnCh:
			c := scToRemove[sc]
			if c == 0 {
				t.Fatalf("Got removeSubConn for %v when there's %d remove expected", sc, c)
			}
			scToRemove[sc] = c - 1
		case <-removeTimeout:
			t.Fatalf("timeout waiting for subConns (from balancer in cache) to be removed")
		}
	}
}

// Sub-balancers in cache will be closed if not re-added within timeout, and
// subConns will be removed.
func (s) TestBalancerGroup_locality_caching_not_readd_within_timeout(t *testing.T) {
	defer replaceDefaultSubBalancerCloseTimeout(time.Second)()
	_, _, cc, addrToSC := initBalancerGroupForCachingTest(t)

	// The sub-balancer is not re-added within timeout. The subconns should be
	// removed.
	removeTimeout := time.After(DefaultSubBalancerCloseTimeout)
	scToRemove := map[balancer.SubConn]int{
		addrToSC[testBackendAddrs[2]]: 1,
		addrToSC[testBackendAddrs[3]]: 1,
	}
	for i := 0; i < len(scToRemove); i++ {
		select {
		case sc := <-cc.RemoveSubConnCh:
			c := scToRemove[sc]
			if c == 0 {
				t.Fatalf("Got removeSubConn for %v when there's %d remove expected", sc, c)
			}
			scToRemove[sc] = c - 1
		case <-removeTimeout:
			t.Fatalf("timeout waiting for subConns (from balancer in cache) to be removed")
		}
	}
}

// Wrap the rr builder, so it behaves the same, but has a different name.
type noopBalancerBuilderWrapper struct {
	balancer.Builder
}

func init() {
	balancer.Register(&noopBalancerBuilderWrapper{Builder: rrBuilder})
}

func (*noopBalancerBuilderWrapper) Name() string {
	return "noopBalancerBuilderWrapper"
}

// After removing a sub-balancer, re-add with same ID, but different balancer
// builder. Old subconns should be removed, and new subconns should be created.
func (s) TestBalancerGroup_locality_caching_readd_with_different_builder(t *testing.T) {
	defer replaceDefaultSubBalancerCloseTimeout(10 * time.Second)()
	gator, bg, cc, addrToSC := initBalancerGroupForCachingTest(t)

	// Re-add sub-balancer-1, but with a different balancer builder. The
	// sub-balancer is still in cache, but can't be reused. This should cause
	// the old sub-balancer's subconns to be removed immediately, and new
	// subconns to be created.
	gator.Add(testBalancerIDs[1], 1)
	bg.Add(testBalancerIDs[1], &noopBalancerBuilderWrapper{rrBuilder})

	// The cached sub-balancer should be closed, and the subconns should be
	// removed immediately.
	removeTimeout := time.After(time.Millisecond * 500)
	scToRemove := map[balancer.SubConn]int{
		addrToSC[testBackendAddrs[2]]: 1,
		addrToSC[testBackendAddrs[3]]: 1,
	}
	for i := 0; i < len(scToRemove); i++ {
		select {
		case sc := <-cc.RemoveSubConnCh:
			c := scToRemove[sc]
			if c == 0 {
				t.Fatalf("Got removeSubConn for %v when there's %d remove expected", sc, c)
			}
			scToRemove[sc] = c - 1
		case <-removeTimeout:
			t.Fatalf("timeout waiting for subConns (from balancer in cache) to be removed")
		}
	}

	bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[4:6]}})

	newSCTimeout := time.After(time.Millisecond * 500)
	scToAdd := map[resolver.Address]int{
		testBackendAddrs[4]: 1,
		testBackendAddrs[5]: 1,
	}
	for i := 0; i < len(scToAdd); i++ {
		select {
		case addr := <-cc.NewSubConnAddrsCh:
			c := scToAdd[addr[0]]
			if c == 0 {
				t.Fatalf("Got newSubConn for %v when there's %d new expected", addr, c)
			}
			scToAdd[addr[0]] = c - 1
			sc := <-cc.NewSubConnCh
			addrToSC[addr[0]] = sc
			bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
			bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready})
		case <-newSCTimeout:
			t.Fatalf("timeout waiting for subConns (from new sub-balancer) to be created")
		}
	}

	// Test roundrobin on the new picker.
	p3 := <-cc.NewPickerCh
	want := []balancer.SubConn{
		addrToSC[testBackendAddrs[0]], addrToSC[testBackendAddrs[0]],
		addrToSC[testBackendAddrs[1]], addrToSC[testBackendAddrs[1]],
		addrToSC[testBackendAddrs[4]], addrToSC[testBackendAddrs[5]],
	}
	if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p3)); err != nil {
		t.Fatalf("want %v, got %v", want, err)
	}
}

// After removing a sub-balancer, it will be kept in cache. Make sure that this
// sub-balancer's Close is called when the balancer group is closed.
func (s) TestBalancerGroup_CloseStopsBalancerInCache(t *testing.T) {
	const balancerName = "stub-TestBalancerGroup_check_close"
	closed := make(chan struct{})
	stub.Register(balancerName, stub.BalancerFuncs{Close: func(_ *stub.BalancerData) {
		close(closed)
	}})
	builder := balancer.Get(balancerName)

	defer replaceDefaultSubBalancerCloseTimeout(time.Second)()
	gator, bg, _, _ := initBalancerGroupForCachingTest(t)

	// Add a balancer and remove it, so it stays in the cache.
	gator.Add(testBalancerIDs[2], 1)
	bg.Add(testBalancerIDs[2], builder)
	gator.Remove(testBalancerIDs[2])
	bg.Remove(testBalancerIDs[2])

	// Immediately close the balancer group, before the cache timeout.
	bg.Close()

	// Make sure the removed child balancer is closed eventually.
	select {
	case <-closed:
	case <-time.After(time.Second * 2):
		t.Fatalf("timeout waiting for the child balancer in cache to be closed")
	}
}

// TestBalancerGroupBuildOptions verifies that the balancer.BuildOptions passed
// to the balancergroup at creation time is passed to child policies.
func (s) TestBalancerGroupBuildOptions(t *testing.T) {
	const (
		balancerName = "stubBalancer-TestBalancerGroupBuildOptions"
		userAgent    = "ua"
	)

	// Set up the stub balancer such that we can read the build options passed
	// to it in the UpdateClientConnState method.
	bOpts := balancer.BuildOptions{
		DialCreds:        insecure.NewCredentials(),
		ChannelzParentID: channelz.NewIdentifierForTesting(channelz.RefChannel, 1234, nil),
		CustomUserAgent:  userAgent,
	}
	stub.Register(balancerName, stub.BalancerFuncs{
		UpdateClientConnState: func(bd *stub.BalancerData, _ balancer.ClientConnState) error {
			if !cmp.Equal(bd.BuildOptions, bOpts) {
				return fmt.Errorf("buildOptions in child balancer: %v, want %v", bd.BuildOptions, bOpts)
			}
			return nil
		},
	})
	cc := testutils.NewTestClientConn(t)
	bg := New(cc, bOpts, nil, nil)
	bg.Start()

	// Add the stub balancer built above as a child policy.
	balancerBuilder := balancer.Get(balancerName)
	bg.Add(testBalancerIDs[0], balancerBuilder)

	// Send an empty clientConn state change. This should trigger the
	// verification of the buildOptions being passed to the child policy.
	if err := bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{}); err != nil {
		t.Fatal(err)
	}
}

// TestBalancerExitIdleOne verifies that calling ExitIdleOne on the balancer
// group invokes ExitIdle on the specified child policy.
func (s) TestBalancerExitIdleOne(t *testing.T) {
	const balancerName = "stub-balancer-test-balancergroup-exit-idle-one"
	exitIdleCh := make(chan struct{}, 1)
	stub.Register(balancerName, stub.BalancerFuncs{
		ExitIdle: func(*stub.BalancerData) {
			exitIdleCh <- struct{}{}
		},
	})
	cc := testutils.NewTestClientConn(t)
	bg := New(cc, balancer.BuildOptions{}, nil, nil)
	bg.Start()
	defer bg.Close()

	// Add the stub balancer built above as a child policy.
	builder := balancer.Get(balancerName)
	bg.Add(testBalancerIDs[0], builder)

	// Call ExitIdle on the child policy.
	bg.ExitIdleOne(testBalancerIDs[0])
	select {
	case <-time.After(time.Second):
		t.Fatal("Timeout when waiting for ExitIdle to be invoked on child policy")
	case <-exitIdleCh:
	}
}

// TestBalancerGracefulSwitch tests the graceful switch functionality for a
// child of the balancer group. At first, the child is configured as a round
// robin load balancer, and thus should behave accordingly. The test then
// gracefully switches this child to a custom type which only creates a SubConn
// for the second passed in address and also only picks that created SubConn.
// The new aggregated picker should reflect this change for the child.
func (s) TestBalancerGracefulSwitch(t *testing.T) {
	cc := testutils.NewTestClientConn(t)
	gator := weightedaggregator.New(cc, nil, testutils.NewTestWRR)
	gator.Start()
	bg := New(cc, balancer.BuildOptions{}, gator, nil)
	gator.Add(testBalancerIDs[0], 1)
	bg.Add(testBalancerIDs[0], rrBuilder)
	bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:2]}})
	bg.Start()

	m1 := make(map[resolver.Address]balancer.SubConn)
	scs := make(map[balancer.SubConn]bool)
	for i := 0; i < 2; i++ {
		addrs := <-cc.NewSubConnAddrsCh
		sc := <-cc.NewSubConnCh
		m1[addrs[0]] = sc
		scs[sc] = true
		bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
		bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready})
	}

	p1 := <-cc.NewPickerCh
	want := []balancer.SubConn{
		m1[testBackendAddrs[0]], m1[testBackendAddrs[1]],
	}
	if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p1)); err != nil {
		t.Fatalf("want %v, got %v", want, err)
	}

	// The balancer type for testBalancerIDs[0] is currently Round Robin. Now,
	// change it to a balancer that has separate behavior logically (creating
	// a SubConn for the second address in the address list and always picking
	// that SubConn), and see if the downstream behavior reflects that change.
	bg.UpdateBuilder(testBalancerIDs[0], wrappedPickFirstBalancerBuilder{})
	if err := bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[2:4]}}); err != nil {
		t.Fatalf("error updating ClientConn state: %v", err)
	}

	addrs := <-cc.NewSubConnAddrsCh
	if addrs[0].Addr != testBackendAddrs[3].Addr {
		// Verifies that the update was forwarded to the newly created
		// balancer, as the wrapped pick first balancer drops the first
		// address.
		t.Fatalf("newSubConn called with wrong address, want: %v, got : %v", testBackendAddrs[3].Addr, addrs[0].Addr)
	}
	sc := <-cc.NewSubConnCh

	// Update the pick first balancer's SubConn as CONNECTING. This will cause
	// the pick first balancer to UpdateState() with CONNECTING, which shouldn't
	// send a Picker update back, as the Graceful Switch process is not complete.
	bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
	defer cancel()
	select {
	case <-cc.NewPickerCh:
		t.Fatalf("No new picker should have been sent due to the Graceful Switch process not completing")
	case <-ctx.Done():
	}

	// Update the pick first balancer's SubConn as READY. This will cause
	// the pick first balancer to UpdateState() with READY, which should send a
	// Picker update back, as the Graceful Switch process is complete. This
	// Picker should always pick the pick first's created SubConn which
	// corresponds to address 3.
	bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready})
	p2 := <-cc.NewPickerCh
	pr, err := p2.Pick(balancer.PickInfo{})
	if err != nil {
		t.Fatalf("error picking: %v", err)
	}
	if pr.SubConn != sc {
		t.Fatalf("picker.Pick(), want %v, got %v", sc, pr.SubConn)
	}

	// The Graceful Switch process completing for the child should cause the
	// SubConns for the balancer being gracefully switched from to get deleted.
	ctx, cancel = context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	for i := 0; i < 2; i++ {
		select {
		case <-ctx.Done():
			t.Fatalf("error waiting for RemoveSubConn()")
		case sc := <-cc.RemoveSubConnCh:
			// The removed SubConn should be one of the two created SubConns,
			// and both should eventually be removed.
			if !scs[sc] {
				t.Fatalf("RemoveSubConn called for wrong SubConn %v, want in %v", sc, scs)
			}
			delete(scs, sc)
		}
	}
}

type wrappedPickFirstBalancerBuilder struct{}

func (wrappedPickFirstBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
	builder := balancer.Get(grpc.PickFirstBalancerName)
	wpfb := &wrappedPickFirstBalancer{
		ClientConn: cc,
	}
	pf := builder.Build(wpfb, opts)
	wpfb.Balancer = pf
	return wpfb
}

func (wrappedPickFirstBalancerBuilder) Name() string {
	return "wrappedPickFirstBalancer"
}
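
// wrappedPickFirstBalancer embeds both the wrapped pick_first balancer and the
// parent ClientConn: UpdateClientConnState intercepts updates flowing down to
// the child (dropping the first address), while UpdateState intercepts updates
// flowing up to the parent (swallowing IDLE).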
type wrappedPickFirstBalancer struct {
	balancer.Balancer
	balancer.ClientConn
}

func (wb *wrappedPickFirstBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
	s.ResolverState.Addresses = s.ResolverState.Addresses[1:]
	return wb.Balancer.UpdateClientConnState(s)
}

func (wb *wrappedPickFirstBalancer) UpdateState(state balancer.State) {
	// Eat it if IDLE - allows it to switch over only on a READY SubConn.
	if state.ConnectivityState == connectivity.Idle {
		return
	}
	wb.ClientConn.UpdateState(state)
}