method_logger.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445
  1. /*
  2. *
  3. * Copyright 2018 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package binarylog
  19. import (
  20. "context"
  21. "net"
  22. "strings"
  23. "sync/atomic"
  24. "time"
  25. "github.com/golang/protobuf/proto"
  26. "github.com/golang/protobuf/ptypes"
  27. binlogpb "google.golang.org/grpc/binarylog/grpc_binarylog_v1"
  28. "google.golang.org/grpc/metadata"
  29. "google.golang.org/grpc/status"
  30. )
  31. type callIDGenerator struct {
  32. id uint64
  33. }
  34. func (g *callIDGenerator) next() uint64 {
  35. id := atomic.AddUint64(&g.id, 1)
  36. return id
  37. }
  38. // reset is for testing only, and doesn't need to be thread safe.
  39. func (g *callIDGenerator) reset() {
  40. g.id = 0
  41. }
  42. var idGen callIDGenerator
  43. // MethodLogger is the sub-logger for each method.
  44. //
  45. // This is used in the 1.0 release of gcp/observability, and thus must not be
  46. // deleted or changed.
  47. type MethodLogger interface {
  48. Log(context.Context, LogEntryConfig)
  49. }
  50. // TruncatingMethodLogger is a method logger that truncates headers and messages
  51. // based on configured fields.
  52. type TruncatingMethodLogger struct {
  53. headerMaxLen, messageMaxLen uint64
  54. callID uint64
  55. idWithinCallGen *callIDGenerator
  56. sink Sink // TODO(blog): make this plugable.
  57. }
  58. // NewTruncatingMethodLogger returns a new truncating method logger.
  59. //
  60. // This is used in the 1.0 release of gcp/observability, and thus must not be
  61. // deleted or changed.
  62. func NewTruncatingMethodLogger(h, m uint64) *TruncatingMethodLogger {
  63. return &TruncatingMethodLogger{
  64. headerMaxLen: h,
  65. messageMaxLen: m,
  66. callID: idGen.next(),
  67. idWithinCallGen: &callIDGenerator{},
  68. sink: DefaultSink, // TODO(blog): make it plugable.
  69. }
  70. }
  71. // Build is an internal only method for building the proto message out of the
  72. // input event. It's made public to enable other library to reuse as much logic
  73. // in TruncatingMethodLogger as possible.
  74. func (ml *TruncatingMethodLogger) Build(c LogEntryConfig) *binlogpb.GrpcLogEntry {
  75. m := c.toProto()
  76. timestamp, _ := ptypes.TimestampProto(time.Now())
  77. m.Timestamp = timestamp
  78. m.CallId = ml.callID
  79. m.SequenceIdWithinCall = ml.idWithinCallGen.next()
  80. switch pay := m.Payload.(type) {
  81. case *binlogpb.GrpcLogEntry_ClientHeader:
  82. m.PayloadTruncated = ml.truncateMetadata(pay.ClientHeader.GetMetadata())
  83. case *binlogpb.GrpcLogEntry_ServerHeader:
  84. m.PayloadTruncated = ml.truncateMetadata(pay.ServerHeader.GetMetadata())
  85. case *binlogpb.GrpcLogEntry_Message:
  86. m.PayloadTruncated = ml.truncateMessage(pay.Message)
  87. }
  88. return m
  89. }
  90. // Log creates a proto binary log entry, and logs it to the sink.
  91. func (ml *TruncatingMethodLogger) Log(ctx context.Context, c LogEntryConfig) {
  92. ml.sink.Write(ml.Build(c))
  93. }
  94. func (ml *TruncatingMethodLogger) truncateMetadata(mdPb *binlogpb.Metadata) (truncated bool) {
  95. if ml.headerMaxLen == maxUInt {
  96. return false
  97. }
  98. var (
  99. bytesLimit = ml.headerMaxLen
  100. index int
  101. )
  102. // At the end of the loop, index will be the first entry where the total
  103. // size is greater than the limit:
  104. //
  105. // len(entry[:index]) <= ml.hdr && len(entry[:index+1]) > ml.hdr.
  106. for ; index < len(mdPb.Entry); index++ {
  107. entry := mdPb.Entry[index]
  108. if entry.Key == "grpc-trace-bin" {
  109. // "grpc-trace-bin" is a special key. It's kept in the log entry,
  110. // but not counted towards the size limit.
  111. continue
  112. }
  113. currentEntryLen := uint64(len(entry.GetKey())) + uint64(len(entry.GetValue()))
  114. if currentEntryLen > bytesLimit {
  115. break
  116. }
  117. bytesLimit -= currentEntryLen
  118. }
  119. truncated = index < len(mdPb.Entry)
  120. mdPb.Entry = mdPb.Entry[:index]
  121. return truncated
  122. }
  123. func (ml *TruncatingMethodLogger) truncateMessage(msgPb *binlogpb.Message) (truncated bool) {
  124. if ml.messageMaxLen == maxUInt {
  125. return false
  126. }
  127. if ml.messageMaxLen >= uint64(len(msgPb.Data)) {
  128. return false
  129. }
  130. msgPb.Data = msgPb.Data[:ml.messageMaxLen]
  131. return true
  132. }
  133. // LogEntryConfig represents the configuration for binary log entry.
  134. //
  135. // This is used in the 1.0 release of gcp/observability, and thus must not be
  136. // deleted or changed.
  137. type LogEntryConfig interface {
  138. toProto() *binlogpb.GrpcLogEntry
  139. }
  140. // ClientHeader configs the binary log entry to be a ClientHeader entry.
  141. type ClientHeader struct {
  142. OnClientSide bool
  143. Header metadata.MD
  144. MethodName string
  145. Authority string
  146. Timeout time.Duration
  147. // PeerAddr is required only when it's on server side.
  148. PeerAddr net.Addr
  149. }
  150. func (c *ClientHeader) toProto() *binlogpb.GrpcLogEntry {
  151. // This function doesn't need to set all the fields (e.g. seq ID). The Log
  152. // function will set the fields when necessary.
  153. clientHeader := &binlogpb.ClientHeader{
  154. Metadata: mdToMetadataProto(c.Header),
  155. MethodName: c.MethodName,
  156. Authority: c.Authority,
  157. }
  158. if c.Timeout > 0 {
  159. clientHeader.Timeout = ptypes.DurationProto(c.Timeout)
  160. }
  161. ret := &binlogpb.GrpcLogEntry{
  162. Type: binlogpb.GrpcLogEntry_EVENT_TYPE_CLIENT_HEADER,
  163. Payload: &binlogpb.GrpcLogEntry_ClientHeader{
  164. ClientHeader: clientHeader,
  165. },
  166. }
  167. if c.OnClientSide {
  168. ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
  169. } else {
  170. ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
  171. }
  172. if c.PeerAddr != nil {
  173. ret.Peer = addrToProto(c.PeerAddr)
  174. }
  175. return ret
  176. }
  177. // ServerHeader configs the binary log entry to be a ServerHeader entry.
  178. type ServerHeader struct {
  179. OnClientSide bool
  180. Header metadata.MD
  181. // PeerAddr is required only when it's on client side.
  182. PeerAddr net.Addr
  183. }
  184. func (c *ServerHeader) toProto() *binlogpb.GrpcLogEntry {
  185. ret := &binlogpb.GrpcLogEntry{
  186. Type: binlogpb.GrpcLogEntry_EVENT_TYPE_SERVER_HEADER,
  187. Payload: &binlogpb.GrpcLogEntry_ServerHeader{
  188. ServerHeader: &binlogpb.ServerHeader{
  189. Metadata: mdToMetadataProto(c.Header),
  190. },
  191. },
  192. }
  193. if c.OnClientSide {
  194. ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
  195. } else {
  196. ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
  197. }
  198. if c.PeerAddr != nil {
  199. ret.Peer = addrToProto(c.PeerAddr)
  200. }
  201. return ret
  202. }
  203. // ClientMessage configs the binary log entry to be a ClientMessage entry.
  204. type ClientMessage struct {
  205. OnClientSide bool
  206. // Message can be a proto.Message or []byte. Other messages formats are not
  207. // supported.
  208. Message interface{}
  209. }
  210. func (c *ClientMessage) toProto() *binlogpb.GrpcLogEntry {
  211. var (
  212. data []byte
  213. err error
  214. )
  215. if m, ok := c.Message.(proto.Message); ok {
  216. data, err = proto.Marshal(m)
  217. if err != nil {
  218. grpclogLogger.Infof("binarylogging: failed to marshal proto message: %v", err)
  219. }
  220. } else if b, ok := c.Message.([]byte); ok {
  221. data = b
  222. } else {
  223. grpclogLogger.Infof("binarylogging: message to log is neither proto.message nor []byte")
  224. }
  225. ret := &binlogpb.GrpcLogEntry{
  226. Type: binlogpb.GrpcLogEntry_EVENT_TYPE_CLIENT_MESSAGE,
  227. Payload: &binlogpb.GrpcLogEntry_Message{
  228. Message: &binlogpb.Message{
  229. Length: uint32(len(data)),
  230. Data: data,
  231. },
  232. },
  233. }
  234. if c.OnClientSide {
  235. ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
  236. } else {
  237. ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
  238. }
  239. return ret
  240. }
  241. // ServerMessage configs the binary log entry to be a ServerMessage entry.
  242. type ServerMessage struct {
  243. OnClientSide bool
  244. // Message can be a proto.Message or []byte. Other messages formats are not
  245. // supported.
  246. Message interface{}
  247. }
  248. func (c *ServerMessage) toProto() *binlogpb.GrpcLogEntry {
  249. var (
  250. data []byte
  251. err error
  252. )
  253. if m, ok := c.Message.(proto.Message); ok {
  254. data, err = proto.Marshal(m)
  255. if err != nil {
  256. grpclogLogger.Infof("binarylogging: failed to marshal proto message: %v", err)
  257. }
  258. } else if b, ok := c.Message.([]byte); ok {
  259. data = b
  260. } else {
  261. grpclogLogger.Infof("binarylogging: message to log is neither proto.message nor []byte")
  262. }
  263. ret := &binlogpb.GrpcLogEntry{
  264. Type: binlogpb.GrpcLogEntry_EVENT_TYPE_SERVER_MESSAGE,
  265. Payload: &binlogpb.GrpcLogEntry_Message{
  266. Message: &binlogpb.Message{
  267. Length: uint32(len(data)),
  268. Data: data,
  269. },
  270. },
  271. }
  272. if c.OnClientSide {
  273. ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
  274. } else {
  275. ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
  276. }
  277. return ret
  278. }
  279. // ClientHalfClose configs the binary log entry to be a ClientHalfClose entry.
  280. type ClientHalfClose struct {
  281. OnClientSide bool
  282. }
  283. func (c *ClientHalfClose) toProto() *binlogpb.GrpcLogEntry {
  284. ret := &binlogpb.GrpcLogEntry{
  285. Type: binlogpb.GrpcLogEntry_EVENT_TYPE_CLIENT_HALF_CLOSE,
  286. Payload: nil, // No payload here.
  287. }
  288. if c.OnClientSide {
  289. ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
  290. } else {
  291. ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
  292. }
  293. return ret
  294. }
  295. // ServerTrailer configs the binary log entry to be a ServerTrailer entry.
  296. type ServerTrailer struct {
  297. OnClientSide bool
  298. Trailer metadata.MD
  299. // Err is the status error.
  300. Err error
  301. // PeerAddr is required only when it's on client side and the RPC is trailer
  302. // only.
  303. PeerAddr net.Addr
  304. }
  305. func (c *ServerTrailer) toProto() *binlogpb.GrpcLogEntry {
  306. st, ok := status.FromError(c.Err)
  307. if !ok {
  308. grpclogLogger.Info("binarylogging: error in trailer is not a status error")
  309. }
  310. var (
  311. detailsBytes []byte
  312. err error
  313. )
  314. stProto := st.Proto()
  315. if stProto != nil && len(stProto.Details) != 0 {
  316. detailsBytes, err = proto.Marshal(stProto)
  317. if err != nil {
  318. grpclogLogger.Infof("binarylogging: failed to marshal status proto: %v", err)
  319. }
  320. }
  321. ret := &binlogpb.GrpcLogEntry{
  322. Type: binlogpb.GrpcLogEntry_EVENT_TYPE_SERVER_TRAILER,
  323. Payload: &binlogpb.GrpcLogEntry_Trailer{
  324. Trailer: &binlogpb.Trailer{
  325. Metadata: mdToMetadataProto(c.Trailer),
  326. StatusCode: uint32(st.Code()),
  327. StatusMessage: st.Message(),
  328. StatusDetails: detailsBytes,
  329. },
  330. },
  331. }
  332. if c.OnClientSide {
  333. ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
  334. } else {
  335. ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
  336. }
  337. if c.PeerAddr != nil {
  338. ret.Peer = addrToProto(c.PeerAddr)
  339. }
  340. return ret
  341. }
  342. // Cancel configs the binary log entry to be a Cancel entry.
  343. type Cancel struct {
  344. OnClientSide bool
  345. }
  346. func (c *Cancel) toProto() *binlogpb.GrpcLogEntry {
  347. ret := &binlogpb.GrpcLogEntry{
  348. Type: binlogpb.GrpcLogEntry_EVENT_TYPE_CANCEL,
  349. Payload: nil,
  350. }
  351. if c.OnClientSide {
  352. ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
  353. } else {
  354. ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
  355. }
  356. return ret
  357. }
  358. // metadataKeyOmit returns whether the metadata entry with this key should be
  359. // omitted.
  360. func metadataKeyOmit(key string) bool {
  361. switch key {
  362. case "lb-token", ":path", ":authority", "content-encoding", "content-type", "user-agent", "te":
  363. return true
  364. case "grpc-trace-bin": // grpc-trace-bin is special because it's visiable to users.
  365. return false
  366. }
  367. return strings.HasPrefix(key, "grpc-")
  368. }
  369. func mdToMetadataProto(md metadata.MD) *binlogpb.Metadata {
  370. ret := &binlogpb.Metadata{}
  371. for k, vv := range md {
  372. if metadataKeyOmit(k) {
  373. continue
  374. }
  375. for _, v := range vv {
  376. ret.Entry = append(ret.Entry,
  377. &binlogpb.MetadataEntry{
  378. Key: k,
  379. Value: []byte(v),
  380. },
  381. )
  382. }
  383. }
  384. return ret
  385. }
  386. func addrToProto(addr net.Addr) *binlogpb.Address {
  387. ret := &binlogpb.Address{}
  388. switch a := addr.(type) {
  389. case *net.TCPAddr:
  390. if a.IP.To4() != nil {
  391. ret.Type = binlogpb.Address_TYPE_IPV4
  392. } else if a.IP.To16() != nil {
  393. ret.Type = binlogpb.Address_TYPE_IPV6
  394. } else {
  395. ret.Type = binlogpb.Address_TYPE_UNKNOWN
  396. // Do not set address and port fields.
  397. break
  398. }
  399. ret.Address = a.IP.String()
  400. ret.IpPort = uint32(a.Port)
  401. case *net.UnixAddr:
  402. ret.Type = binlogpb.Address_TYPE_UNIX
  403. ret.Address = a.String()
  404. default:
  405. ret.Type = binlogpb.Address_TYPE_UNKNOWN
  406. }
  407. return ret
  408. }