server.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. /*
  2. *
  3. * Copyright 2017 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. // Package health provides a service that exposes server's health and it must be
  19. // imported to enable support for client-side health checks.
  20. package health
  21. import (
  22. "context"
  23. "sync"
  24. "google.golang.org/grpc/codes"
  25. healthgrpc "google.golang.org/grpc/health/grpc_health_v1"
  26. healthpb "google.golang.org/grpc/health/grpc_health_v1"
  27. "google.golang.org/grpc/status"
  28. )
// Server implements `service Health`.
//
// It tracks a serving status per service name together with the Watch streams
// currently subscribed to each service. The methods in this file access
// shutdown, statusMap and updates only while holding mu.
type Server struct {
	healthgrpc.UnimplementedHealthServer
	// mu guards shutdown, statusMap and updates.
	mu sync.RWMutex
	// If shutdown is true, it's expected all serving status is NOT_SERVING, and
	// will stay in NOT_SERVING.
	shutdown bool
	// statusMap stores the serving status of the services this Server monitors.
	statusMap map[string]healthpb.HealthCheckResponse_ServingStatus
	// updates maps a service name to its active Watch streams, and each stream
	// to the channel from which that Watch call receives status changes.
	updates map[string]map[healthgrpc.Health_WatchServer]chan healthpb.HealthCheckResponse_ServingStatus
}
  40. // NewServer returns a new Server.
  41. func NewServer() *Server {
  42. return &Server{
  43. statusMap: map[string]healthpb.HealthCheckResponse_ServingStatus{"": healthpb.HealthCheckResponse_SERVING},
  44. updates: make(map[string]map[healthgrpc.Health_WatchServer]chan healthpb.HealthCheckResponse_ServingStatus),
  45. }
  46. }
  47. // Check implements `service Health`.
  48. func (s *Server) Check(ctx context.Context, in *healthpb.HealthCheckRequest) (*healthpb.HealthCheckResponse, error) {
  49. s.mu.RLock()
  50. defer s.mu.RUnlock()
  51. if servingStatus, ok := s.statusMap[in.Service]; ok {
  52. return &healthpb.HealthCheckResponse{
  53. Status: servingStatus,
  54. }, nil
  55. }
  56. return nil, status.Error(codes.NotFound, "unknown service")
  57. }
  58. // Watch implements `service Health`.
  59. func (s *Server) Watch(in *healthpb.HealthCheckRequest, stream healthgrpc.Health_WatchServer) error {
  60. service := in.Service
  61. // update channel is used for getting service status updates.
  62. update := make(chan healthpb.HealthCheckResponse_ServingStatus, 1)
  63. s.mu.Lock()
  64. // Puts the initial status to the channel.
  65. if servingStatus, ok := s.statusMap[service]; ok {
  66. update <- servingStatus
  67. } else {
  68. update <- healthpb.HealthCheckResponse_SERVICE_UNKNOWN
  69. }
  70. // Registers the update channel to the correct place in the updates map.
  71. if _, ok := s.updates[service]; !ok {
  72. s.updates[service] = make(map[healthgrpc.Health_WatchServer]chan healthpb.HealthCheckResponse_ServingStatus)
  73. }
  74. s.updates[service][stream] = update
  75. defer func() {
  76. s.mu.Lock()
  77. delete(s.updates[service], stream)
  78. s.mu.Unlock()
  79. }()
  80. s.mu.Unlock()
  81. var lastSentStatus healthpb.HealthCheckResponse_ServingStatus = -1
  82. for {
  83. select {
  84. // Status updated. Sends the up-to-date status to the client.
  85. case servingStatus := <-update:
  86. if lastSentStatus == servingStatus {
  87. continue
  88. }
  89. lastSentStatus = servingStatus
  90. err := stream.Send(&healthpb.HealthCheckResponse{Status: servingStatus})
  91. if err != nil {
  92. return status.Error(codes.Canceled, "Stream has ended.")
  93. }
  94. // Context done. Removes the update channel from the updates map.
  95. case <-stream.Context().Done():
  96. return status.Error(codes.Canceled, "Stream has ended.")
  97. }
  98. }
  99. }
  100. // SetServingStatus is called when need to reset the serving status of a service
  101. // or insert a new service entry into the statusMap.
  102. func (s *Server) SetServingStatus(service string, servingStatus healthpb.HealthCheckResponse_ServingStatus) {
  103. s.mu.Lock()
  104. defer s.mu.Unlock()
  105. if s.shutdown {
  106. logger.Infof("health: status changing for %s to %v is ignored because health service is shutdown", service, servingStatus)
  107. return
  108. }
  109. s.setServingStatusLocked(service, servingStatus)
  110. }
  111. func (s *Server) setServingStatusLocked(service string, servingStatus healthpb.HealthCheckResponse_ServingStatus) {
  112. s.statusMap[service] = servingStatus
  113. for _, update := range s.updates[service] {
  114. // Clears previous updates, that are not sent to the client, from the channel.
  115. // This can happen if the client is not reading and the server gets flow control limited.
  116. select {
  117. case <-update:
  118. default:
  119. }
  120. // Puts the most recent update to the channel.
  121. update <- servingStatus
  122. }
  123. }
  124. // Shutdown sets all serving status to NOT_SERVING, and configures the server to
  125. // ignore all future status changes.
  126. //
  127. // This changes serving status for all services. To set status for a particular
  128. // services, call SetServingStatus().
  129. func (s *Server) Shutdown() {
  130. s.mu.Lock()
  131. defer s.mu.Unlock()
  132. s.shutdown = true
  133. for service := range s.statusMap {
  134. s.setServingStatusLocked(service, healthpb.HealthCheckResponse_NOT_SERVING)
  135. }
  136. }
  137. // Resume sets all serving status to SERVING, and configures the server to
  138. // accept all future status changes.
  139. //
  140. // This changes serving status for all services. To set status for a particular
  141. // services, call SetServingStatus().
  142. func (s *Server) Resume() {
  143. s.mu.Lock()
  144. defer s.mu.Unlock()
  145. s.shutdown = false
  146. for service := range s.statusMap {
  147. s.setServingStatusLocked(service, healthpb.HealthCheckResponse_SERVING)
  148. }
  149. }