s3api_server.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299
  1. package s3api
  2. import (
  3. "context"
  4. "fmt"
  5. "net"
  6. "net/http"
  7. "strings"
  8. "time"
  9. "github.com/seaweedfs/seaweedfs/weed/filer"
  10. "github.com/seaweedfs/seaweedfs/weed/glog"
  11. "github.com/seaweedfs/seaweedfs/weed/pb/s3_pb"
  12. "github.com/seaweedfs/seaweedfs/weed/util/grace"
  13. "github.com/gorilla/mux"
  14. "github.com/seaweedfs/seaweedfs/weed/pb"
  15. . "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
  16. "github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
  17. "github.com/seaweedfs/seaweedfs/weed/security"
  18. "github.com/seaweedfs/seaweedfs/weed/util"
  19. util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
  20. util_http_client "github.com/seaweedfs/seaweedfs/weed/util/http/client"
  21. "google.golang.org/grpc"
  22. )
  23. type S3ApiServerOption struct {
  24. Filer pb.ServerAddress
  25. Port int
  26. Config string
  27. DomainName string
  28. AllowedOrigins []string
  29. BucketsPath string
  30. GrpcDialOption grpc.DialOption
  31. AllowEmptyFolder bool
  32. AllowDeleteBucketNotEmpty bool
  33. LocalFilerSocket string
  34. DataCenter string
  35. FilerGroup string
  36. }
  37. type S3ApiServer struct {
  38. s3_pb.UnimplementedSeaweedS3Server
  39. option *S3ApiServerOption
  40. iam *IdentityAccessManagement
  41. cb *CircuitBreaker
  42. randomClientId int32
  43. filerGuard *security.Guard
  44. client util_http_client.HTTPClientInterface
  45. bucketRegistry *BucketRegistry
  46. }
  47. func NewS3ApiServer(router *mux.Router, option *S3ApiServerOption) (s3ApiServer *S3ApiServer, err error) {
  48. v := util.GetViper()
  49. signingKey := v.GetString("jwt.filer_signing.key")
  50. v.SetDefault("jwt.filer_signing.expires_after_seconds", 10)
  51. expiresAfterSec := v.GetInt("jwt.filer_signing.expires_after_seconds")
  52. readSigningKey := v.GetString("jwt.filer_signing.read.key")
  53. v.SetDefault("jwt.filer_signing.read.expires_after_seconds", 60)
  54. readExpiresAfterSec := v.GetInt("jwt.filer_signing.read.expires_after_seconds")
  55. v.SetDefault("cors.allowed_origins.values", "*")
  56. if (option.AllowedOrigins == nil) || (len(option.AllowedOrigins) == 0) {
  57. allowedOrigins := v.GetString("cors.allowed_origins.values")
  58. domains := strings.Split(allowedOrigins, ",")
  59. option.AllowedOrigins = domains
  60. }
  61. s3ApiServer = &S3ApiServer{
  62. option: option,
  63. iam: NewIdentityAccessManagement(option),
  64. randomClientId: util.RandomInt32(),
  65. filerGuard: security.NewGuard([]string{}, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec),
  66. cb: NewCircuitBreaker(option),
  67. }
  68. if option.Config != "" {
  69. grace.OnReload(func() {
  70. if err := s3ApiServer.iam.loadS3ApiConfigurationFromFile(option.Config); err != nil {
  71. glog.Errorf("fail to load config file %s: %v", option.Config, err)
  72. } else {
  73. glog.V(0).Infof("Loaded %d identities from config file %s", len(s3ApiServer.iam.identities), option.Config)
  74. }
  75. })
  76. }
  77. s3ApiServer.bucketRegistry = NewBucketRegistry(s3ApiServer)
  78. if option.LocalFilerSocket == "" {
  79. if s3ApiServer.client, err = util_http.NewGlobalHttpClient(); err != nil {
  80. return nil, err
  81. }
  82. } else {
  83. s3ApiServer.client = &http.Client{
  84. Transport: &http.Transport{
  85. DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
  86. return net.Dial("unix", option.LocalFilerSocket)
  87. },
  88. },
  89. }
  90. }
  91. s3ApiServer.registerRouter(router)
  92. go s3ApiServer.subscribeMetaEvents("s3", time.Now().UnixNano(), filer.DirectoryEtcRoot, []string{option.BucketsPath})
  93. return s3ApiServer, nil
  94. }
  95. func (s3a *S3ApiServer) registerRouter(router *mux.Router) {
  96. // API Router
  97. apiRouter := router.PathPrefix("/").Subrouter()
  98. // Readiness Probe
  99. apiRouter.Methods(http.MethodGet).Path("/status").HandlerFunc(s3a.StatusHandler)
  100. apiRouter.Methods(http.MethodGet).Path("/healthz").HandlerFunc(s3a.StatusHandler)
  101. apiRouter.Methods(http.MethodOptions).HandlerFunc(
  102. func(w http.ResponseWriter, r *http.Request) {
  103. origin := r.Header.Get("Origin")
  104. if origin != "" {
  105. if s3a.option.AllowedOrigins == nil || len(s3a.option.AllowedOrigins) == 0 || s3a.option.AllowedOrigins[0] == "*" {
  106. origin = "*"
  107. } else {
  108. originFound := false
  109. for _, allowedOrigin := range s3a.option.AllowedOrigins {
  110. if origin == allowedOrigin {
  111. originFound = true
  112. }
  113. }
  114. if !originFound {
  115. writeFailureResponse(w, r, http.StatusForbidden)
  116. return
  117. }
  118. }
  119. }
  120. w.Header().Set("Access-Control-Allow-Origin", origin)
  121. w.Header().Set("Access-Control-Expose-Headers", "*")
  122. w.Header().Set("Access-Control-Allow-Methods", "*")
  123. w.Header().Set("Access-Control-Allow-Headers", "*")
  124. writeSuccessResponseEmpty(w, r)
  125. })
  126. var routers []*mux.Router
  127. if s3a.option.DomainName != "" {
  128. domainNames := strings.Split(s3a.option.DomainName, ",")
  129. for _, domainName := range domainNames {
  130. routers = append(routers, apiRouter.Host(
  131. fmt.Sprintf("%s.%s:%d", "{bucket:.+}", domainName, s3a.option.Port)).Subrouter())
  132. routers = append(routers, apiRouter.Host(
  133. fmt.Sprintf("%s.%s", "{bucket:.+}", domainName)).Subrouter())
  134. }
  135. }
  136. routers = append(routers, apiRouter.PathPrefix("/{bucket}").Subrouter())
  137. for _, bucket := range routers {
  138. // each case should follow the next rule:
  139. // - requesting object with query must precede any other methods
  140. // - requesting object must precede any methods with buckets
  141. // - requesting bucket with query must precede raw methods with buckets
  142. // - requesting bucket must be processed in the end
  143. // objects with query
  144. // CopyObjectPart
  145. bucket.Methods(http.MethodPut).Path("/{object:.+}").HeadersRegexp("X-Amz-Copy-Source", `.*?(\/|%2F).*?`).HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.CopyObjectPartHandler, ACTION_WRITE)), "PUT")).Queries("partNumber", "{partNumber:[0-9]+}", "uploadId", "{uploadId:.*}")
  146. // PutObjectPart
  147. bucket.Methods(http.MethodPut).Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutObjectPartHandler, ACTION_WRITE)), "PUT")).Queries("partNumber", "{partNumber:[0-9]+}", "uploadId", "{uploadId:.*}")
  148. // CompleteMultipartUpload
  149. bucket.Methods(http.MethodPost).Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.CompleteMultipartUploadHandler, ACTION_WRITE)), "POST")).Queries("uploadId", "{uploadId:.*}")
  150. // NewMultipartUpload
  151. bucket.Methods(http.MethodPost).Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.NewMultipartUploadHandler, ACTION_WRITE)), "POST")).Queries("uploads", "")
  152. // AbortMultipartUpload
  153. bucket.Methods(http.MethodDelete).Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.AbortMultipartUploadHandler, ACTION_WRITE)), "DELETE")).Queries("uploadId", "{uploadId:.*}")
  154. // ListObjectParts
  155. bucket.Methods(http.MethodGet).Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.ListObjectPartsHandler, ACTION_READ)), "GET")).Queries("uploadId", "{uploadId:.*}")
  156. // ListMultipartUploads
  157. bucket.Methods(http.MethodGet).HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.ListMultipartUploadsHandler, ACTION_READ)), "GET")).Queries("uploads", "")
  158. // GetObjectTagging
  159. bucket.Methods(http.MethodGet).Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.GetObjectTaggingHandler, ACTION_READ)), "GET")).Queries("tagging", "")
  160. // PutObjectTagging
  161. bucket.Methods(http.MethodPut).Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutObjectTaggingHandler, ACTION_TAGGING)), "PUT")).Queries("tagging", "")
  162. // DeleteObjectTagging
  163. bucket.Methods(http.MethodDelete).Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.DeleteObjectTaggingHandler, ACTION_TAGGING)), "DELETE")).Queries("tagging", "")
  164. // PutObjectACL
  165. bucket.Methods(http.MethodPut).Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutObjectAclHandler, ACTION_WRITE_ACP)), "PUT")).Queries("acl", "")
  166. // PutObjectRetention
  167. bucket.Methods(http.MethodPut).Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutObjectRetentionHandler, ACTION_WRITE)), "PUT")).Queries("retention", "")
  168. // PutObjectLegalHold
  169. bucket.Methods(http.MethodPut).Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutObjectLegalHoldHandler, ACTION_WRITE)), "PUT")).Queries("legal-hold", "")
  170. // PutObjectLockConfiguration
  171. bucket.Methods(http.MethodPut).Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutObjectLockConfigurationHandler, ACTION_WRITE)), "PUT")).Queries("object-lock", "")
  172. // GetObjectACL
  173. bucket.Methods(http.MethodGet).Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.GetObjectAclHandler, ACTION_READ_ACP)), "GET")).Queries("acl", "")
  174. // objects with query
  175. // raw objects
  176. // HeadObject
  177. bucket.Methods(http.MethodHead).Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.HeadObjectHandler, ACTION_READ)), "GET"))
  178. // GetObject, but directory listing is not supported
  179. bucket.Methods(http.MethodGet).Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.GetObjectHandler, ACTION_READ)), "GET"))
  180. // CopyObject
  181. bucket.Methods(http.MethodPut).Path("/{object:.+}").HeadersRegexp("X-Amz-Copy-Source", ".*?(\\/|%2F).*?").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.CopyObjectHandler, ACTION_WRITE)), "COPY"))
  182. // PutObject
  183. bucket.Methods(http.MethodPut).Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutObjectHandler, ACTION_WRITE)), "PUT"))
  184. // DeleteObject
  185. bucket.Methods(http.MethodDelete).Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.DeleteObjectHandler, ACTION_WRITE)), "DELETE"))
  186. // raw objects
  187. // buckets with query
  188. // DeleteMultipleObjects
  189. bucket.Methods(http.MethodPost).HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.DeleteMultipleObjectsHandler, ACTION_WRITE)), "DELETE")).Queries("delete", "")
  190. // GetBucketACL
  191. bucket.Methods(http.MethodGet).HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.GetBucketAclHandler, ACTION_READ_ACP)), "GET")).Queries("acl", "")
  192. // PutBucketACL
  193. bucket.Methods(http.MethodPut).HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutBucketAclHandler, ACTION_WRITE_ACP)), "PUT")).Queries("acl", "")
  194. // GetBucketPolicy
  195. bucket.Methods(http.MethodGet).HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.GetBucketPolicyHandler, ACTION_READ)), "GET")).Queries("policy", "")
  196. // PutBucketPolicy
  197. bucket.Methods(http.MethodPut).HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutBucketPolicyHandler, ACTION_WRITE)), "PUT")).Queries("policy", "")
  198. // DeleteBucketPolicy
  199. bucket.Methods(http.MethodDelete).HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.DeleteBucketPolicyHandler, ACTION_WRITE)), "DELETE")).Queries("policy", "")
  200. // GetBucketCors
  201. bucket.Methods(http.MethodGet).HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.GetBucketCorsHandler, ACTION_READ)), "GET")).Queries("cors", "")
  202. // PutBucketCors
  203. bucket.Methods(http.MethodPut).HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutBucketCorsHandler, ACTION_WRITE)), "PUT")).Queries("cors", "")
  204. // DeleteBucketCors
  205. bucket.Methods(http.MethodDelete).HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.DeleteBucketCorsHandler, ACTION_WRITE)), "DELETE")).Queries("cors", "")
  206. // GetBucketLifecycleConfiguration
  207. bucket.Methods(http.MethodGet).HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.GetBucketLifecycleConfigurationHandler, ACTION_READ)), "GET")).Queries("lifecycle", "")
  208. // PutBucketLifecycleConfiguration
  209. bucket.Methods(http.MethodPut).HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutBucketLifecycleConfigurationHandler, ACTION_WRITE)), "PUT")).Queries("lifecycle", "")
  210. // DeleteBucketLifecycleConfiguration
  211. bucket.Methods(http.MethodDelete).HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.DeleteBucketLifecycleHandler, ACTION_WRITE)), "DELETE")).Queries("lifecycle", "")
  212. // GetBucketLocation
  213. bucket.Methods(http.MethodGet).HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.GetBucketLocationHandler, ACTION_READ)), "GET")).Queries("location", "")
  214. // GetBucketRequestPayment
  215. bucket.Methods(http.MethodGet).HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.GetBucketRequestPaymentHandler, ACTION_READ)), "GET")).Queries("requestPayment", "")
  216. // GetBucketVersioning
  217. bucket.Methods(http.MethodGet).HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.GetBucketVersioningHandler, ACTION_READ)), "GET")).Queries("versioning", "")
  218. bucket.Methods(http.MethodPut).HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutBucketVersioningHandler, ACTION_WRITE)), "PUT")).Queries("versioning", "")
  219. // ListObjectsV2
  220. bucket.Methods(http.MethodGet).HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.ListObjectsV2Handler, ACTION_LIST)), "LIST")).Queries("list-type", "2")
  221. // buckets with query
  222. // PutBucketOwnershipControls
  223. bucket.Methods(http.MethodPut).HandlerFunc(track(s3a.iam.Auth(s3a.PutBucketOwnershipControls, ACTION_ADMIN), "PUT")).Queries("ownershipControls", "")
  224. //GetBucketOwnershipControls
  225. bucket.Methods(http.MethodGet).HandlerFunc(track(s3a.iam.Auth(s3a.GetBucketOwnershipControls, ACTION_READ), "GET")).Queries("ownershipControls", "")
  226. //DeleteBucketOwnershipControls
  227. bucket.Methods(http.MethodDelete).HandlerFunc(track(s3a.iam.Auth(s3a.DeleteBucketOwnershipControls, ACTION_ADMIN), "DELETE")).Queries("ownershipControls", "")
  228. // raw buckets
  229. // PostPolicy
  230. bucket.Methods(http.MethodPost).HeadersRegexp("Content-Type", "multipart/form-data*").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PostPolicyBucketHandler, ACTION_WRITE)), "POST"))
  231. // HeadBucket
  232. bucket.Methods(http.MethodHead).HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.HeadBucketHandler, ACTION_READ)), "GET"))
  233. // PutBucket
  234. bucket.Methods(http.MethodPut).HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutBucketHandler, ACTION_ADMIN)), "PUT"))
  235. // DeleteBucket
  236. bucket.Methods(http.MethodDelete).HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.DeleteBucketHandler, ACTION_DELETE_BUCKET)), "DELETE"))
  237. // ListObjectsV1 (Legacy)
  238. bucket.Methods(http.MethodGet).HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.ListObjectsV1Handler, ACTION_LIST)), "LIST"))
  239. // raw buckets
  240. }
  241. // ListBuckets
  242. apiRouter.Methods(http.MethodGet).Path("/").HandlerFunc(track(s3a.ListBucketsHandler, "LIST"))
  243. // NotFound
  244. apiRouter.NotFoundHandler = http.HandlerFunc(s3err.NotFoundHandler)
  245. }