audit_fluent.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183
  1. package s3err
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/fluent/fluent-logger-golang/fluent"
  6. "github.com/seaweedfs/seaweedfs/weed/glog"
  7. "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
  8. "net/http"
  9. "os"
  10. "time"
  11. )
  12. type AccessLogExtend struct {
  13. AccessLog
  14. AccessLogHTTP
  15. }
  16. type AccessLog struct {
  17. Bucket string `msg:"bucket" json:"bucket"` // awsexamplebucket1
  18. Time int64 `msg:"time" json:"time"` // [06/Feb/2019:00:00:38 +0000]
  19. RemoteIP string `msg:"remote_ip" json:"remote_ip,omitempty"` // 192.0.2.3
  20. Requester string `msg:"requester" json:"requester,omitempty"` // IAM user id
  21. RequestID string `msg:"request_id" json:"request_id,omitempty"` // 3E57427F33A59F07
  22. Operation string `msg:"operation" json:"operation,omitempty"` // REST.HTTP_method.resource_type REST.PUT.OBJECT
  23. Key string `msg:"key" json:"key,omitempty"` // /photos/2019/08/puppy.jpg
  24. ErrorCode string `msg:"error_code" json:"error_code,omitempty"`
  25. HostId string `msg:"host_id" json:"host_id,omitempty"`
  26. HostHeader string `msg:"host_header" json:"host_header,omitempty"` // s3.us-west-2.amazonaws.com
  27. UserAgent string `msg:"user_agent" json:"user_agent,omitempty"`
  28. HTTPStatus int `msg:"status" json:"status,omitempty"`
  29. SignatureVersion string `msg:"signature_version" json:"signature_version,omitempty"`
  30. }
  31. type AccessLogHTTP struct {
  32. RequestURI string `json:"request_uri,omitempty"` // "GET /awsexamplebucket1/photos/2019/08/puppy.jpg?x-foo=bar HTTP/1.1"
  33. BytesSent string `json:"bytes_sent,omitempty"`
  34. ObjectSize string `json:"object_size,omitempty"`
  35. TotalTime int `json:"total_time,omitempty"`
  36. TurnAroundTime int `json:"turn_around_time,omitempty"`
  37. Referer string `json:"Referer,omitempty"`
  38. VersionId string `json:"version_id,omitempty"`
  39. CipherSuite string `json:"cipher_suite,omitempty"`
  40. AuthenticationType string `json:"auth_type,omitempty"`
  41. TLSVersion string `json:"TLS_version,omitempty"`
  42. }
  43. const tag = "s3.access"
  44. var (
  45. Logger *fluent.Fluent
  46. hostname = os.Getenv("HOSTNAME")
  47. environment = os.Getenv("ENVIRONMENT")
  48. )
  49. func InitAuditLog(config string) {
  50. configContent, readErr := os.ReadFile(config)
  51. if readErr != nil {
  52. glog.Errorf("fail to read fluent config %s : %v", config, readErr)
  53. return
  54. }
  55. fluentConfig := &fluent.Config{}
  56. if err := json.Unmarshal(configContent, fluentConfig); err != nil {
  57. glog.Errorf("fail to parse fluent config %s : %v", string(configContent), err)
  58. return
  59. }
  60. if len(fluentConfig.TagPrefix) == 0 && len(environment) > 0 {
  61. fluentConfig.TagPrefix = environment
  62. }
  63. fluentConfig.Async = true
  64. fluentConfig.AsyncResultCallback = func(data []byte, err error) {
  65. if err != nil {
  66. glog.Warning("Error while posting log: ", err)
  67. }
  68. }
  69. var err error
  70. Logger, err = fluent.New(*fluentConfig)
  71. if err != nil {
  72. glog.Errorf("fail to load fluent config: %v", err)
  73. }
  74. }
  75. func getREST(httpMetod string, resourceType string) string {
  76. return fmt.Sprintf("REST.%s.%s", httpMetod, resourceType)
  77. }
  78. func getResourceType(object string, query_key string, metod string) (string, bool) {
  79. if object == "/" {
  80. switch query_key {
  81. case "delete":
  82. return "BATCH.DELETE.OBJECT", true
  83. case "tagging":
  84. return getREST(metod, "OBJECTTAGGING"), true
  85. case "lifecycle":
  86. return getREST(metod, "LIFECYCLECONFIGURATION"), true
  87. case "acl":
  88. return getREST(metod, "ACCESSCONTROLPOLICY"), true
  89. case "policy":
  90. return getREST(metod, "BUCKETPOLICY"), true
  91. default:
  92. return getREST(metod, "BUCKET"), false
  93. }
  94. } else {
  95. switch query_key {
  96. case "tagging":
  97. return getREST(metod, "OBJECTTAGGING"), true
  98. default:
  99. return getREST(metod, "OBJECT"), false
  100. }
  101. }
  102. }
  103. func getOperation(object string, r *http.Request) string {
  104. queries := r.URL.Query()
  105. var operation string
  106. var queryFound bool
  107. for key, _ := range queries {
  108. operation, queryFound = getResourceType(object, key, r.Method)
  109. if queryFound {
  110. return operation
  111. }
  112. }
  113. if len(queries) == 0 {
  114. operation, _ = getResourceType(object, "", r.Method)
  115. }
  116. return operation
  117. }
  118. func GetAccessHttpLog(r *http.Request, statusCode int, s3errCode ErrorCode) AccessLogHTTP {
  119. return AccessLogHTTP{
  120. RequestURI: r.RequestURI,
  121. Referer: r.Header.Get("Referer"),
  122. }
  123. }
  124. func GetAccessLog(r *http.Request, HTTPStatusCode int, s3errCode ErrorCode) *AccessLog {
  125. bucket, key := s3_constants.GetBucketAndObject(r)
  126. var errorCode string
  127. if s3errCode != ErrNone {
  128. errorCode = GetAPIError(s3errCode).Code
  129. }
  130. remoteIP := r.Header.Get("X-Real-IP")
  131. if len(remoteIP) == 0 {
  132. remoteIP = r.RemoteAddr
  133. }
  134. hostHeader := r.Header.Get("X-Forwarded-Host")
  135. if len(hostHeader) == 0 {
  136. hostHeader = r.Host
  137. }
  138. return &AccessLog{
  139. HostHeader: hostHeader,
  140. RequestID: r.Header.Get("X-Request-ID"),
  141. RemoteIP: remoteIP,
  142. Requester: r.Header.Get(s3_constants.AmzIdentityId),
  143. SignatureVersion: r.Header.Get(s3_constants.AmzAuthType),
  144. UserAgent: r.Header.Get("user-agent"),
  145. HostId: hostname,
  146. Bucket: bucket,
  147. HTTPStatus: HTTPStatusCode,
  148. Time: time.Now().Unix(),
  149. Key: key,
  150. Operation: getOperation(key, r),
  151. ErrorCode: errorCode,
  152. }
  153. }
  154. func PostLog(r *http.Request, HTTPStatusCode int, errorCode ErrorCode) {
  155. if Logger == nil {
  156. return
  157. }
  158. if err := Logger.Post(tag, *GetAccessLog(r, HTTPStatusCode, errorCode)); err != nil {
  159. glog.Warning("Error while posting log: ", err)
  160. }
  161. }
  162. func PostAccessLog(log AccessLog) {
  163. if Logger == nil || len(log.Key) == 0 {
  164. return
  165. }
  166. if err := Logger.Post(tag, log); err != nil {
  167. glog.Warning("Error while posting log: ", err)
  168. }
  169. }