s3api_server.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255
  1. package s3api
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/filer"
  6. "github.com/seaweedfs/seaweedfs/weed/pb/s3_pb"
  7. "github.com/seaweedfs/seaweedfs/weed/s3api/s3account"
  8. "net"
  9. "net/http"
  10. "strings"
  11. "time"
  12. "github.com/gorilla/mux"
  13. "github.com/seaweedfs/seaweedfs/weed/pb"
  14. . "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
  15. "github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
  16. "github.com/seaweedfs/seaweedfs/weed/security"
  17. "github.com/seaweedfs/seaweedfs/weed/util"
  18. "google.golang.org/grpc"
  19. )
  20. type S3ApiServerOption struct {
  21. Filer pb.ServerAddress
  22. Port int
  23. Config string
  24. DomainName string
  25. BucketsPath string
  26. GrpcDialOption grpc.DialOption
  27. AllowEmptyFolder bool
  28. AllowDeleteBucketNotEmpty bool
  29. LocalFilerSocket string
  30. DataCenter string
  31. }
  32. type S3ApiServer struct {
  33. s3_pb.UnimplementedSeaweedS3Server
  34. option *S3ApiServerOption
  35. iam *IdentityAccessManagement
  36. cb *CircuitBreaker
  37. randomClientId int32
  38. filerGuard *security.Guard
  39. client *http.Client
  40. accountManager *s3account.AccountManager
  41. bucketRegistry *BucketRegistry
  42. }
  43. func NewS3ApiServer(router *mux.Router, option *S3ApiServerOption) (s3ApiServer *S3ApiServer, err error) {
  44. v := util.GetViper()
  45. signingKey := v.GetString("jwt.filer_signing.key")
  46. v.SetDefault("jwt.filer_signing.expires_after_seconds", 10)
  47. expiresAfterSec := v.GetInt("jwt.filer_signing.expires_after_seconds")
  48. readSigningKey := v.GetString("jwt.filer_signing.read.key")
  49. v.SetDefault("jwt.filer_signing.read.expires_after_seconds", 60)
  50. readExpiresAfterSec := v.GetInt("jwt.filer_signing.read.expires_after_seconds")
  51. s3ApiServer = &S3ApiServer{
  52. option: option,
  53. iam: NewIdentityAccessManagement(option),
  54. randomClientId: util.RandomInt32(),
  55. filerGuard: security.NewGuard([]string{}, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec),
  56. cb: NewCircuitBreaker(option),
  57. }
  58. s3ApiServer.accountManager = s3account.NewAccountManager(s3ApiServer)
  59. s3ApiServer.bucketRegistry = NewBucketRegistry(s3ApiServer)
  60. if option.LocalFilerSocket == "" {
  61. s3ApiServer.client = &http.Client{Transport: &http.Transport{
  62. MaxIdleConns: 1024,
  63. MaxIdleConnsPerHost: 1024,
  64. }}
  65. } else {
  66. s3ApiServer.client = &http.Client{
  67. Transport: &http.Transport{
  68. DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
  69. return net.Dial("unix", option.LocalFilerSocket)
  70. },
  71. },
  72. }
  73. }
  74. s3ApiServer.registerRouter(router)
  75. go s3ApiServer.subscribeMetaEvents("s3", time.Now().UnixNano(), filer.DirectoryEtcRoot, []string{option.BucketsPath})
  76. return s3ApiServer, nil
  77. }
  78. func (s3a *S3ApiServer) registerRouter(router *mux.Router) {
  79. // API Router
  80. apiRouter := router.PathPrefix("/").Subrouter()
  81. // Readiness Probe
  82. apiRouter.Methods("GET").Path("/status").HandlerFunc(s3a.StatusHandler)
  83. apiRouter.Methods("OPTIONS").HandlerFunc(
  84. func(w http.ResponseWriter, r *http.Request) {
  85. w.Header().Set("Access-Control-Allow-Origin", "*")
  86. w.Header().Set("Access-Control-Expose-Headers", "*")
  87. w.Header().Set("Access-Control-Allow-Methods", "*")
  88. w.Header().Set("Access-Control-Allow-Headers", "*")
  89. writeSuccessResponseEmpty(w, r)
  90. })
  91. var routers []*mux.Router
  92. if s3a.option.DomainName != "" {
  93. domainNames := strings.Split(s3a.option.DomainName, ",")
  94. for _, domainName := range domainNames {
  95. routers = append(routers, apiRouter.Host(
  96. fmt.Sprintf("%s.%s:%d", "{bucket:.+}", domainName, s3a.option.Port)).Subrouter())
  97. routers = append(routers, apiRouter.Host(
  98. fmt.Sprintf("%s.%s", "{bucket:.+}", domainName)).Subrouter())
  99. }
  100. }
  101. routers = append(routers, apiRouter.PathPrefix("/{bucket}").Subrouter())
  102. for _, bucket := range routers {
  103. // each case should follow the next rule:
  104. // - requesting object with query must precede any other methods
  105. // - requesting object must precede any methods with buckets
  106. // - requesting bucket with query must precede raw methods with buckets
  107. // - requesting bucket must be processed in the end
  108. // objects with query
  109. // CopyObjectPart
  110. bucket.Methods("PUT").Path("/{object:.+}").HeadersRegexp("X-Amz-Copy-Source", `.*?(\/|%2F).*?`).HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.CopyObjectPartHandler, ACTION_WRITE)), "PUT")).Queries("partNumber", "{partNumber:[0-9]+}", "uploadId", "{uploadId:.*}")
  111. // PutObjectPart
  112. bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutObjectPartHandler, ACTION_WRITE)), "PUT")).Queries("partNumber", "{partNumber:[0-9]+}", "uploadId", "{uploadId:.*}")
  113. // CompleteMultipartUpload
  114. bucket.Methods("POST").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.CompleteMultipartUploadHandler, ACTION_WRITE)), "POST")).Queries("uploadId", "{uploadId:.*}")
  115. // NewMultipartUpload
  116. bucket.Methods("POST").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.NewMultipartUploadHandler, ACTION_WRITE)), "POST")).Queries("uploads", "")
  117. // AbortMultipartUpload
  118. bucket.Methods("DELETE").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.AbortMultipartUploadHandler, ACTION_WRITE)), "DELETE")).Queries("uploadId", "{uploadId:.*}")
  119. // ListObjectParts
  120. bucket.Methods("GET").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.ListObjectPartsHandler, ACTION_READ)), "GET")).Queries("uploadId", "{uploadId:.*}")
  121. // ListMultipartUploads
  122. bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.ListMultipartUploadsHandler, ACTION_READ)), "GET")).Queries("uploads", "")
  123. // GetObjectTagging
  124. bucket.Methods("GET").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.GetObjectTaggingHandler, ACTION_READ)), "GET")).Queries("tagging", "")
  125. // PutObjectTagging
  126. bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutObjectTaggingHandler, ACTION_TAGGING)), "PUT")).Queries("tagging", "")
  127. // DeleteObjectTagging
  128. bucket.Methods("DELETE").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.DeleteObjectTaggingHandler, ACTION_TAGGING)), "DELETE")).Queries("tagging", "")
  129. // PutObjectACL
  130. bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutObjectAclHandler, ACTION_WRITE)), "PUT")).Queries("acl", "")
  131. // PutObjectRetention
  132. bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutObjectRetentionHandler, ACTION_WRITE)), "PUT")).Queries("retention", "")
  133. // PutObjectLegalHold
  134. bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutObjectLegalHoldHandler, ACTION_WRITE)), "PUT")).Queries("legal-hold", "")
  135. // PutObjectLockConfiguration
  136. bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutObjectLockConfigurationHandler, ACTION_WRITE)), "PUT")).Queries("object-lock", "")
  137. // GetObjectACL
  138. bucket.Methods("GET").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.GetObjectAclHandler, ACTION_READ)), "GET")).Queries("acl", "")
  139. // objects with query
  140. // raw objects
  141. // HeadObject
  142. bucket.Methods("HEAD").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.HeadObjectHandler, ACTION_READ)), "GET"))
  143. // GetObject, but directory listing is not supported
  144. bucket.Methods("GET").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.GetObjectHandler, ACTION_READ)), "GET"))
  145. // CopyObject
  146. bucket.Methods("PUT").Path("/{object:.+}").HeadersRegexp("X-Amz-Copy-Source", ".*?(\\/|%2F).*?").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.CopyObjectHandler, ACTION_WRITE)), "COPY"))
  147. // PutObject
  148. bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutObjectHandler, ACTION_WRITE)), "PUT"))
  149. // DeleteObject
  150. bucket.Methods("DELETE").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.DeleteObjectHandler, ACTION_WRITE)), "DELETE"))
  151. // raw objects
  152. // buckets with query
  153. // DeleteMultipleObjects
  154. bucket.Methods("POST").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.DeleteMultipleObjectsHandler, ACTION_WRITE)), "DELETE")).Queries("delete", "")
  155. // GetBucketACL
  156. bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.GetBucketAclHandler, ACTION_READ)), "GET")).Queries("acl", "")
  157. // PutBucketACL
  158. bucket.Methods("PUT").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutBucketAclHandler, ACTION_WRITE)), "PUT")).Queries("acl", "")
  159. // GetBucketPolicy
  160. bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.GetBucketPolicyHandler, ACTION_READ)), "GET")).Queries("policy", "")
  161. // PutBucketPolicy
  162. bucket.Methods("PUT").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutBucketPolicyHandler, ACTION_WRITE)), "PUT")).Queries("policy", "")
  163. // DeleteBucketPolicy
  164. bucket.Methods("DELETE").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.DeleteBucketPolicyHandler, ACTION_WRITE)), "DELETE")).Queries("policy", "")
  165. // GetBucketCors
  166. bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.GetBucketCorsHandler, ACTION_READ)), "GET")).Queries("cors", "")
  167. // PutBucketCors
  168. bucket.Methods("PUT").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutBucketCorsHandler, ACTION_WRITE)), "PUT")).Queries("cors", "")
  169. // DeleteBucketCors
  170. bucket.Methods("DELETE").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.DeleteBucketCorsHandler, ACTION_WRITE)), "DELETE")).Queries("cors", "")
  171. // GetBucketLifecycleConfiguration
  172. bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.GetBucketLifecycleConfigurationHandler, ACTION_READ)), "GET")).Queries("lifecycle", "")
  173. // PutBucketLifecycleConfiguration
  174. bucket.Methods("PUT").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutBucketLifecycleConfigurationHandler, ACTION_WRITE)), "PUT")).Queries("lifecycle", "")
  175. // DeleteBucketLifecycleConfiguration
  176. bucket.Methods("DELETE").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.DeleteBucketLifecycleHandler, ACTION_WRITE)), "DELETE")).Queries("lifecycle", "")
  177. // GetBucketLocation
  178. bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.GetBucketLocationHandler, ACTION_READ)), "GET")).Queries("location", "")
  179. // GetBucketRequestPayment
  180. bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.GetBucketRequestPaymentHandler, ACTION_READ)), "GET")).Queries("requestPayment", "")
  181. // ListObjectsV2
  182. bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.ListObjectsV2Handler, ACTION_LIST)), "LIST")).Queries("list-type", "2")
  183. // buckets with query
  184. // PutBucketOwnershipControls
  185. bucket.Methods("PUT").HandlerFunc(track(s3a.iam.Auth(s3a.PutBucketOwnershipControls, ACTION_ADMIN), "PUT")).Queries("ownershipControls", "")
  186. //GetBucketOwnershipControls
  187. bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.GetBucketOwnershipControls, ACTION_READ), "GET")).Queries("ownershipControls", "")
  188. //DeleteBucketOwnershipControls
  189. bucket.Methods("DELETE").HandlerFunc(track(s3a.iam.Auth(s3a.DeleteBucketOwnershipControls, ACTION_ADMIN), "DELETE")).Queries("ownershipControls", "")
  190. // raw buckets
  191. // PostPolicy
  192. bucket.Methods("POST").HeadersRegexp("Content-Type", "multipart/form-data*").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PostPolicyBucketHandler, ACTION_WRITE)), "POST"))
  193. // HeadBucket
  194. bucket.Methods("HEAD").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.HeadBucketHandler, ACTION_READ)), "GET"))
  195. // PutBucket
  196. bucket.Methods("PUT").HandlerFunc(track(s3a.PutBucketHandler, "PUT"))
  197. // DeleteBucket
  198. bucket.Methods("DELETE").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.DeleteBucketHandler, ACTION_WRITE)), "DELETE"))
  199. // ListObjectsV1 (Legacy)
  200. bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.ListObjectsV1Handler, ACTION_LIST)), "LIST"))
  201. // raw buckets
  202. }
  203. // ListBuckets
  204. apiRouter.Methods("GET").Path("/").HandlerFunc(track(s3a.ListBucketsHandler, "LIST"))
  205. // NotFound
  206. apiRouter.NotFoundHandler = http.HandlerFunc(s3err.NotFoundHandler)
  207. }