service_test.go 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762
  1. /*
  2. *
  3. * Copyright 2018 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package service
  19. import (
  20. "context"
  21. "fmt"
  22. "net"
  23. "strconv"
  24. "strings"
  25. "testing"
  26. "time"
  27. "github.com/golang/protobuf/proto"
  28. "github.com/golang/protobuf/ptypes"
  29. "github.com/google/go-cmp/cmp"
  30. "google.golang.org/grpc/connectivity"
  31. "google.golang.org/grpc/credentials"
  32. "google.golang.org/grpc/internal/channelz"
  33. "google.golang.org/grpc/internal/grpctest"
  34. "google.golang.org/protobuf/testing/protocmp"
  35. channelzpb "google.golang.org/grpc/channelz/grpc_channelz_v1"
  36. )
  37. func init() {
  38. channelz.TurnOn()
  39. }
  40. type s struct {
  41. grpctest.Tester
  42. }
  43. func Test(t *testing.T) {
  44. grpctest.RunSubTests(t, s{})
  45. }
  46. func cleanupWrapper(cleanup func() error, t *testing.T) {
  47. if err := cleanup(); err != nil {
  48. t.Error(err)
  49. }
  50. }
  51. type protoToSocketOptFunc func([]*channelzpb.SocketOption) *channelz.SocketOptionData
  52. // protoToSocketOpt is used in function socketProtoToStruct to extract socket option
  53. // data from unmarshaled proto message.
  54. // It is only defined under linux environment on x86 architecture.
  55. var protoToSocketOpt protoToSocketOptFunc
  56. const defaultTestTimeout = 10 * time.Second
  57. type dummyChannel struct {
  58. state connectivity.State
  59. target string
  60. callsStarted int64
  61. callsSucceeded int64
  62. callsFailed int64
  63. lastCallStartedTimestamp time.Time
  64. }
  65. func (d *dummyChannel) ChannelzMetric() *channelz.ChannelInternalMetric {
  66. return &channelz.ChannelInternalMetric{
  67. State: d.state,
  68. Target: d.target,
  69. CallsStarted: d.callsStarted,
  70. CallsSucceeded: d.callsSucceeded,
  71. CallsFailed: d.callsFailed,
  72. LastCallStartedTimestamp: d.lastCallStartedTimestamp,
  73. }
  74. }
  75. type dummyServer struct {
  76. callsStarted int64
  77. callsSucceeded int64
  78. callsFailed int64
  79. lastCallStartedTimestamp time.Time
  80. }
  81. func (d *dummyServer) ChannelzMetric() *channelz.ServerInternalMetric {
  82. return &channelz.ServerInternalMetric{
  83. CallsStarted: d.callsStarted,
  84. CallsSucceeded: d.callsSucceeded,
  85. CallsFailed: d.callsFailed,
  86. LastCallStartedTimestamp: d.lastCallStartedTimestamp,
  87. }
  88. }
  89. type dummySocket struct {
  90. streamsStarted int64
  91. streamsSucceeded int64
  92. streamsFailed int64
  93. messagesSent int64
  94. messagesReceived int64
  95. keepAlivesSent int64
  96. lastLocalStreamCreatedTimestamp time.Time
  97. lastRemoteStreamCreatedTimestamp time.Time
  98. lastMessageSentTimestamp time.Time
  99. lastMessageReceivedTimestamp time.Time
  100. localFlowControlWindow int64
  101. remoteFlowControlWindow int64
  102. socketOptions *channelz.SocketOptionData
  103. localAddr net.Addr
  104. remoteAddr net.Addr
  105. security credentials.ChannelzSecurityValue
  106. remoteName string
  107. }
  108. func (d *dummySocket) ChannelzMetric() *channelz.SocketInternalMetric {
  109. return &channelz.SocketInternalMetric{
  110. StreamsStarted: d.streamsStarted,
  111. StreamsSucceeded: d.streamsSucceeded,
  112. StreamsFailed: d.streamsFailed,
  113. MessagesSent: d.messagesSent,
  114. MessagesReceived: d.messagesReceived,
  115. KeepAlivesSent: d.keepAlivesSent,
  116. LastLocalStreamCreatedTimestamp: d.lastLocalStreamCreatedTimestamp,
  117. LastRemoteStreamCreatedTimestamp: d.lastRemoteStreamCreatedTimestamp,
  118. LastMessageSentTimestamp: d.lastMessageSentTimestamp,
  119. LastMessageReceivedTimestamp: d.lastMessageReceivedTimestamp,
  120. LocalFlowControlWindow: d.localFlowControlWindow,
  121. RemoteFlowControlWindow: d.remoteFlowControlWindow,
  122. SocketOptions: d.socketOptions,
  123. LocalAddr: d.localAddr,
  124. RemoteAddr: d.remoteAddr,
  125. Security: d.security,
  126. RemoteName: d.remoteName,
  127. }
  128. }
  129. func channelProtoToStruct(c *channelzpb.Channel) (*dummyChannel, error) {
  130. dc := &dummyChannel{}
  131. pdata := c.GetData()
  132. switch pdata.GetState().GetState() {
  133. case channelzpb.ChannelConnectivityState_UNKNOWN:
  134. // TODO: what should we set here?
  135. case channelzpb.ChannelConnectivityState_IDLE:
  136. dc.state = connectivity.Idle
  137. case channelzpb.ChannelConnectivityState_CONNECTING:
  138. dc.state = connectivity.Connecting
  139. case channelzpb.ChannelConnectivityState_READY:
  140. dc.state = connectivity.Ready
  141. case channelzpb.ChannelConnectivityState_TRANSIENT_FAILURE:
  142. dc.state = connectivity.TransientFailure
  143. case channelzpb.ChannelConnectivityState_SHUTDOWN:
  144. dc.state = connectivity.Shutdown
  145. }
  146. dc.target = pdata.GetTarget()
  147. dc.callsStarted = pdata.CallsStarted
  148. dc.callsSucceeded = pdata.CallsSucceeded
  149. dc.callsFailed = pdata.CallsFailed
  150. if err := pdata.GetLastCallStartedTimestamp().CheckValid(); err != nil {
  151. return nil, err
  152. }
  153. dc.lastCallStartedTimestamp = pdata.GetLastCallStartedTimestamp().AsTime()
  154. return dc, nil
  155. }
  156. func serverProtoToStruct(s *channelzpb.Server) (*dummyServer, error) {
  157. ds := &dummyServer{}
  158. pdata := s.GetData()
  159. ds.callsStarted = pdata.CallsStarted
  160. ds.callsSucceeded = pdata.CallsSucceeded
  161. ds.callsFailed = pdata.CallsFailed
  162. if err := pdata.GetLastCallStartedTimestamp().CheckValid(); err != nil {
  163. return nil, err
  164. }
  165. ds.lastCallStartedTimestamp = pdata.GetLastCallStartedTimestamp().AsTime()
  166. return ds, nil
  167. }
  168. func socketProtoToStruct(s *channelzpb.Socket) (*dummySocket, error) {
  169. ds := &dummySocket{}
  170. pdata := s.GetData()
  171. ds.streamsStarted = pdata.GetStreamsStarted()
  172. ds.streamsSucceeded = pdata.GetStreamsSucceeded()
  173. ds.streamsFailed = pdata.GetStreamsFailed()
  174. ds.messagesSent = pdata.GetMessagesSent()
  175. ds.messagesReceived = pdata.GetMessagesReceived()
  176. ds.keepAlivesSent = pdata.GetKeepAlivesSent()
  177. if err := pdata.GetLastLocalStreamCreatedTimestamp().CheckValid(); err != nil {
  178. return nil, err
  179. }
  180. ds.lastLocalStreamCreatedTimestamp = pdata.GetLastLocalStreamCreatedTimestamp().AsTime()
  181. if err := pdata.GetLastRemoteStreamCreatedTimestamp().CheckValid(); err != nil {
  182. return nil, err
  183. }
  184. ds.lastRemoteStreamCreatedTimestamp = pdata.GetLastRemoteStreamCreatedTimestamp().AsTime()
  185. if err := pdata.GetLastMessageSentTimestamp().CheckValid(); err != nil {
  186. return nil, err
  187. }
  188. ds.lastMessageSentTimestamp = pdata.GetLastMessageSentTimestamp().AsTime()
  189. if err := pdata.GetLastMessageReceivedTimestamp().CheckValid(); err != nil {
  190. return nil, err
  191. }
  192. ds.lastMessageReceivedTimestamp = pdata.GetLastMessageReceivedTimestamp().AsTime()
  193. if v := pdata.GetLocalFlowControlWindow(); v != nil {
  194. ds.localFlowControlWindow = v.Value
  195. }
  196. if v := pdata.GetRemoteFlowControlWindow(); v != nil {
  197. ds.remoteFlowControlWindow = v.Value
  198. }
  199. if v := pdata.GetOption(); v != nil && protoToSocketOpt != nil {
  200. ds.socketOptions = protoToSocketOpt(v)
  201. }
  202. if v := s.GetSecurity(); v != nil {
  203. ds.security = protoToSecurity(v)
  204. }
  205. if local := s.GetLocal(); local != nil {
  206. ds.localAddr = protoToAddr(local)
  207. }
  208. if remote := s.GetRemote(); remote != nil {
  209. ds.remoteAddr = protoToAddr(remote)
  210. }
  211. ds.remoteName = s.GetRemoteName()
  212. return ds, nil
  213. }
  214. func protoToSecurity(protoSecurity *channelzpb.Security) credentials.ChannelzSecurityValue {
  215. switch v := protoSecurity.Model.(type) {
  216. case *channelzpb.Security_Tls_:
  217. return &credentials.TLSChannelzSecurityValue{StandardName: v.Tls.GetStandardName(), LocalCertificate: v.Tls.GetLocalCertificate(), RemoteCertificate: v.Tls.GetRemoteCertificate()}
  218. case *channelzpb.Security_Other:
  219. sv := &credentials.OtherChannelzSecurityValue{Name: v.Other.GetName()}
  220. var x ptypes.DynamicAny
  221. if err := ptypes.UnmarshalAny(v.Other.GetValue(), &x); err == nil {
  222. sv.Value = x.Message
  223. }
  224. return sv
  225. }
  226. return nil
  227. }
  228. func protoToAddr(a *channelzpb.Address) net.Addr {
  229. switch v := a.Address.(type) {
  230. case *channelzpb.Address_TcpipAddress:
  231. if port := v.TcpipAddress.GetPort(); port != 0 {
  232. return &net.TCPAddr{IP: v.TcpipAddress.GetIpAddress(), Port: int(port)}
  233. }
  234. return &net.IPAddr{IP: v.TcpipAddress.GetIpAddress()}
  235. case *channelzpb.Address_UdsAddress_:
  236. return &net.UnixAddr{Name: v.UdsAddress.GetFilename(), Net: "unix"}
  237. case *channelzpb.Address_OtherAddress_:
  238. // TODO:
  239. }
  240. return nil
  241. }
  242. func convertSocketRefSliceToMap(sktRefs []*channelzpb.SocketRef) map[int64]string {
  243. m := make(map[int64]string)
  244. for _, sr := range sktRefs {
  245. m[sr.SocketId] = sr.Name
  246. }
  247. return m
  248. }
  249. type OtherSecurityValue struct {
  250. LocalCertificate []byte `protobuf:"bytes,1,opt,name=local_certificate,json=localCertificate,proto3" json:"local_certificate,omitempty"`
  251. RemoteCertificate []byte `protobuf:"bytes,2,opt,name=remote_certificate,json=remoteCertificate,proto3" json:"remote_certificate,omitempty"`
  252. }
  253. func (m *OtherSecurityValue) Reset() { *m = OtherSecurityValue{} }
  254. func (m *OtherSecurityValue) String() string { return proto.CompactTextString(m) }
  255. func (*OtherSecurityValue) ProtoMessage() {}
  256. func init() {
  257. // Ad-hoc registering the proto type here to facilitate UnmarshalAny of OtherSecurityValue.
  258. proto.RegisterType((*OtherSecurityValue)(nil), "grpc.credentials.OtherChannelzSecurityValue")
  259. }
  260. func (s) TestGetTopChannels(t *testing.T) {
  261. tcs := []*dummyChannel{
  262. {
  263. state: connectivity.Connecting,
  264. target: "test.channelz:1234",
  265. callsStarted: 6,
  266. callsSucceeded: 2,
  267. callsFailed: 3,
  268. lastCallStartedTimestamp: time.Now().UTC(),
  269. },
  270. {
  271. state: connectivity.Connecting,
  272. target: "test.channelz:1234",
  273. callsStarted: 1,
  274. callsSucceeded: 2,
  275. callsFailed: 3,
  276. lastCallStartedTimestamp: time.Now().UTC(),
  277. },
  278. {
  279. state: connectivity.Shutdown,
  280. target: "test.channelz:8888",
  281. callsStarted: 0,
  282. callsSucceeded: 0,
  283. callsFailed: 0,
  284. },
  285. {},
  286. }
  287. czCleanup := channelz.NewChannelzStorageForTesting()
  288. defer cleanupWrapper(czCleanup, t)
  289. for _, c := range tcs {
  290. id := channelz.RegisterChannel(c, nil, "")
  291. defer channelz.RemoveEntry(id)
  292. }
  293. s := newCZServer()
  294. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  295. defer cancel()
  296. resp, _ := s.GetTopChannels(ctx, &channelzpb.GetTopChannelsRequest{StartChannelId: 0})
  297. if !resp.GetEnd() {
  298. t.Fatalf("resp.GetEnd() want true, got %v", resp.GetEnd())
  299. }
  300. for i, c := range resp.GetChannel() {
  301. channel, err := channelProtoToStruct(c)
  302. if err != nil {
  303. t.Fatal(err)
  304. }
  305. if diff := cmp.Diff(tcs[i], channel, protocmp.Transform(), cmp.AllowUnexported(dummyChannel{})); diff != "" {
  306. t.Fatalf("unexpected channel, diff (-want +got):\n%s", diff)
  307. }
  308. }
  309. for i := 0; i < 50; i++ {
  310. id := channelz.RegisterChannel(tcs[0], nil, "")
  311. defer channelz.RemoveEntry(id)
  312. }
  313. resp, _ = s.GetTopChannels(ctx, &channelzpb.GetTopChannelsRequest{StartChannelId: 0})
  314. if resp.GetEnd() {
  315. t.Fatalf("resp.GetEnd() want false, got %v", resp.GetEnd())
  316. }
  317. }
  318. func (s) TestGetServers(t *testing.T) {
  319. ss := []*dummyServer{
  320. {
  321. callsStarted: 6,
  322. callsSucceeded: 2,
  323. callsFailed: 3,
  324. lastCallStartedTimestamp: time.Now().UTC(),
  325. },
  326. {
  327. callsStarted: 1,
  328. callsSucceeded: 2,
  329. callsFailed: 3,
  330. lastCallStartedTimestamp: time.Now().UTC(),
  331. },
  332. {
  333. callsStarted: 1,
  334. callsSucceeded: 0,
  335. callsFailed: 0,
  336. lastCallStartedTimestamp: time.Now().UTC(),
  337. },
  338. }
  339. czCleanup := channelz.NewChannelzStorageForTesting()
  340. defer cleanupWrapper(czCleanup, t)
  341. for _, s := range ss {
  342. id := channelz.RegisterServer(s, "")
  343. defer channelz.RemoveEntry(id)
  344. }
  345. svr := newCZServer()
  346. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  347. defer cancel()
  348. resp, _ := svr.GetServers(ctx, &channelzpb.GetServersRequest{StartServerId: 0})
  349. if !resp.GetEnd() {
  350. t.Fatalf("resp.GetEnd() want true, got %v", resp.GetEnd())
  351. }
  352. for i, s := range resp.GetServer() {
  353. server, err := serverProtoToStruct(s)
  354. if err != nil {
  355. t.Fatal(err)
  356. }
  357. if diff := cmp.Diff(ss[i], server, protocmp.Transform(), cmp.AllowUnexported(dummyServer{})); diff != "" {
  358. t.Fatalf("unexpected server, diff (-want +got):\n%s", diff)
  359. }
  360. }
  361. for i := 0; i < 50; i++ {
  362. id := channelz.RegisterServer(ss[0], "")
  363. defer channelz.RemoveEntry(id)
  364. }
  365. resp, _ = svr.GetServers(ctx, &channelzpb.GetServersRequest{StartServerId: 0})
  366. if resp.GetEnd() {
  367. t.Fatalf("resp.GetEnd() want false, got %v", resp.GetEnd())
  368. }
  369. }
  370. func (s) TestGetServerSockets(t *testing.T) {
  371. czCleanup := channelz.NewChannelzStorageForTesting()
  372. defer cleanupWrapper(czCleanup, t)
  373. svrID := channelz.RegisterServer(&dummyServer{}, "")
  374. defer channelz.RemoveEntry(svrID)
  375. refNames := []string{"listen socket 1", "normal socket 1", "normal socket 2"}
  376. ids := make([]*channelz.Identifier, 3)
  377. ids[0], _ = channelz.RegisterListenSocket(&dummySocket{}, svrID, refNames[0])
  378. ids[1], _ = channelz.RegisterNormalSocket(&dummySocket{}, svrID, refNames[1])
  379. ids[2], _ = channelz.RegisterNormalSocket(&dummySocket{}, svrID, refNames[2])
  380. for _, id := range ids {
  381. defer channelz.RemoveEntry(id)
  382. }
  383. svr := newCZServer()
  384. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  385. defer cancel()
  386. resp, _ := svr.GetServerSockets(ctx, &channelzpb.GetServerSocketsRequest{ServerId: svrID.Int(), StartSocketId: 0})
  387. if !resp.GetEnd() {
  388. t.Fatalf("resp.GetEnd() want: true, got: %v", resp.GetEnd())
  389. }
  390. // GetServerSockets only return normal sockets.
  391. want := map[int64]string{
  392. ids[1].Int(): refNames[1],
  393. ids[2].Int(): refNames[2],
  394. }
  395. if !cmp.Equal(convertSocketRefSliceToMap(resp.GetSocketRef()), want) {
  396. t.Fatalf("GetServerSockets want: %#v, got: %#v", want, resp.GetSocketRef())
  397. }
  398. for i := 0; i < 50; i++ {
  399. id, _ := channelz.RegisterNormalSocket(&dummySocket{}, svrID, "")
  400. defer channelz.RemoveEntry(id)
  401. }
  402. resp, _ = svr.GetServerSockets(ctx, &channelzpb.GetServerSocketsRequest{ServerId: svrID.Int(), StartSocketId: 0})
  403. if resp.GetEnd() {
  404. t.Fatalf("resp.GetEnd() want false, got %v", resp.GetEnd())
  405. }
  406. }
  407. // This test makes a GetServerSockets with a non-zero start ID, and expect only
  408. // sockets with ID >= the given start ID.
  409. func (s) TestGetServerSocketsNonZeroStartID(t *testing.T) {
  410. czCleanup := channelz.NewChannelzStorageForTesting()
  411. defer cleanupWrapper(czCleanup, t)
  412. svrID := channelz.RegisterServer(&dummyServer{}, "")
  413. defer channelz.RemoveEntry(svrID)
  414. refNames := []string{"listen socket 1", "normal socket 1", "normal socket 2"}
  415. ids := make([]*channelz.Identifier, 3)
  416. ids[0], _ = channelz.RegisterListenSocket(&dummySocket{}, svrID, refNames[0])
  417. ids[1], _ = channelz.RegisterNormalSocket(&dummySocket{}, svrID, refNames[1])
  418. ids[2], _ = channelz.RegisterNormalSocket(&dummySocket{}, svrID, refNames[2])
  419. for _, id := range ids {
  420. defer channelz.RemoveEntry(id)
  421. }
  422. svr := newCZServer()
  423. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  424. defer cancel()
  425. // Make GetServerSockets with startID = ids[1]+1, so socket-1 won't be
  426. // included in the response.
  427. resp, _ := svr.GetServerSockets(ctx, &channelzpb.GetServerSocketsRequest{ServerId: svrID.Int(), StartSocketId: ids[1].Int() + 1})
  428. if !resp.GetEnd() {
  429. t.Fatalf("resp.GetEnd() want: true, got: %v", resp.GetEnd())
  430. }
  431. // GetServerSockets only return normal socket-2, socket-1 should be
  432. // filtered by start ID.
  433. want := map[int64]string{
  434. ids[2].Int(): refNames[2],
  435. }
  436. if !cmp.Equal(convertSocketRefSliceToMap(resp.GetSocketRef()), want) {
  437. t.Fatalf("GetServerSockets want: %#v, got: %#v", want, resp.GetSocketRef())
  438. }
  439. }
  440. func (s) TestGetChannel(t *testing.T) {
  441. czCleanup := channelz.NewChannelzStorageForTesting()
  442. defer cleanupWrapper(czCleanup, t)
  443. refNames := []string{"top channel 1", "nested channel 1", "sub channel 2", "nested channel 3"}
  444. ids := make([]*channelz.Identifier, 4)
  445. ids[0] = channelz.RegisterChannel(&dummyChannel{}, nil, refNames[0])
  446. channelz.AddTraceEvent(logger, ids[0], 0, &channelz.TraceEventDesc{
  447. Desc: "Channel Created",
  448. Severity: channelz.CtInfo,
  449. })
  450. ids[1] = channelz.RegisterChannel(&dummyChannel{}, ids[0], refNames[1])
  451. channelz.AddTraceEvent(logger, ids[1], 0, &channelz.TraceEventDesc{
  452. Desc: "Channel Created",
  453. Severity: channelz.CtInfo,
  454. Parent: &channelz.TraceEventDesc{
  455. Desc: fmt.Sprintf("Nested Channel(id:%d) created", ids[1].Int()),
  456. Severity: channelz.CtInfo,
  457. },
  458. })
  459. var err error
  460. ids[2], err = channelz.RegisterSubChannel(&dummyChannel{}, ids[0], refNames[2])
  461. if err != nil {
  462. t.Fatalf("channelz.RegisterSubChannel() failed: %v", err)
  463. }
  464. channelz.AddTraceEvent(logger, ids[2], 0, &channelz.TraceEventDesc{
  465. Desc: "SubChannel Created",
  466. Severity: channelz.CtInfo,
  467. Parent: &channelz.TraceEventDesc{
  468. Desc: fmt.Sprintf("SubChannel(id:%d) created", ids[2].Int()),
  469. Severity: channelz.CtInfo,
  470. },
  471. })
  472. ids[3] = channelz.RegisterChannel(&dummyChannel{}, ids[1], refNames[3])
  473. channelz.AddTraceEvent(logger, ids[3], 0, &channelz.TraceEventDesc{
  474. Desc: "Channel Created",
  475. Severity: channelz.CtInfo,
  476. Parent: &channelz.TraceEventDesc{
  477. Desc: fmt.Sprintf("Nested Channel(id:%d) created", ids[3].Int()),
  478. Severity: channelz.CtInfo,
  479. },
  480. })
  481. channelz.AddTraceEvent(logger, ids[0], 0, &channelz.TraceEventDesc{
  482. Desc: fmt.Sprintf("Channel Connectivity change to %v", connectivity.Ready),
  483. Severity: channelz.CtInfo,
  484. })
  485. channelz.AddTraceEvent(logger, ids[0], 0, &channelz.TraceEventDesc{
  486. Desc: "Resolver returns an empty address list",
  487. Severity: channelz.CtWarning,
  488. })
  489. for _, id := range ids {
  490. defer channelz.RemoveEntry(id)
  491. }
  492. svr := newCZServer()
  493. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  494. defer cancel()
  495. resp, _ := svr.GetChannel(ctx, &channelzpb.GetChannelRequest{ChannelId: ids[0].Int()})
  496. metrics := resp.GetChannel()
  497. subChans := metrics.GetSubchannelRef()
  498. if len(subChans) != 1 || subChans[0].GetName() != refNames[2] || subChans[0].GetSubchannelId() != ids[2].Int() {
  499. t.Fatalf("metrics.GetSubChannelRef() want %#v, got %#v", []*channelzpb.SubchannelRef{{SubchannelId: ids[2].Int(), Name: refNames[2]}}, subChans)
  500. }
  501. nestedChans := metrics.GetChannelRef()
  502. if len(nestedChans) != 1 || nestedChans[0].GetName() != refNames[1] || nestedChans[0].GetChannelId() != ids[1].Int() {
  503. t.Fatalf("metrics.GetChannelRef() want %#v, got %#v", []*channelzpb.ChannelRef{{ChannelId: ids[1].Int(), Name: refNames[1]}}, nestedChans)
  504. }
  505. trace := metrics.GetData().GetTrace()
  506. want := []struct {
  507. desc string
  508. severity channelzpb.ChannelTraceEvent_Severity
  509. childID int64
  510. childRef string
  511. }{
  512. {desc: "Channel Created", severity: channelzpb.ChannelTraceEvent_CT_INFO},
  513. {desc: fmt.Sprintf("Nested Channel(id:%d) created", ids[1].Int()), severity: channelzpb.ChannelTraceEvent_CT_INFO, childID: ids[1].Int(), childRef: refNames[1]},
  514. {desc: fmt.Sprintf("SubChannel(id:%d) created", ids[2].Int()), severity: channelzpb.ChannelTraceEvent_CT_INFO, childID: ids[2].Int(), childRef: refNames[2]},
  515. {desc: fmt.Sprintf("Channel Connectivity change to %v", connectivity.Ready), severity: channelzpb.ChannelTraceEvent_CT_INFO},
  516. {desc: "Resolver returns an empty address list", severity: channelzpb.ChannelTraceEvent_CT_WARNING},
  517. }
  518. for i, e := range trace.Events {
  519. if !strings.Contains(e.GetDescription(), want[i].desc) {
  520. t.Fatalf("trace: GetDescription want %#v, got %#v", want[i].desc, e.GetDescription())
  521. }
  522. if e.GetSeverity() != want[i].severity {
  523. t.Fatalf("trace: GetSeverity want %#v, got %#v", want[i].severity, e.GetSeverity())
  524. }
  525. if want[i].childID == 0 && (e.GetChannelRef() != nil || e.GetSubchannelRef() != nil) {
  526. t.Fatalf("trace: GetChannelRef() should return nil, as there is no reference")
  527. }
  528. if e.GetChannelRef().GetChannelId() != want[i].childID || e.GetChannelRef().GetName() != want[i].childRef {
  529. if e.GetSubchannelRef().GetSubchannelId() != want[i].childID || e.GetSubchannelRef().GetName() != want[i].childRef {
  530. t.Fatalf("trace: GetChannelRef/GetSubchannelRef want (child ID: %d, child name: %q), got %#v and %#v", want[i].childID, want[i].childRef, e.GetChannelRef(), e.GetSubchannelRef())
  531. }
  532. }
  533. }
  534. resp, _ = svr.GetChannel(ctx, &channelzpb.GetChannelRequest{ChannelId: ids[1].Int()})
  535. metrics = resp.GetChannel()
  536. nestedChans = metrics.GetChannelRef()
  537. if len(nestedChans) != 1 || nestedChans[0].GetName() != refNames[3] || nestedChans[0].GetChannelId() != ids[3].Int() {
  538. t.Fatalf("metrics.GetChannelRef() want %#v, got %#v", []*channelzpb.ChannelRef{{ChannelId: ids[3].Int(), Name: refNames[3]}}, nestedChans)
  539. }
  540. }
  541. func (s) TestGetSubChannel(t *testing.T) {
  542. var (
  543. subchanCreated = "SubChannel Created"
  544. subchanConnectivityChange = fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.Ready)
  545. subChanPickNewAddress = fmt.Sprintf("Subchannel picks a new address %q to connect", "0.0.0.0")
  546. )
  547. czCleanup := channelz.NewChannelzStorageForTesting()
  548. defer cleanupWrapper(czCleanup, t)
  549. refNames := []string{"top channel 1", "sub channel 1", "socket 1", "socket 2"}
  550. ids := make([]*channelz.Identifier, 4)
  551. ids[0] = channelz.RegisterChannel(&dummyChannel{}, nil, refNames[0])
  552. channelz.AddTraceEvent(logger, ids[0], 0, &channelz.TraceEventDesc{
  553. Desc: "Channel Created",
  554. Severity: channelz.CtInfo,
  555. })
  556. var err error
  557. ids[1], err = channelz.RegisterSubChannel(&dummyChannel{}, ids[0], refNames[1])
  558. if err != nil {
  559. t.Fatalf("channelz.RegisterSubChannel() failed: %v", err)
  560. }
  561. channelz.AddTraceEvent(logger, ids[1], 0, &channelz.TraceEventDesc{
  562. Desc: subchanCreated,
  563. Severity: channelz.CtInfo,
  564. Parent: &channelz.TraceEventDesc{
  565. Desc: fmt.Sprintf("Nested Channel(id:%d) created", ids[0].Int()),
  566. Severity: channelz.CtInfo,
  567. },
  568. })
  569. ids[2], _ = channelz.RegisterNormalSocket(&dummySocket{}, ids[1], refNames[2])
  570. ids[3], _ = channelz.RegisterNormalSocket(&dummySocket{}, ids[1], refNames[3])
  571. channelz.AddTraceEvent(logger, ids[1], 0, &channelz.TraceEventDesc{
  572. Desc: subchanConnectivityChange,
  573. Severity: channelz.CtInfo,
  574. })
  575. channelz.AddTraceEvent(logger, ids[1], 0, &channelz.TraceEventDesc{
  576. Desc: subChanPickNewAddress,
  577. Severity: channelz.CtInfo,
  578. })
  579. for _, id := range ids {
  580. defer channelz.RemoveEntry(id)
  581. }
  582. svr := newCZServer()
  583. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  584. defer cancel()
  585. resp, _ := svr.GetSubchannel(ctx, &channelzpb.GetSubchannelRequest{SubchannelId: ids[1].Int()})
  586. metrics := resp.GetSubchannel()
  587. want := map[int64]string{
  588. ids[2].Int(): refNames[2],
  589. ids[3].Int(): refNames[3],
  590. }
  591. if !cmp.Equal(convertSocketRefSliceToMap(metrics.GetSocketRef()), want) {
  592. t.Fatalf("metrics.GetSocketRef() want %#v: got: %#v", want, metrics.GetSocketRef())
  593. }
  594. trace := metrics.GetData().GetTrace()
  595. wantTrace := []struct {
  596. desc string
  597. severity channelzpb.ChannelTraceEvent_Severity
  598. childID int64
  599. childRef string
  600. }{
  601. {desc: subchanCreated, severity: channelzpb.ChannelTraceEvent_CT_INFO},
  602. {desc: subchanConnectivityChange, severity: channelzpb.ChannelTraceEvent_CT_INFO},
  603. {desc: subChanPickNewAddress, severity: channelzpb.ChannelTraceEvent_CT_INFO},
  604. }
  605. for i, e := range trace.Events {
  606. if e.GetDescription() != wantTrace[i].desc {
  607. t.Fatalf("trace: GetDescription want %#v, got %#v", wantTrace[i].desc, e.GetDescription())
  608. }
  609. if e.GetSeverity() != wantTrace[i].severity {
  610. t.Fatalf("trace: GetSeverity want %#v, got %#v", wantTrace[i].severity, e.GetSeverity())
  611. }
  612. if wantTrace[i].childID == 0 && (e.GetChannelRef() != nil || e.GetSubchannelRef() != nil) {
  613. t.Fatalf("trace: GetChannelRef() should return nil, as there is no reference")
  614. }
  615. if e.GetChannelRef().GetChannelId() != wantTrace[i].childID || e.GetChannelRef().GetName() != wantTrace[i].childRef {
  616. if e.GetSubchannelRef().GetSubchannelId() != wantTrace[i].childID || e.GetSubchannelRef().GetName() != wantTrace[i].childRef {
  617. t.Fatalf("trace: GetChannelRef/GetSubchannelRef want (child ID: %d, child name: %q), got %#v and %#v", wantTrace[i].childID, wantTrace[i].childRef, e.GetChannelRef(), e.GetSubchannelRef())
  618. }
  619. }
  620. }
  621. }
  622. func (s) TestGetSocket(t *testing.T) {
  623. czCleanup := channelz.NewChannelzStorageForTesting()
  624. defer cleanupWrapper(czCleanup, t)
  625. ss := []*dummySocket{
  626. {
  627. streamsStarted: 10,
  628. streamsSucceeded: 2,
  629. streamsFailed: 3,
  630. messagesSent: 20,
  631. messagesReceived: 10,
  632. keepAlivesSent: 2,
  633. lastLocalStreamCreatedTimestamp: time.Now().UTC(),
  634. lastRemoteStreamCreatedTimestamp: time.Now().UTC(),
  635. lastMessageSentTimestamp: time.Now().UTC(),
  636. lastMessageReceivedTimestamp: time.Now().UTC(),
  637. localFlowControlWindow: 65536,
  638. remoteFlowControlWindow: 1024,
  639. localAddr: &net.TCPAddr{IP: net.ParseIP("1.0.0.1"), Port: 10001},
  640. remoteAddr: &net.TCPAddr{IP: net.ParseIP("12.0.0.1"), Port: 10002},
  641. remoteName: "remote.remote",
  642. },
  643. {
  644. streamsStarted: 10,
  645. streamsSucceeded: 2,
  646. streamsFailed: 3,
  647. messagesSent: 20,
  648. messagesReceived: 10,
  649. keepAlivesSent: 2,
  650. lastRemoteStreamCreatedTimestamp: time.Now().UTC(),
  651. lastMessageSentTimestamp: time.Now().UTC(),
  652. lastMessageReceivedTimestamp: time.Now().UTC(),
  653. localFlowControlWindow: 65536,
  654. remoteFlowControlWindow: 1024,
  655. localAddr: &net.UnixAddr{Name: "file.path", Net: "unix"},
  656. remoteAddr: &net.UnixAddr{Name: "another.path", Net: "unix"},
  657. remoteName: "remote.remote",
  658. },
  659. {
  660. streamsStarted: 5,
  661. streamsSucceeded: 2,
  662. streamsFailed: 3,
  663. messagesSent: 20,
  664. messagesReceived: 10,
  665. keepAlivesSent: 2,
  666. lastLocalStreamCreatedTimestamp: time.Now().UTC(),
  667. lastMessageSentTimestamp: time.Now().UTC(),
  668. lastMessageReceivedTimestamp: time.Now().UTC(),
  669. localFlowControlWindow: 65536,
  670. remoteFlowControlWindow: 10240,
  671. localAddr: &net.IPAddr{IP: net.ParseIP("1.0.0.1")},
  672. remoteAddr: &net.IPAddr{IP: net.ParseIP("9.0.0.1")},
  673. remoteName: "",
  674. },
  675. {
  676. localAddr: &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 10001},
  677. },
  678. {
  679. security: &credentials.TLSChannelzSecurityValue{
  680. StandardName: "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256",
  681. RemoteCertificate: []byte{48, 130, 2, 156, 48, 130, 2, 5, 160},
  682. },
  683. },
  684. {
  685. security: &credentials.OtherChannelzSecurityValue{
  686. Name: "XXXX",
  687. },
  688. },
  689. {
  690. security: &credentials.OtherChannelzSecurityValue{
  691. Name: "YYYY",
  692. Value: &OtherSecurityValue{LocalCertificate: []byte{1, 2, 3}, RemoteCertificate: []byte{4, 5, 6}},
  693. },
  694. },
  695. }
  696. svr := newCZServer()
  697. ids := make([]*channelz.Identifier, len(ss))
  698. svrID := channelz.RegisterServer(&dummyServer{}, "")
  699. defer channelz.RemoveEntry(svrID)
  700. for i, s := range ss {
  701. ids[i], _ = channelz.RegisterNormalSocket(s, svrID, strconv.Itoa(i))
  702. defer channelz.RemoveEntry(ids[i])
  703. }
  704. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  705. defer cancel()
  706. for i, s := range ss {
  707. resp, _ := svr.GetSocket(ctx, &channelzpb.GetSocketRequest{SocketId: ids[i].Int()})
  708. got, want := resp.GetSocket().GetRef(), &channelzpb.SocketRef{SocketId: ids[i].Int(), Name: strconv.Itoa(i)}
  709. if !cmp.Equal(got, want, protocmp.Transform()) {
  710. t.Fatalf("resp.GetSocket() returned metrics.GetRef() = %#v, want %#v", got, want)
  711. }
  712. socket, err := socketProtoToStruct(resp.GetSocket())
  713. if err != nil {
  714. t.Fatal(err)
  715. }
  716. if diff := cmp.Diff(s, socket, protocmp.Transform(), cmp.AllowUnexported(dummySocket{})); diff != "" {
  717. t.Fatalf("unexpected socket, diff (-want +got):\n%s", diff)
  718. }
  719. }
  720. }