service.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350
  1. /*
  2. *
  3. * Copyright 2018 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. // Package service provides an implementation for channelz service server.
  19. package service
  20. import (
  21. "context"
  22. "net"
  23. "github.com/golang/protobuf/ptypes"
  24. wrpb "github.com/golang/protobuf/ptypes/wrappers"
  25. channelzgrpc "google.golang.org/grpc/channelz/grpc_channelz_v1"
  26. channelzpb "google.golang.org/grpc/channelz/grpc_channelz_v1"
  27. "google.golang.org/grpc"
  28. "google.golang.org/grpc/codes"
  29. "google.golang.org/grpc/connectivity"
  30. "google.golang.org/grpc/credentials"
  31. "google.golang.org/grpc/grpclog"
  32. "google.golang.org/grpc/internal/channelz"
  33. "google.golang.org/grpc/status"
  34. "google.golang.org/protobuf/protoadapt"
  35. "google.golang.org/protobuf/types/known/anypb"
  36. )
  37. func init() {
  38. channelz.TurnOn()
  39. }
  40. var logger = grpclog.Component("channelz")
  41. // RegisterChannelzServiceToServer registers the channelz service to the given server.
  42. //
  43. // Note: it is preferred to use the admin API
  44. // (https://pkg.go.dev/google.golang.org/grpc/admin#Register) instead to
  45. // register Channelz and other administrative services.
  46. func RegisterChannelzServiceToServer(s grpc.ServiceRegistrar) {
  47. channelzgrpc.RegisterChannelzServer(s, newCZServer())
  48. }
  49. func newCZServer() channelzgrpc.ChannelzServer {
  50. return &serverImpl{}
  51. }
  52. type serverImpl struct {
  53. channelzgrpc.UnimplementedChannelzServer
  54. }
  55. func connectivityStateToProto(s connectivity.State) *channelzpb.ChannelConnectivityState {
  56. switch s {
  57. case connectivity.Idle:
  58. return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_IDLE}
  59. case connectivity.Connecting:
  60. return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_CONNECTING}
  61. case connectivity.Ready:
  62. return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_READY}
  63. case connectivity.TransientFailure:
  64. return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_TRANSIENT_FAILURE}
  65. case connectivity.Shutdown:
  66. return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_SHUTDOWN}
  67. default:
  68. return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_UNKNOWN}
  69. }
  70. }
  71. func channelTraceToProto(ct *channelz.ChannelTrace) *channelzpb.ChannelTrace {
  72. pbt := &channelzpb.ChannelTrace{}
  73. pbt.NumEventsLogged = ct.EventNum
  74. if ts, err := ptypes.TimestampProto(ct.CreationTime); err == nil {
  75. pbt.CreationTimestamp = ts
  76. }
  77. events := make([]*channelzpb.ChannelTraceEvent, 0, len(ct.Events))
  78. for _, e := range ct.Events {
  79. cte := &channelzpb.ChannelTraceEvent{
  80. Description: e.Desc,
  81. Severity: channelzpb.ChannelTraceEvent_Severity(e.Severity),
  82. }
  83. if ts, err := ptypes.TimestampProto(e.Timestamp); err == nil {
  84. cte.Timestamp = ts
  85. }
  86. if e.RefID != 0 {
  87. switch e.RefType {
  88. case channelz.RefChannel:
  89. cte.ChildRef = &channelzpb.ChannelTraceEvent_ChannelRef{ChannelRef: &channelzpb.ChannelRef{ChannelId: e.RefID, Name: e.RefName}}
  90. case channelz.RefSubChannel:
  91. cte.ChildRef = &channelzpb.ChannelTraceEvent_SubchannelRef{SubchannelRef: &channelzpb.SubchannelRef{SubchannelId: e.RefID, Name: e.RefName}}
  92. }
  93. }
  94. events = append(events, cte)
  95. }
  96. pbt.Events = events
  97. return pbt
  98. }
  99. func channelMetricToProto(cm *channelz.ChannelMetric) *channelzpb.Channel {
  100. c := &channelzpb.Channel{}
  101. c.Ref = &channelzpb.ChannelRef{ChannelId: cm.ID, Name: cm.RefName}
  102. c.Data = &channelzpb.ChannelData{
  103. State: connectivityStateToProto(cm.ChannelData.State),
  104. Target: cm.ChannelData.Target,
  105. CallsStarted: cm.ChannelData.CallsStarted,
  106. CallsSucceeded: cm.ChannelData.CallsSucceeded,
  107. CallsFailed: cm.ChannelData.CallsFailed,
  108. }
  109. if ts, err := ptypes.TimestampProto(cm.ChannelData.LastCallStartedTimestamp); err == nil {
  110. c.Data.LastCallStartedTimestamp = ts
  111. }
  112. nestedChans := make([]*channelzpb.ChannelRef, 0, len(cm.NestedChans))
  113. for id, ref := range cm.NestedChans {
  114. nestedChans = append(nestedChans, &channelzpb.ChannelRef{ChannelId: id, Name: ref})
  115. }
  116. c.ChannelRef = nestedChans
  117. subChans := make([]*channelzpb.SubchannelRef, 0, len(cm.SubChans))
  118. for id, ref := range cm.SubChans {
  119. subChans = append(subChans, &channelzpb.SubchannelRef{SubchannelId: id, Name: ref})
  120. }
  121. c.SubchannelRef = subChans
  122. sockets := make([]*channelzpb.SocketRef, 0, len(cm.Sockets))
  123. for id, ref := range cm.Sockets {
  124. sockets = append(sockets, &channelzpb.SocketRef{SocketId: id, Name: ref})
  125. }
  126. c.SocketRef = sockets
  127. c.Data.Trace = channelTraceToProto(cm.Trace)
  128. return c
  129. }
  130. func subChannelMetricToProto(cm *channelz.SubChannelMetric) *channelzpb.Subchannel {
  131. sc := &channelzpb.Subchannel{}
  132. sc.Ref = &channelzpb.SubchannelRef{SubchannelId: cm.ID, Name: cm.RefName}
  133. sc.Data = &channelzpb.ChannelData{
  134. State: connectivityStateToProto(cm.ChannelData.State),
  135. Target: cm.ChannelData.Target,
  136. CallsStarted: cm.ChannelData.CallsStarted,
  137. CallsSucceeded: cm.ChannelData.CallsSucceeded,
  138. CallsFailed: cm.ChannelData.CallsFailed,
  139. }
  140. if ts, err := ptypes.TimestampProto(cm.ChannelData.LastCallStartedTimestamp); err == nil {
  141. sc.Data.LastCallStartedTimestamp = ts
  142. }
  143. nestedChans := make([]*channelzpb.ChannelRef, 0, len(cm.NestedChans))
  144. for id, ref := range cm.NestedChans {
  145. nestedChans = append(nestedChans, &channelzpb.ChannelRef{ChannelId: id, Name: ref})
  146. }
  147. sc.ChannelRef = nestedChans
  148. subChans := make([]*channelzpb.SubchannelRef, 0, len(cm.SubChans))
  149. for id, ref := range cm.SubChans {
  150. subChans = append(subChans, &channelzpb.SubchannelRef{SubchannelId: id, Name: ref})
  151. }
  152. sc.SubchannelRef = subChans
  153. sockets := make([]*channelzpb.SocketRef, 0, len(cm.Sockets))
  154. for id, ref := range cm.Sockets {
  155. sockets = append(sockets, &channelzpb.SocketRef{SocketId: id, Name: ref})
  156. }
  157. sc.SocketRef = sockets
  158. sc.Data.Trace = channelTraceToProto(cm.Trace)
  159. return sc
  160. }
  161. func securityToProto(se credentials.ChannelzSecurityValue) *channelzpb.Security {
  162. switch v := se.(type) {
  163. case *credentials.TLSChannelzSecurityValue:
  164. return &channelzpb.Security{Model: &channelzpb.Security_Tls_{Tls: &channelzpb.Security_Tls{
  165. CipherSuite: &channelzpb.Security_Tls_StandardName{StandardName: v.StandardName},
  166. LocalCertificate: v.LocalCertificate,
  167. RemoteCertificate: v.RemoteCertificate,
  168. }}}
  169. case *credentials.OtherChannelzSecurityValue:
  170. otherSecurity := &channelzpb.Security_OtherSecurity{
  171. Name: v.Name,
  172. }
  173. if anyval, err := anypb.New(protoadapt.MessageV2Of(v.Value)); err == nil {
  174. otherSecurity.Value = anyval
  175. }
  176. return &channelzpb.Security{Model: &channelzpb.Security_Other{Other: otherSecurity}}
  177. }
  178. return nil
  179. }
  180. func addrToProto(a net.Addr) *channelzpb.Address {
  181. switch a.Network() {
  182. case "udp":
  183. // TODO: Address_OtherAddress{}. Need proto def for Value.
  184. case "ip":
  185. // Note zone info is discarded through the conversion.
  186. return &channelzpb.Address{Address: &channelzpb.Address_TcpipAddress{TcpipAddress: &channelzpb.Address_TcpIpAddress{IpAddress: a.(*net.IPAddr).IP}}}
  187. case "ip+net":
  188. // Note mask info is discarded through the conversion.
  189. return &channelzpb.Address{Address: &channelzpb.Address_TcpipAddress{TcpipAddress: &channelzpb.Address_TcpIpAddress{IpAddress: a.(*net.IPNet).IP}}}
  190. case "tcp":
  191. // Note zone info is discarded through the conversion.
  192. return &channelzpb.Address{Address: &channelzpb.Address_TcpipAddress{TcpipAddress: &channelzpb.Address_TcpIpAddress{IpAddress: a.(*net.TCPAddr).IP, Port: int32(a.(*net.TCPAddr).Port)}}}
  193. case "unix", "unixgram", "unixpacket":
  194. return &channelzpb.Address{Address: &channelzpb.Address_UdsAddress_{UdsAddress: &channelzpb.Address_UdsAddress{Filename: a.String()}}}
  195. default:
  196. }
  197. return &channelzpb.Address{}
  198. }
  199. func socketMetricToProto(sm *channelz.SocketMetric) *channelzpb.Socket {
  200. s := &channelzpb.Socket{}
  201. s.Ref = &channelzpb.SocketRef{SocketId: sm.ID, Name: sm.RefName}
  202. s.Data = &channelzpb.SocketData{
  203. StreamsStarted: sm.SocketData.StreamsStarted,
  204. StreamsSucceeded: sm.SocketData.StreamsSucceeded,
  205. StreamsFailed: sm.SocketData.StreamsFailed,
  206. MessagesSent: sm.SocketData.MessagesSent,
  207. MessagesReceived: sm.SocketData.MessagesReceived,
  208. KeepAlivesSent: sm.SocketData.KeepAlivesSent,
  209. }
  210. if ts, err := ptypes.TimestampProto(sm.SocketData.LastLocalStreamCreatedTimestamp); err == nil {
  211. s.Data.LastLocalStreamCreatedTimestamp = ts
  212. }
  213. if ts, err := ptypes.TimestampProto(sm.SocketData.LastRemoteStreamCreatedTimestamp); err == nil {
  214. s.Data.LastRemoteStreamCreatedTimestamp = ts
  215. }
  216. if ts, err := ptypes.TimestampProto(sm.SocketData.LastMessageSentTimestamp); err == nil {
  217. s.Data.LastMessageSentTimestamp = ts
  218. }
  219. if ts, err := ptypes.TimestampProto(sm.SocketData.LastMessageReceivedTimestamp); err == nil {
  220. s.Data.LastMessageReceivedTimestamp = ts
  221. }
  222. s.Data.LocalFlowControlWindow = &wrpb.Int64Value{Value: sm.SocketData.LocalFlowControlWindow}
  223. s.Data.RemoteFlowControlWindow = &wrpb.Int64Value{Value: sm.SocketData.RemoteFlowControlWindow}
  224. if sm.SocketData.SocketOptions != nil {
  225. s.Data.Option = sockoptToProto(sm.SocketData.SocketOptions)
  226. }
  227. if sm.SocketData.Security != nil {
  228. s.Security = securityToProto(sm.SocketData.Security)
  229. }
  230. if sm.SocketData.LocalAddr != nil {
  231. s.Local = addrToProto(sm.SocketData.LocalAddr)
  232. }
  233. if sm.SocketData.RemoteAddr != nil {
  234. s.Remote = addrToProto(sm.SocketData.RemoteAddr)
  235. }
  236. s.RemoteName = sm.SocketData.RemoteName
  237. return s
  238. }
  239. func (s *serverImpl) GetTopChannels(ctx context.Context, req *channelzpb.GetTopChannelsRequest) (*channelzpb.GetTopChannelsResponse, error) {
  240. metrics, end := channelz.GetTopChannels(req.GetStartChannelId(), req.GetMaxResults())
  241. resp := &channelzpb.GetTopChannelsResponse{}
  242. for _, m := range metrics {
  243. resp.Channel = append(resp.Channel, channelMetricToProto(m))
  244. }
  245. resp.End = end
  246. return resp, nil
  247. }
  248. func serverMetricToProto(sm *channelz.ServerMetric) *channelzpb.Server {
  249. s := &channelzpb.Server{}
  250. s.Ref = &channelzpb.ServerRef{ServerId: sm.ID, Name: sm.RefName}
  251. s.Data = &channelzpb.ServerData{
  252. CallsStarted: sm.ServerData.CallsStarted,
  253. CallsSucceeded: sm.ServerData.CallsSucceeded,
  254. CallsFailed: sm.ServerData.CallsFailed,
  255. }
  256. if ts, err := ptypes.TimestampProto(sm.ServerData.LastCallStartedTimestamp); err == nil {
  257. s.Data.LastCallStartedTimestamp = ts
  258. }
  259. sockets := make([]*channelzpb.SocketRef, 0, len(sm.ListenSockets))
  260. for id, ref := range sm.ListenSockets {
  261. sockets = append(sockets, &channelzpb.SocketRef{SocketId: id, Name: ref})
  262. }
  263. s.ListenSocket = sockets
  264. return s
  265. }
  266. func (s *serverImpl) GetServers(ctx context.Context, req *channelzpb.GetServersRequest) (*channelzpb.GetServersResponse, error) {
  267. metrics, end := channelz.GetServers(req.GetStartServerId(), req.GetMaxResults())
  268. resp := &channelzpb.GetServersResponse{}
  269. for _, m := range metrics {
  270. resp.Server = append(resp.Server, serverMetricToProto(m))
  271. }
  272. resp.End = end
  273. return resp, nil
  274. }
  275. func (s *serverImpl) GetServerSockets(ctx context.Context, req *channelzpb.GetServerSocketsRequest) (*channelzpb.GetServerSocketsResponse, error) {
  276. metrics, end := channelz.GetServerSockets(req.GetServerId(), req.GetStartSocketId(), req.GetMaxResults())
  277. resp := &channelzpb.GetServerSocketsResponse{}
  278. for _, m := range metrics {
  279. resp.SocketRef = append(resp.SocketRef, &channelzpb.SocketRef{SocketId: m.ID, Name: m.RefName})
  280. }
  281. resp.End = end
  282. return resp, nil
  283. }
  284. func (s *serverImpl) GetChannel(ctx context.Context, req *channelzpb.GetChannelRequest) (*channelzpb.GetChannelResponse, error) {
  285. var metric *channelz.ChannelMetric
  286. if metric = channelz.GetChannel(req.GetChannelId()); metric == nil {
  287. return nil, status.Errorf(codes.NotFound, "requested channel %d not found", req.GetChannelId())
  288. }
  289. resp := &channelzpb.GetChannelResponse{Channel: channelMetricToProto(metric)}
  290. return resp, nil
  291. }
  292. func (s *serverImpl) GetSubchannel(ctx context.Context, req *channelzpb.GetSubchannelRequest) (*channelzpb.GetSubchannelResponse, error) {
  293. var metric *channelz.SubChannelMetric
  294. if metric = channelz.GetSubChannel(req.GetSubchannelId()); metric == nil {
  295. return nil, status.Errorf(codes.NotFound, "requested sub channel %d not found", req.GetSubchannelId())
  296. }
  297. resp := &channelzpb.GetSubchannelResponse{Subchannel: subChannelMetricToProto(metric)}
  298. return resp, nil
  299. }
  300. func (s *serverImpl) GetSocket(ctx context.Context, req *channelzpb.GetSocketRequest) (*channelzpb.GetSocketResponse, error) {
  301. var metric *channelz.SocketMetric
  302. if metric = channelz.GetSocket(req.GetSocketId()); metric == nil {
  303. return nil, status.Errorf(codes.NotFound, "requested socket %d not found", req.GetSocketId())
  304. }
  305. resp := &channelzpb.GetSocketResponse{Socket: socketMetricToProto(metric)}
  306. return resp, nil
  307. }
  308. func (s *serverImpl) GetServer(ctx context.Context, req *channelzpb.GetServerRequest) (*channelzpb.GetServerResponse, error) {
  309. var metric *channelz.ServerMetric
  310. if metric = channelz.GetServer(req.GetServerId()); metric == nil {
  311. return nil, status.Errorf(codes.NotFound, "requested server %d not found", req.GetServerId())
  312. }
  313. resp := &channelzpb.GetServerResponse{Server: serverMetricToProto(metric)}
  314. return resp, nil
  315. }