s3api_server.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253
  1. package s3api
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/filer"
  6. "github.com/seaweedfs/seaweedfs/weed/pb/s3_pb"
  7. "net"
  8. "net/http"
  9. "strings"
  10. "time"
  11. "github.com/gorilla/mux"
  12. "github.com/seaweedfs/seaweedfs/weed/pb"
  13. . "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
  14. "github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
  15. "github.com/seaweedfs/seaweedfs/weed/security"
  16. "github.com/seaweedfs/seaweedfs/weed/util"
  17. "google.golang.org/grpc"
  18. )
  19. type S3ApiServerOption struct {
  20. Filer pb.ServerAddress
  21. Port int
  22. Config string
  23. DomainName string
  24. BucketsPath string
  25. GrpcDialOption grpc.DialOption
  26. AllowEmptyFolder bool
  27. AllowDeleteBucketNotEmpty bool
  28. LocalFilerSocket string
  29. DataCenter string
  30. FilerGroup string
  31. }
  32. type S3ApiServer struct {
  33. s3_pb.UnimplementedSeaweedS3Server
  34. option *S3ApiServerOption
  35. iam *IdentityAccessManagement
  36. cb *CircuitBreaker
  37. randomClientId int32
  38. filerGuard *security.Guard
  39. client *http.Client
  40. bucketRegistry *BucketRegistry
  41. }
  42. func NewS3ApiServer(router *mux.Router, option *S3ApiServerOption) (s3ApiServer *S3ApiServer, err error) {
  43. v := util.GetViper()
  44. signingKey := v.GetString("jwt.filer_signing.key")
  45. v.SetDefault("jwt.filer_signing.expires_after_seconds", 10)
  46. expiresAfterSec := v.GetInt("jwt.filer_signing.expires_after_seconds")
  47. readSigningKey := v.GetString("jwt.filer_signing.read.key")
  48. v.SetDefault("jwt.filer_signing.read.expires_after_seconds", 60)
  49. readExpiresAfterSec := v.GetInt("jwt.filer_signing.read.expires_after_seconds")
  50. s3ApiServer = &S3ApiServer{
  51. option: option,
  52. iam: NewIdentityAccessManagement(option),
  53. randomClientId: util.RandomInt32(),
  54. filerGuard: security.NewGuard([]string{}, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec),
  55. cb: NewCircuitBreaker(option),
  56. }
  57. s3ApiServer.bucketRegistry = NewBucketRegistry(s3ApiServer)
  58. if option.LocalFilerSocket == "" {
  59. s3ApiServer.client = &http.Client{Transport: &http.Transport{
  60. MaxIdleConns: 1024,
  61. MaxIdleConnsPerHost: 1024,
  62. }}
  63. } else {
  64. s3ApiServer.client = &http.Client{
  65. Transport: &http.Transport{
  66. DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
  67. return net.Dial("unix", option.LocalFilerSocket)
  68. },
  69. },
  70. }
  71. }
  72. s3ApiServer.registerRouter(router)
  73. go s3ApiServer.subscribeMetaEvents("s3", time.Now().UnixNano(), filer.DirectoryEtcRoot, []string{option.BucketsPath})
  74. return s3ApiServer, nil
  75. }
  76. func (s3a *S3ApiServer) registerRouter(router *mux.Router) {
  77. // API Router
  78. apiRouter := router.PathPrefix("/").Subrouter()
  79. // Readiness Probe
  80. apiRouter.Methods("GET").Path("/status").HandlerFunc(s3a.StatusHandler)
  81. apiRouter.Methods("OPTIONS").HandlerFunc(
  82. func(w http.ResponseWriter, r *http.Request) {
  83. w.Header().Set("Access-Control-Allow-Origin", "*")
  84. w.Header().Set("Access-Control-Expose-Headers", "*")
  85. w.Header().Set("Access-Control-Allow-Methods", "*")
  86. w.Header().Set("Access-Control-Allow-Headers", "*")
  87. writeSuccessResponseEmpty(w, r)
  88. })
  89. var routers []*mux.Router
  90. if s3a.option.DomainName != "" {
  91. domainNames := strings.Split(s3a.option.DomainName, ",")
  92. for _, domainName := range domainNames {
  93. routers = append(routers, apiRouter.Host(
  94. fmt.Sprintf("%s.%s:%d", "{bucket:.+}", domainName, s3a.option.Port)).Subrouter())
  95. routers = append(routers, apiRouter.Host(
  96. fmt.Sprintf("%s.%s", "{bucket:.+}", domainName)).Subrouter())
  97. }
  98. }
  99. routers = append(routers, apiRouter.PathPrefix("/{bucket}").Subrouter())
  100. for _, bucket := range routers {
  101. // each case should follow the next rule:
  102. // - requesting object with query must precede any other methods
  103. // - requesting object must precede any methods with buckets
  104. // - requesting bucket with query must precede raw methods with buckets
  105. // - requesting bucket must be processed in the end
  106. // objects with query
  107. // CopyObjectPart
  108. bucket.Methods("PUT").Path("/{object:.+}").HeadersRegexp("X-Amz-Copy-Source", `.*?(\/|%2F).*?`).HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.CopyObjectPartHandler, ACTION_WRITE)), "PUT")).Queries("partNumber", "{partNumber:[0-9]+}", "uploadId", "{uploadId:.*}")
  109. // PutObjectPart
  110. bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutObjectPartHandler, ACTION_WRITE)), "PUT")).Queries("partNumber", "{partNumber:[0-9]+}", "uploadId", "{uploadId:.*}")
  111. // CompleteMultipartUpload
  112. bucket.Methods("POST").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.CompleteMultipartUploadHandler, ACTION_WRITE)), "POST")).Queries("uploadId", "{uploadId:.*}")
  113. // NewMultipartUpload
  114. bucket.Methods("POST").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.NewMultipartUploadHandler, ACTION_WRITE)), "POST")).Queries("uploads", "")
  115. // AbortMultipartUpload
  116. bucket.Methods("DELETE").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.AbortMultipartUploadHandler, ACTION_WRITE)), "DELETE")).Queries("uploadId", "{uploadId:.*}")
  117. // ListObjectParts
  118. bucket.Methods("GET").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.ListObjectPartsHandler, ACTION_READ)), "GET")).Queries("uploadId", "{uploadId:.*}")
  119. // ListMultipartUploads
  120. bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.ListMultipartUploadsHandler, ACTION_READ)), "GET")).Queries("uploads", "")
  121. // GetObjectTagging
  122. bucket.Methods("GET").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.GetObjectTaggingHandler, ACTION_READ)), "GET")).Queries("tagging", "")
  123. // PutObjectTagging
  124. bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutObjectTaggingHandler, ACTION_TAGGING)), "PUT")).Queries("tagging", "")
  125. // DeleteObjectTagging
  126. bucket.Methods("DELETE").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.DeleteObjectTaggingHandler, ACTION_TAGGING)), "DELETE")).Queries("tagging", "")
  127. // PutObjectACL
  128. bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutObjectAclHandler, ACTION_WRITE_ACP)), "PUT")).Queries("acl", "")
  129. // PutObjectRetention
  130. bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutObjectRetentionHandler, ACTION_WRITE)), "PUT")).Queries("retention", "")
  131. // PutObjectLegalHold
  132. bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutObjectLegalHoldHandler, ACTION_WRITE)), "PUT")).Queries("legal-hold", "")
  133. // PutObjectLockConfiguration
  134. bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutObjectLockConfigurationHandler, ACTION_WRITE)), "PUT")).Queries("object-lock", "")
  135. // GetObjectACL
  136. bucket.Methods("GET").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.GetObjectAclHandler, ACTION_READ_ACP)), "GET")).Queries("acl", "")
  137. // objects with query
  138. // raw objects
  139. // HeadObject
  140. bucket.Methods("HEAD").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.HeadObjectHandler, ACTION_READ)), "GET"))
  141. // GetObject, but directory listing is not supported
  142. bucket.Methods("GET").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.GetObjectHandler, ACTION_READ)), "GET"))
  143. // CopyObject
  144. bucket.Methods("PUT").Path("/{object:.+}").HeadersRegexp("X-Amz-Copy-Source", ".*?(\\/|%2F).*?").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.CopyObjectHandler, ACTION_WRITE)), "COPY"))
  145. // PutObject
  146. bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutObjectHandler, ACTION_WRITE)), "PUT"))
  147. // DeleteObject
  148. bucket.Methods("DELETE").Path("/{object:.+}").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.DeleteObjectHandler, ACTION_WRITE)), "DELETE"))
  149. // raw objects
  150. // buckets with query
  151. // DeleteMultipleObjects
  152. bucket.Methods("POST").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.DeleteMultipleObjectsHandler, ACTION_WRITE)), "DELETE")).Queries("delete", "")
  153. // GetBucketACL
  154. bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.GetBucketAclHandler, ACTION_READ_ACP)), "GET")).Queries("acl", "")
  155. // PutBucketACL
  156. bucket.Methods("PUT").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutBucketAclHandler, ACTION_WRITE_ACP)), "PUT")).Queries("acl", "")
  157. // GetBucketPolicy
  158. bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.GetBucketPolicyHandler, ACTION_READ)), "GET")).Queries("policy", "")
  159. // PutBucketPolicy
  160. bucket.Methods("PUT").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutBucketPolicyHandler, ACTION_WRITE)), "PUT")).Queries("policy", "")
  161. // DeleteBucketPolicy
  162. bucket.Methods("DELETE").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.DeleteBucketPolicyHandler, ACTION_WRITE)), "DELETE")).Queries("policy", "")
  163. // GetBucketCors
  164. bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.GetBucketCorsHandler, ACTION_READ)), "GET")).Queries("cors", "")
  165. // PutBucketCors
  166. bucket.Methods("PUT").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutBucketCorsHandler, ACTION_WRITE)), "PUT")).Queries("cors", "")
  167. // DeleteBucketCors
  168. bucket.Methods("DELETE").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.DeleteBucketCorsHandler, ACTION_WRITE)), "DELETE")).Queries("cors", "")
  169. // GetBucketLifecycleConfiguration
  170. bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.GetBucketLifecycleConfigurationHandler, ACTION_READ)), "GET")).Queries("lifecycle", "")
  171. // PutBucketLifecycleConfiguration
  172. bucket.Methods("PUT").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PutBucketLifecycleConfigurationHandler, ACTION_WRITE)), "PUT")).Queries("lifecycle", "")
  173. // DeleteBucketLifecycleConfiguration
  174. bucket.Methods("DELETE").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.DeleteBucketLifecycleHandler, ACTION_WRITE)), "DELETE")).Queries("lifecycle", "")
  175. // GetBucketLocation
  176. bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.GetBucketLocationHandler, ACTION_READ)), "GET")).Queries("location", "")
  177. // GetBucketRequestPayment
  178. bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.GetBucketRequestPaymentHandler, ACTION_READ)), "GET")).Queries("requestPayment", "")
  179. // ListObjectsV2
  180. bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.ListObjectsV2Handler, ACTION_LIST)), "LIST")).Queries("list-type", "2")
  181. // buckets with query
  182. // PutBucketOwnershipControls
  183. bucket.Methods("PUT").HandlerFunc(track(s3a.iam.Auth(s3a.PutBucketOwnershipControls, ACTION_ADMIN), "PUT")).Queries("ownershipControls", "")
  184. //GetBucketOwnershipControls
  185. bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.GetBucketOwnershipControls, ACTION_READ), "GET")).Queries("ownershipControls", "")
  186. //DeleteBucketOwnershipControls
  187. bucket.Methods("DELETE").HandlerFunc(track(s3a.iam.Auth(s3a.DeleteBucketOwnershipControls, ACTION_ADMIN), "DELETE")).Queries("ownershipControls", "")
  188. // raw buckets
  189. // PostPolicy
  190. bucket.Methods("POST").HeadersRegexp("Content-Type", "multipart/form-data*").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.PostPolicyBucketHandler, ACTION_WRITE)), "POST"))
  191. // HeadBucket
  192. bucket.Methods("HEAD").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.HeadBucketHandler, ACTION_READ)), "GET"))
  193. // PutBucket
  194. bucket.Methods("PUT").HandlerFunc(track(s3a.PutBucketHandler, "PUT"))
  195. // DeleteBucket
  196. bucket.Methods("DELETE").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.DeleteBucketHandler, ACTION_WRITE)), "DELETE"))
  197. // ListObjectsV1 (Legacy)
  198. bucket.Methods("GET").HandlerFunc(track(s3a.iam.Auth(s3a.cb.Limit(s3a.ListObjectsV1Handler, ACTION_LIST)), "LIST"))
  199. // raw buckets
  200. }
  201. // ListBuckets
  202. apiRouter.Methods("GET").Path("/").HandlerFunc(track(s3a.ListBucketsHandler, "LIST"))
  203. // NotFound
  204. apiRouter.NotFoundHandler = http.HandlerFunc(s3err.NotFoundHandler)
  205. }