sink.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. /*
  2. *
  3. * Copyright 2018 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package binarylog
  19. import (
  20. "bufio"
  21. "encoding/binary"
  22. "io"
  23. "sync"
  24. "time"
  25. "github.com/golang/protobuf/proto"
  26. binlogpb "google.golang.org/grpc/binarylog/grpc_binarylog_v1"
  27. )
  28. var (
  29. // DefaultSink is the sink where the logs will be written to. It's exported
  30. // for the binarylog package to update.
  31. DefaultSink Sink = &noopSink{} // TODO(blog): change this default (file in /tmp).
  32. )
  33. // Sink writes log entry into the binary log sink.
  34. //
  35. // sink is a copy of the exported binarylog.Sink, to avoid circular dependency.
  36. type Sink interface {
  37. // Write will be called to write the log entry into the sink.
  38. //
  39. // It should be thread-safe so it can be called in parallel.
  40. Write(*binlogpb.GrpcLogEntry) error
  41. // Close will be called when the Sink is replaced by a new Sink.
  42. Close() error
  43. }
  44. type noopSink struct{}
  45. func (ns *noopSink) Write(*binlogpb.GrpcLogEntry) error { return nil }
  46. func (ns *noopSink) Close() error { return nil }
  47. // newWriterSink creates a binary log sink with the given writer.
  48. //
  49. // Write() marshals the proto message and writes it to the given writer. Each
  50. // message is prefixed with a 4 byte big endian unsigned integer as the length.
  51. //
  52. // No buffer is done, Close() doesn't try to close the writer.
  53. func newWriterSink(w io.Writer) Sink {
  54. return &writerSink{out: w}
  55. }
  56. type writerSink struct {
  57. out io.Writer
  58. }
  59. func (ws *writerSink) Write(e *binlogpb.GrpcLogEntry) error {
  60. b, err := proto.Marshal(e)
  61. if err != nil {
  62. grpclogLogger.Errorf("binary logging: failed to marshal proto message: %v", err)
  63. return err
  64. }
  65. hdr := make([]byte, 4)
  66. binary.BigEndian.PutUint32(hdr, uint32(len(b)))
  67. if _, err := ws.out.Write(hdr); err != nil {
  68. return err
  69. }
  70. if _, err := ws.out.Write(b); err != nil {
  71. return err
  72. }
  73. return nil
  74. }
  75. func (ws *writerSink) Close() error { return nil }
  76. type bufferedSink struct {
  77. mu sync.Mutex
  78. closer io.Closer
  79. out Sink // out is built on buf.
  80. buf *bufio.Writer // buf is kept for flush.
  81. flusherStarted bool
  82. writeTicker *time.Ticker
  83. done chan struct{}
  84. }
  85. func (fs *bufferedSink) Write(e *binlogpb.GrpcLogEntry) error {
  86. fs.mu.Lock()
  87. defer fs.mu.Unlock()
  88. if !fs.flusherStarted {
  89. // Start the write loop when Write is called.
  90. fs.startFlushGoroutine()
  91. fs.flusherStarted = true
  92. }
  93. if err := fs.out.Write(e); err != nil {
  94. return err
  95. }
  96. return nil
  97. }
  98. const (
  99. bufFlushDuration = 60 * time.Second
  100. )
  101. func (fs *bufferedSink) startFlushGoroutine() {
  102. fs.writeTicker = time.NewTicker(bufFlushDuration)
  103. go func() {
  104. for {
  105. select {
  106. case <-fs.done:
  107. return
  108. case <-fs.writeTicker.C:
  109. }
  110. fs.mu.Lock()
  111. if err := fs.buf.Flush(); err != nil {
  112. grpclogLogger.Warningf("failed to flush to Sink: %v", err)
  113. }
  114. fs.mu.Unlock()
  115. }
  116. }()
  117. }
  118. func (fs *bufferedSink) Close() error {
  119. fs.mu.Lock()
  120. defer fs.mu.Unlock()
  121. if fs.writeTicker != nil {
  122. fs.writeTicker.Stop()
  123. }
  124. close(fs.done)
  125. if err := fs.buf.Flush(); err != nil {
  126. grpclogLogger.Warningf("failed to flush to Sink: %v", err)
  127. }
  128. if err := fs.closer.Close(); err != nil {
  129. grpclogLogger.Warningf("failed to close the underlying WriterCloser: %v", err)
  130. }
  131. if err := fs.out.Close(); err != nil {
  132. grpclogLogger.Warningf("failed to close the Sink: %v", err)
  133. }
  134. return nil
  135. }
  136. // NewBufferedSink creates a binary log sink with the given WriteCloser.
  137. //
  138. // Write() marshals the proto message and writes it to the given writer. Each
  139. // message is prefixed with a 4 byte big endian unsigned integer as the length.
  140. //
  141. // Content is kept in a buffer, and is flushed every 60 seconds.
  142. //
  143. // Close closes the WriteCloser.
  144. func NewBufferedSink(o io.WriteCloser) Sink {
  145. bufW := bufio.NewWriter(o)
  146. return &bufferedSink{
  147. closer: o,
  148. out: newWriterSink(bufW),
  149. buf: bufW,
  150. done: make(chan struct{}),
  151. }
  152. }