funcs.go 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789
  1. /*
  2. *
  3. * Copyright 2018 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. // Package channelz defines APIs for enabling channelz service, entry
  19. // registration/deletion, and accessing channelz data. It also defines channelz
  20. // metric struct formats.
  21. //
  22. // All APIs in this package are experimental.
  23. package channelz
  24. import (
  25. "context"
  26. "errors"
  27. "fmt"
  28. "sort"
  29. "sync"
  30. "sync/atomic"
  31. "time"
  32. "google.golang.org/grpc/grpclog"
  33. )
  34. const (
  35. defaultMaxTraceEntry int32 = 30
  36. )
  37. var (
  38. db dbWrapper
  39. idGen idGenerator
  40. // EntryPerPage defines the number of channelz entries to be shown on a web page.
  41. EntryPerPage = int64(50)
  42. curState int32
  43. maxTraceEntry = defaultMaxTraceEntry
  44. )
  45. // TurnOn turns on channelz data collection.
  46. func TurnOn() {
  47. if !IsOn() {
  48. db.set(newChannelMap())
  49. idGen.reset()
  50. atomic.StoreInt32(&curState, 1)
  51. }
  52. }
  53. // IsOn returns whether channelz data collection is on.
  54. func IsOn() bool {
  55. return atomic.CompareAndSwapInt32(&curState, 1, 1)
  56. }
  57. // SetMaxTraceEntry sets maximum number of trace entry per entity (i.e. channel/subchannel).
  58. // Setting it to 0 will disable channel tracing.
  59. func SetMaxTraceEntry(i int32) {
  60. atomic.StoreInt32(&maxTraceEntry, i)
  61. }
  62. // ResetMaxTraceEntryToDefault resets the maximum number of trace entry per entity to default.
  63. func ResetMaxTraceEntryToDefault() {
  64. atomic.StoreInt32(&maxTraceEntry, defaultMaxTraceEntry)
  65. }
  66. func getMaxTraceEntry() int {
  67. i := atomic.LoadInt32(&maxTraceEntry)
  68. return int(i)
  69. }
  70. // dbWarpper wraps around a reference to internal channelz data storage, and
  71. // provide synchronized functionality to set and get the reference.
  72. type dbWrapper struct {
  73. mu sync.RWMutex
  74. DB *channelMap
  75. }
  76. func (d *dbWrapper) set(db *channelMap) {
  77. d.mu.Lock()
  78. d.DB = db
  79. d.mu.Unlock()
  80. }
  81. func (d *dbWrapper) get() *channelMap {
  82. d.mu.RLock()
  83. defer d.mu.RUnlock()
  84. return d.DB
  85. }
  86. // NewChannelzStorageForTesting initializes channelz data storage and id
  87. // generator for testing purposes.
  88. //
  89. // Returns a cleanup function to be invoked by the test, which waits for up to
  90. // 10s for all channelz state to be reset by the grpc goroutines when those
  91. // entities get closed. This cleanup function helps with ensuring that tests
  92. // don't mess up each other.
  93. func NewChannelzStorageForTesting() (cleanup func() error) {
  94. db.set(newChannelMap())
  95. idGen.reset()
  96. return func() error {
  97. cm := db.get()
  98. if cm == nil {
  99. return nil
  100. }
  101. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  102. defer cancel()
  103. ticker := time.NewTicker(10 * time.Millisecond)
  104. defer ticker.Stop()
  105. for {
  106. cm.mu.RLock()
  107. topLevelChannels, servers, channels, subChannels, listenSockets, normalSockets := len(cm.topLevelChannels), len(cm.servers), len(cm.channels), len(cm.subChannels), len(cm.listenSockets), len(cm.normalSockets)
  108. cm.mu.RUnlock()
  109. if err := ctx.Err(); err != nil {
  110. return fmt.Errorf("after 10s the channelz map has not been cleaned up yet, topchannels: %d, servers: %d, channels: %d, subchannels: %d, listen sockets: %d, normal sockets: %d", topLevelChannels, servers, channels, subChannels, listenSockets, normalSockets)
  111. }
  112. if topLevelChannels == 0 && servers == 0 && channels == 0 && subChannels == 0 && listenSockets == 0 && normalSockets == 0 {
  113. return nil
  114. }
  115. <-ticker.C
  116. }
  117. }
  118. }
  119. // GetTopChannels returns a slice of top channel's ChannelMetric, along with a
  120. // boolean indicating whether there's more top channels to be queried for.
  121. //
  122. // The arg id specifies that only top channel with id at or above it will be included
  123. // in the result. The returned slice is up to a length of the arg maxResults or
  124. // EntryPerPage if maxResults is zero, and is sorted in ascending id order.
  125. func GetTopChannels(id int64, maxResults int64) ([]*ChannelMetric, bool) {
  126. return db.get().GetTopChannels(id, maxResults)
  127. }
  128. // GetServers returns a slice of server's ServerMetric, along with a
  129. // boolean indicating whether there's more servers to be queried for.
  130. //
  131. // The arg id specifies that only server with id at or above it will be included
  132. // in the result. The returned slice is up to a length of the arg maxResults or
  133. // EntryPerPage if maxResults is zero, and is sorted in ascending id order.
  134. func GetServers(id int64, maxResults int64) ([]*ServerMetric, bool) {
  135. return db.get().GetServers(id, maxResults)
  136. }
  137. // GetServerSockets returns a slice of server's (identified by id) normal socket's
  138. // SocketMetric, along with a boolean indicating whether there's more sockets to
  139. // be queried for.
  140. //
  141. // The arg startID specifies that only sockets with id at or above it will be
  142. // included in the result. The returned slice is up to a length of the arg maxResults
  143. // or EntryPerPage if maxResults is zero, and is sorted in ascending id order.
  144. func GetServerSockets(id int64, startID int64, maxResults int64) ([]*SocketMetric, bool) {
  145. return db.get().GetServerSockets(id, startID, maxResults)
  146. }
  147. // GetChannel returns the ChannelMetric for the channel (identified by id).
  148. func GetChannel(id int64) *ChannelMetric {
  149. return db.get().GetChannel(id)
  150. }
  151. // GetSubChannel returns the SubChannelMetric for the subchannel (identified by id).
  152. func GetSubChannel(id int64) *SubChannelMetric {
  153. return db.get().GetSubChannel(id)
  154. }
  155. // GetSocket returns the SocketInternalMetric for the socket (identified by id).
  156. func GetSocket(id int64) *SocketMetric {
  157. return db.get().GetSocket(id)
  158. }
  159. // GetServer returns the ServerMetric for the server (identified by id).
  160. func GetServer(id int64) *ServerMetric {
  161. return db.get().GetServer(id)
  162. }
  163. // RegisterChannel registers the given channel c in the channelz database with
  164. // ref as its reference name, and adds it to the child list of its parent
  165. // (identified by pid). pid == nil means no parent.
  166. //
  167. // Returns a unique channelz identifier assigned to this channel.
  168. //
  169. // If channelz is not turned ON, the channelz database is not mutated.
  170. func RegisterChannel(c Channel, pid *Identifier, ref string) *Identifier {
  171. id := idGen.genID()
  172. var parent int64
  173. isTopChannel := true
  174. if pid != nil {
  175. isTopChannel = false
  176. parent = pid.Int()
  177. }
  178. if !IsOn() {
  179. return newIdentifer(RefChannel, id, pid)
  180. }
  181. cn := &channel{
  182. refName: ref,
  183. c: c,
  184. subChans: make(map[int64]string),
  185. nestedChans: make(map[int64]string),
  186. id: id,
  187. pid: parent,
  188. trace: &channelTrace{createdTime: time.Now(), events: make([]*TraceEvent, 0, getMaxTraceEntry())},
  189. }
  190. db.get().addChannel(id, cn, isTopChannel, parent)
  191. return newIdentifer(RefChannel, id, pid)
  192. }
  193. // RegisterSubChannel registers the given subChannel c in the channelz database
  194. // with ref as its reference name, and adds it to the child list of its parent
  195. // (identified by pid).
  196. //
  197. // Returns a unique channelz identifier assigned to this subChannel.
  198. //
  199. // If channelz is not turned ON, the channelz database is not mutated.
  200. func RegisterSubChannel(c Channel, pid *Identifier, ref string) (*Identifier, error) {
  201. if pid == nil {
  202. return nil, errors.New("a SubChannel's parent id cannot be nil")
  203. }
  204. id := idGen.genID()
  205. if !IsOn() {
  206. return newIdentifer(RefSubChannel, id, pid), nil
  207. }
  208. sc := &subChannel{
  209. refName: ref,
  210. c: c,
  211. sockets: make(map[int64]string),
  212. id: id,
  213. pid: pid.Int(),
  214. trace: &channelTrace{createdTime: time.Now(), events: make([]*TraceEvent, 0, getMaxTraceEntry())},
  215. }
  216. db.get().addSubChannel(id, sc, pid.Int())
  217. return newIdentifer(RefSubChannel, id, pid), nil
  218. }
  219. // RegisterServer registers the given server s in channelz database. It returns
  220. // the unique channelz tracking id assigned to this server.
  221. //
  222. // If channelz is not turned ON, the channelz database is not mutated.
  223. func RegisterServer(s Server, ref string) *Identifier {
  224. id := idGen.genID()
  225. if !IsOn() {
  226. return newIdentifer(RefServer, id, nil)
  227. }
  228. svr := &server{
  229. refName: ref,
  230. s: s,
  231. sockets: make(map[int64]string),
  232. listenSockets: make(map[int64]string),
  233. id: id,
  234. }
  235. db.get().addServer(id, svr)
  236. return newIdentifer(RefServer, id, nil)
  237. }
  238. // RegisterListenSocket registers the given listen socket s in channelz database
  239. // with ref as its reference name, and add it to the child list of its parent
  240. // (identified by pid). It returns the unique channelz tracking id assigned to
  241. // this listen socket.
  242. //
  243. // If channelz is not turned ON, the channelz database is not mutated.
  244. func RegisterListenSocket(s Socket, pid *Identifier, ref string) (*Identifier, error) {
  245. if pid == nil {
  246. return nil, errors.New("a ListenSocket's parent id cannot be 0")
  247. }
  248. id := idGen.genID()
  249. if !IsOn() {
  250. return newIdentifer(RefListenSocket, id, pid), nil
  251. }
  252. ls := &listenSocket{refName: ref, s: s, id: id, pid: pid.Int()}
  253. db.get().addListenSocket(id, ls, pid.Int())
  254. return newIdentifer(RefListenSocket, id, pid), nil
  255. }
  256. // RegisterNormalSocket registers the given normal socket s in channelz database
  257. // with ref as its reference name, and adds it to the child list of its parent
  258. // (identified by pid). It returns the unique channelz tracking id assigned to
  259. // this normal socket.
  260. //
  261. // If channelz is not turned ON, the channelz database is not mutated.
  262. func RegisterNormalSocket(s Socket, pid *Identifier, ref string) (*Identifier, error) {
  263. if pid == nil {
  264. return nil, errors.New("a NormalSocket's parent id cannot be 0")
  265. }
  266. id := idGen.genID()
  267. if !IsOn() {
  268. return newIdentifer(RefNormalSocket, id, pid), nil
  269. }
  270. ns := &normalSocket{refName: ref, s: s, id: id, pid: pid.Int()}
  271. db.get().addNormalSocket(id, ns, pid.Int())
  272. return newIdentifer(RefNormalSocket, id, pid), nil
  273. }
  274. // RemoveEntry removes an entry with unique channelz tracking id to be id from
  275. // channelz database.
  276. //
  277. // If channelz is not turned ON, this function is a no-op.
  278. func RemoveEntry(id *Identifier) {
  279. if !IsOn() {
  280. return
  281. }
  282. db.get().removeEntry(id.Int())
  283. }
  284. // TraceEventDesc is what the caller of AddTraceEvent should provide to describe
  285. // the event to be added to the channel trace.
  286. //
  287. // The Parent field is optional. It is used for an event that will be recorded
  288. // in the entity's parent trace.
  289. type TraceEventDesc struct {
  290. Desc string
  291. Severity Severity
  292. Parent *TraceEventDesc
  293. }
  294. // AddTraceEvent adds trace related to the entity with specified id, using the
  295. // provided TraceEventDesc.
  296. //
  297. // If channelz is not turned ON, this will simply log the event descriptions.
  298. func AddTraceEvent(l grpclog.DepthLoggerV2, id *Identifier, depth int, desc *TraceEventDesc) {
  299. // Log only the trace description associated with the bottom most entity.
  300. switch desc.Severity {
  301. case CtUnknown, CtInfo:
  302. l.InfoDepth(depth+1, withParens(id)+desc.Desc)
  303. case CtWarning:
  304. l.WarningDepth(depth+1, withParens(id)+desc.Desc)
  305. case CtError:
  306. l.ErrorDepth(depth+1, withParens(id)+desc.Desc)
  307. }
  308. if getMaxTraceEntry() == 0 {
  309. return
  310. }
  311. if IsOn() {
  312. db.get().traceEvent(id.Int(), desc)
  313. }
  314. }
  315. // channelMap is the storage data structure for channelz.
  316. // Methods of channelMap can be divided in two two categories with respect to locking.
  317. // 1. Methods acquire the global lock.
  318. // 2. Methods that can only be called when global lock is held.
  319. // A second type of method need always to be called inside a first type of method.
  320. type channelMap struct {
  321. mu sync.RWMutex
  322. topLevelChannels map[int64]struct{}
  323. servers map[int64]*server
  324. channels map[int64]*channel
  325. subChannels map[int64]*subChannel
  326. listenSockets map[int64]*listenSocket
  327. normalSockets map[int64]*normalSocket
  328. }
  329. func newChannelMap() *channelMap {
  330. return &channelMap{
  331. topLevelChannels: make(map[int64]struct{}),
  332. channels: make(map[int64]*channel),
  333. listenSockets: make(map[int64]*listenSocket),
  334. normalSockets: make(map[int64]*normalSocket),
  335. servers: make(map[int64]*server),
  336. subChannels: make(map[int64]*subChannel),
  337. }
  338. }
  339. func (c *channelMap) addServer(id int64, s *server) {
  340. c.mu.Lock()
  341. s.cm = c
  342. c.servers[id] = s
  343. c.mu.Unlock()
  344. }
  345. func (c *channelMap) addChannel(id int64, cn *channel, isTopChannel bool, pid int64) {
  346. c.mu.Lock()
  347. cn.cm = c
  348. cn.trace.cm = c
  349. c.channels[id] = cn
  350. if isTopChannel {
  351. c.topLevelChannels[id] = struct{}{}
  352. } else {
  353. c.findEntry(pid).addChild(id, cn)
  354. }
  355. c.mu.Unlock()
  356. }
  357. func (c *channelMap) addSubChannel(id int64, sc *subChannel, pid int64) {
  358. c.mu.Lock()
  359. sc.cm = c
  360. sc.trace.cm = c
  361. c.subChannels[id] = sc
  362. c.findEntry(pid).addChild(id, sc)
  363. c.mu.Unlock()
  364. }
  365. func (c *channelMap) addListenSocket(id int64, ls *listenSocket, pid int64) {
  366. c.mu.Lock()
  367. ls.cm = c
  368. c.listenSockets[id] = ls
  369. c.findEntry(pid).addChild(id, ls)
  370. c.mu.Unlock()
  371. }
  372. func (c *channelMap) addNormalSocket(id int64, ns *normalSocket, pid int64) {
  373. c.mu.Lock()
  374. ns.cm = c
  375. c.normalSockets[id] = ns
  376. c.findEntry(pid).addChild(id, ns)
  377. c.mu.Unlock()
  378. }
  379. // removeEntry triggers the removal of an entry, which may not indeed delete the entry, if it has to
  380. // wait on the deletion of its children and until no other entity's channel trace references it.
  381. // It may lead to a chain of entry deletion. For example, deleting the last socket of a gracefully
  382. // shutting down server will lead to the server being also deleted.
  383. func (c *channelMap) removeEntry(id int64) {
  384. c.mu.Lock()
  385. c.findEntry(id).triggerDelete()
  386. c.mu.Unlock()
  387. }
  388. // c.mu must be held by the caller
  389. func (c *channelMap) decrTraceRefCount(id int64) {
  390. e := c.findEntry(id)
  391. if v, ok := e.(tracedChannel); ok {
  392. v.decrTraceRefCount()
  393. e.deleteSelfIfReady()
  394. }
  395. }
  396. // c.mu must be held by the caller.
  397. func (c *channelMap) findEntry(id int64) entry {
  398. var v entry
  399. var ok bool
  400. if v, ok = c.channels[id]; ok {
  401. return v
  402. }
  403. if v, ok = c.subChannels[id]; ok {
  404. return v
  405. }
  406. if v, ok = c.servers[id]; ok {
  407. return v
  408. }
  409. if v, ok = c.listenSockets[id]; ok {
  410. return v
  411. }
  412. if v, ok = c.normalSockets[id]; ok {
  413. return v
  414. }
  415. return &dummyEntry{idNotFound: id}
  416. }
  417. // c.mu must be held by the caller
  418. // deleteEntry simply deletes an entry from the channelMap. Before calling this
  419. // method, caller must check this entry is ready to be deleted, i.e removeEntry()
  420. // has been called on it, and no children still exist.
  421. // Conditionals are ordered by the expected frequency of deletion of each entity
  422. // type, in order to optimize performance.
  423. func (c *channelMap) deleteEntry(id int64) {
  424. var ok bool
  425. if _, ok = c.normalSockets[id]; ok {
  426. delete(c.normalSockets, id)
  427. return
  428. }
  429. if _, ok = c.subChannels[id]; ok {
  430. delete(c.subChannels, id)
  431. return
  432. }
  433. if _, ok = c.channels[id]; ok {
  434. delete(c.channels, id)
  435. delete(c.topLevelChannels, id)
  436. return
  437. }
  438. if _, ok = c.listenSockets[id]; ok {
  439. delete(c.listenSockets, id)
  440. return
  441. }
  442. if _, ok = c.servers[id]; ok {
  443. delete(c.servers, id)
  444. return
  445. }
  446. }
  447. func (c *channelMap) traceEvent(id int64, desc *TraceEventDesc) {
  448. c.mu.Lock()
  449. child := c.findEntry(id)
  450. childTC, ok := child.(tracedChannel)
  451. if !ok {
  452. c.mu.Unlock()
  453. return
  454. }
  455. childTC.getChannelTrace().append(&TraceEvent{Desc: desc.Desc, Severity: desc.Severity, Timestamp: time.Now()})
  456. if desc.Parent != nil {
  457. parent := c.findEntry(child.getParentID())
  458. var chanType RefChannelType
  459. switch child.(type) {
  460. case *channel:
  461. chanType = RefChannel
  462. case *subChannel:
  463. chanType = RefSubChannel
  464. }
  465. if parentTC, ok := parent.(tracedChannel); ok {
  466. parentTC.getChannelTrace().append(&TraceEvent{
  467. Desc: desc.Parent.Desc,
  468. Severity: desc.Parent.Severity,
  469. Timestamp: time.Now(),
  470. RefID: id,
  471. RefName: childTC.getRefName(),
  472. RefType: chanType,
  473. })
  474. childTC.incrTraceRefCount()
  475. }
  476. }
  477. c.mu.Unlock()
  478. }
  479. type int64Slice []int64
  480. func (s int64Slice) Len() int { return len(s) }
  481. func (s int64Slice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
  482. func (s int64Slice) Less(i, j int) bool { return s[i] < s[j] }
  483. func copyMap(m map[int64]string) map[int64]string {
  484. n := make(map[int64]string)
  485. for k, v := range m {
  486. n[k] = v
  487. }
  488. return n
  489. }
  490. func min(a, b int64) int64 {
  491. if a < b {
  492. return a
  493. }
  494. return b
  495. }
  496. func (c *channelMap) GetTopChannels(id int64, maxResults int64) ([]*ChannelMetric, bool) {
  497. if maxResults <= 0 {
  498. maxResults = EntryPerPage
  499. }
  500. c.mu.RLock()
  501. l := int64(len(c.topLevelChannels))
  502. ids := make([]int64, 0, l)
  503. cns := make([]*channel, 0, min(l, maxResults))
  504. for k := range c.topLevelChannels {
  505. ids = append(ids, k)
  506. }
  507. sort.Sort(int64Slice(ids))
  508. idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id })
  509. count := int64(0)
  510. var end bool
  511. var t []*ChannelMetric
  512. for i, v := range ids[idx:] {
  513. if count == maxResults {
  514. break
  515. }
  516. if cn, ok := c.channels[v]; ok {
  517. cns = append(cns, cn)
  518. t = append(t, &ChannelMetric{
  519. NestedChans: copyMap(cn.nestedChans),
  520. SubChans: copyMap(cn.subChans),
  521. })
  522. count++
  523. }
  524. if i == len(ids[idx:])-1 {
  525. end = true
  526. break
  527. }
  528. }
  529. c.mu.RUnlock()
  530. if count == 0 {
  531. end = true
  532. }
  533. for i, cn := range cns {
  534. t[i].ChannelData = cn.c.ChannelzMetric()
  535. t[i].ID = cn.id
  536. t[i].RefName = cn.refName
  537. t[i].Trace = cn.trace.dumpData()
  538. }
  539. return t, end
  540. }
  541. func (c *channelMap) GetServers(id, maxResults int64) ([]*ServerMetric, bool) {
  542. if maxResults <= 0 {
  543. maxResults = EntryPerPage
  544. }
  545. c.mu.RLock()
  546. l := int64(len(c.servers))
  547. ids := make([]int64, 0, l)
  548. ss := make([]*server, 0, min(l, maxResults))
  549. for k := range c.servers {
  550. ids = append(ids, k)
  551. }
  552. sort.Sort(int64Slice(ids))
  553. idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id })
  554. count := int64(0)
  555. var end bool
  556. var s []*ServerMetric
  557. for i, v := range ids[idx:] {
  558. if count == maxResults {
  559. break
  560. }
  561. if svr, ok := c.servers[v]; ok {
  562. ss = append(ss, svr)
  563. s = append(s, &ServerMetric{
  564. ListenSockets: copyMap(svr.listenSockets),
  565. })
  566. count++
  567. }
  568. if i == len(ids[idx:])-1 {
  569. end = true
  570. break
  571. }
  572. }
  573. c.mu.RUnlock()
  574. if count == 0 {
  575. end = true
  576. }
  577. for i, svr := range ss {
  578. s[i].ServerData = svr.s.ChannelzMetric()
  579. s[i].ID = svr.id
  580. s[i].RefName = svr.refName
  581. }
  582. return s, end
  583. }
  584. func (c *channelMap) GetServerSockets(id int64, startID int64, maxResults int64) ([]*SocketMetric, bool) {
  585. if maxResults <= 0 {
  586. maxResults = EntryPerPage
  587. }
  588. var svr *server
  589. var ok bool
  590. c.mu.RLock()
  591. if svr, ok = c.servers[id]; !ok {
  592. // server with id doesn't exist.
  593. c.mu.RUnlock()
  594. return nil, true
  595. }
  596. svrskts := svr.sockets
  597. l := int64(len(svrskts))
  598. ids := make([]int64, 0, l)
  599. sks := make([]*normalSocket, 0, min(l, maxResults))
  600. for k := range svrskts {
  601. ids = append(ids, k)
  602. }
  603. sort.Sort(int64Slice(ids))
  604. idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= startID })
  605. count := int64(0)
  606. var end bool
  607. for i, v := range ids[idx:] {
  608. if count == maxResults {
  609. break
  610. }
  611. if ns, ok := c.normalSockets[v]; ok {
  612. sks = append(sks, ns)
  613. count++
  614. }
  615. if i == len(ids[idx:])-1 {
  616. end = true
  617. break
  618. }
  619. }
  620. c.mu.RUnlock()
  621. if count == 0 {
  622. end = true
  623. }
  624. s := make([]*SocketMetric, 0, len(sks))
  625. for _, ns := range sks {
  626. sm := &SocketMetric{}
  627. sm.SocketData = ns.s.ChannelzMetric()
  628. sm.ID = ns.id
  629. sm.RefName = ns.refName
  630. s = append(s, sm)
  631. }
  632. return s, end
  633. }
  634. func (c *channelMap) GetChannel(id int64) *ChannelMetric {
  635. cm := &ChannelMetric{}
  636. var cn *channel
  637. var ok bool
  638. c.mu.RLock()
  639. if cn, ok = c.channels[id]; !ok {
  640. // channel with id doesn't exist.
  641. c.mu.RUnlock()
  642. return nil
  643. }
  644. cm.NestedChans = copyMap(cn.nestedChans)
  645. cm.SubChans = copyMap(cn.subChans)
  646. // cn.c can be set to &dummyChannel{} when deleteSelfFromMap is called. Save a copy of cn.c when
  647. // holding the lock to prevent potential data race.
  648. chanCopy := cn.c
  649. c.mu.RUnlock()
  650. cm.ChannelData = chanCopy.ChannelzMetric()
  651. cm.ID = cn.id
  652. cm.RefName = cn.refName
  653. cm.Trace = cn.trace.dumpData()
  654. return cm
  655. }
  656. func (c *channelMap) GetSubChannel(id int64) *SubChannelMetric {
  657. cm := &SubChannelMetric{}
  658. var sc *subChannel
  659. var ok bool
  660. c.mu.RLock()
  661. if sc, ok = c.subChannels[id]; !ok {
  662. // subchannel with id doesn't exist.
  663. c.mu.RUnlock()
  664. return nil
  665. }
  666. cm.Sockets = copyMap(sc.sockets)
  667. // sc.c can be set to &dummyChannel{} when deleteSelfFromMap is called. Save a copy of sc.c when
  668. // holding the lock to prevent potential data race.
  669. chanCopy := sc.c
  670. c.mu.RUnlock()
  671. cm.ChannelData = chanCopy.ChannelzMetric()
  672. cm.ID = sc.id
  673. cm.RefName = sc.refName
  674. cm.Trace = sc.trace.dumpData()
  675. return cm
  676. }
  677. func (c *channelMap) GetSocket(id int64) *SocketMetric {
  678. sm := &SocketMetric{}
  679. c.mu.RLock()
  680. if ls, ok := c.listenSockets[id]; ok {
  681. c.mu.RUnlock()
  682. sm.SocketData = ls.s.ChannelzMetric()
  683. sm.ID = ls.id
  684. sm.RefName = ls.refName
  685. return sm
  686. }
  687. if ns, ok := c.normalSockets[id]; ok {
  688. c.mu.RUnlock()
  689. sm.SocketData = ns.s.ChannelzMetric()
  690. sm.ID = ns.id
  691. sm.RefName = ns.refName
  692. return sm
  693. }
  694. c.mu.RUnlock()
  695. return nil
  696. }
  697. func (c *channelMap) GetServer(id int64) *ServerMetric {
  698. sm := &ServerMetric{}
  699. var svr *server
  700. var ok bool
  701. c.mu.RLock()
  702. if svr, ok = c.servers[id]; !ok {
  703. c.mu.RUnlock()
  704. return nil
  705. }
  706. sm.ListenSockets = copyMap(svr.listenSockets)
  707. c.mu.RUnlock()
  708. sm.ID = svr.id
  709. sm.RefName = svr.refName
  710. sm.ServerData = svr.s.ChannelzMetric()
  711. return sm
  712. }
  713. type idGenerator struct {
  714. id int64
  715. }
  716. func (i *idGenerator) reset() {
  717. atomic.StoreInt64(&i.id, 0)
  718. }
  719. func (i *idGenerator) genID() int64 {
  720. return atomic.AddInt64(&i.id, 1)
  721. }