s3.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369
  1. package command
  2. import (
  3. "context"
  4. "crypto/tls"
  5. "crypto/x509"
  6. "fmt"
  7. "io/ioutil"
  8. "net"
  9. "net/http"
  10. "os"
  11. "runtime"
  12. "strings"
  13. "time"
  14. "github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
  15. "google.golang.org/grpc/credentials/tls/certprovider"
  16. "google.golang.org/grpc/credentials/tls/certprovider/pemfile"
  17. "google.golang.org/grpc/reflection"
  18. "github.com/seaweedfs/seaweedfs/weed/pb"
  19. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  20. "github.com/seaweedfs/seaweedfs/weed/pb/s3_pb"
  21. "github.com/seaweedfs/seaweedfs/weed/security"
  22. "github.com/gorilla/mux"
  23. "github.com/seaweedfs/seaweedfs/weed/glog"
  24. "github.com/seaweedfs/seaweedfs/weed/s3api"
  25. stats_collect "github.com/seaweedfs/seaweedfs/weed/stats"
  26. "github.com/seaweedfs/seaweedfs/weed/util"
  27. )
  28. var (
  29. s3StandaloneOptions S3Options
  30. )
  31. type S3Options struct {
  32. filer *string
  33. bindIp *string
  34. port *int
  35. portHttps *int
  36. portGrpc *int
  37. config *string
  38. domainName *string
  39. allowedOrigins *string
  40. tlsPrivateKey *string
  41. tlsCertificate *string
  42. tlsCACertificate *string
  43. tlsVerifyClientCert *bool
  44. metricsHttpPort *int
  45. allowEmptyFolder *bool
  46. allowDeleteBucketNotEmpty *bool
  47. auditLogConfig *string
  48. localFilerSocket *string
  49. dataCenter *string
  50. localSocket *string
  51. certProvider certprovider.Provider
  52. }
  53. func init() {
  54. cmdS3.Run = runS3 // break init cycle
  55. s3StandaloneOptions.filer = cmdS3.Flag.String("filer", "localhost:8888", "filer server address")
  56. s3StandaloneOptions.bindIp = cmdS3.Flag.String("ip.bind", "", "ip address to bind to. Default to localhost.")
  57. s3StandaloneOptions.port = cmdS3.Flag.Int("port", 8333, "s3 server http listen port")
  58. s3StandaloneOptions.portHttps = cmdS3.Flag.Int("port.https", 0, "s3 server https listen port")
  59. s3StandaloneOptions.portGrpc = cmdS3.Flag.Int("port.grpc", 0, "s3 server grpc listen port")
  60. s3StandaloneOptions.domainName = cmdS3.Flag.String("domainName", "", "suffix of the host name in comma separated list, {bucket}.{domainName}")
  61. s3StandaloneOptions.allowedOrigins = cmdS3.Flag.String("allowedOrigins", "*", "comma separated list of allowed origins")
  62. s3StandaloneOptions.dataCenter = cmdS3.Flag.String("dataCenter", "", "prefer to read and write to volumes in this data center")
  63. s3StandaloneOptions.config = cmdS3.Flag.String("config", "", "path to the config file")
  64. s3StandaloneOptions.auditLogConfig = cmdS3.Flag.String("auditLogConfig", "", "path to the audit log config file")
  65. s3StandaloneOptions.tlsPrivateKey = cmdS3.Flag.String("key.file", "", "path to the TLS private key file")
  66. s3StandaloneOptions.tlsCertificate = cmdS3.Flag.String("cert.file", "", "path to the TLS certificate file")
  67. s3StandaloneOptions.tlsCACertificate = cmdS3.Flag.String("cacert.file", "", "path to the TLS CA certificate file")
  68. s3StandaloneOptions.tlsVerifyClientCert = cmdS3.Flag.Bool("tlsVerifyClientCert", false, "whether to verify the client's certificate")
  69. s3StandaloneOptions.metricsHttpPort = cmdS3.Flag.Int("metricsPort", 0, "Prometheus metrics listen port")
  70. s3StandaloneOptions.allowEmptyFolder = cmdS3.Flag.Bool("allowEmptyFolder", true, "allow empty folders")
  71. s3StandaloneOptions.allowDeleteBucketNotEmpty = cmdS3.Flag.Bool("allowDeleteBucketNotEmpty", true, "allow recursive deleting all entries along with bucket")
  72. s3StandaloneOptions.localFilerSocket = cmdS3.Flag.String("localFilerSocket", "", "local filer socket path")
  73. s3StandaloneOptions.localSocket = cmdS3.Flag.String("localSocket", "", "default to /tmp/seaweedfs-s3-<port>.sock")
  74. }
  75. var cmdS3 = &Command{
  76. UsageLine: "s3 [-port=8333] [-filer=<ip:port>] [-config=</path/to/config.json>]",
  77. Short: "start a s3 API compatible server that is backed by a filer",
  78. Long: `start a s3 API compatible server that is backed by a filer.
  79. By default, you can use any access key and secret key to access the S3 APIs.
  80. To enable credential based access, create a config.json file similar to this:
  81. {
  82. "identities": [
  83. {
  84. "name": "anonymous",
  85. "actions": [
  86. "Read"
  87. ]
  88. },
  89. {
  90. "name": "some_admin_user",
  91. "credentials": [
  92. {
  93. "accessKey": "some_access_key1",
  94. "secretKey": "some_secret_key1"
  95. }
  96. ],
  97. "actions": [
  98. "Admin",
  99. "Read",
  100. "List",
  101. "Tagging",
  102. "Write"
  103. ]
  104. },
  105. {
  106. "name": "some_read_only_user",
  107. "credentials": [
  108. {
  109. "accessKey": "some_access_key2",
  110. "secretKey": "some_secret_key2"
  111. }
  112. ],
  113. "actions": [
  114. "Read"
  115. ]
  116. },
  117. {
  118. "name": "some_normal_user",
  119. "credentials": [
  120. {
  121. "accessKey": "some_access_key3",
  122. "secretKey": "some_secret_key3"
  123. }
  124. ],
  125. "actions": [
  126. "Read",
  127. "List",
  128. "Tagging",
  129. "Write"
  130. ]
  131. },
  132. {
  133. "name": "user_limited_to_bucket1",
  134. "credentials": [
  135. {
  136. "accessKey": "some_access_key4",
  137. "secretKey": "some_secret_key4"
  138. }
  139. ],
  140. "actions": [
  141. "Read:bucket1",
  142. "List:bucket1",
  143. "Tagging:bucket1",
  144. "Write:bucket1"
  145. ]
  146. }
  147. ]
  148. }
  149. `,
  150. }
  151. func runS3(cmd *Command, args []string) bool {
  152. util.LoadConfiguration("security", false)
  153. go stats_collect.StartMetricsServer(*s3StandaloneOptions.bindIp, *s3StandaloneOptions.metricsHttpPort)
  154. return s3StandaloneOptions.startS3Server()
  155. }
  156. // GetCertificateWithUpdate Auto refreshing TSL certificate
  157. func (S3opt *S3Options) GetCertificateWithUpdate(*tls.ClientHelloInfo) (*tls.Certificate, error) {
  158. certs, err := S3opt.certProvider.KeyMaterial(context.Background())
  159. return &certs.Certs[0], err
  160. }
  161. func (s3opt *S3Options) startS3Server() bool {
  162. filerAddress := pb.ServerAddress(*s3opt.filer)
  163. filerBucketsPath := "/buckets"
  164. filerGroup := ""
  165. grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
  166. // metrics read from the filer
  167. var metricsAddress string
  168. var metricsIntervalSec int
  169. for {
  170. err := pb.WithGrpcFilerClient(false, 0, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  171. resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
  172. if err != nil {
  173. return fmt.Errorf("get filer %s configuration: %v", filerAddress, err)
  174. }
  175. filerBucketsPath = resp.DirBuckets
  176. filerGroup = resp.FilerGroup
  177. metricsAddress, metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSec)
  178. glog.V(0).Infof("S3 read filer buckets dir: %s", filerBucketsPath)
  179. return nil
  180. })
  181. if err != nil {
  182. glog.V(0).Infof("wait to connect to filer %s grpc address %s", *s3opt.filer, filerAddress.ToGrpcAddress())
  183. time.Sleep(time.Second)
  184. } else {
  185. glog.V(0).Infof("connected to filer %s grpc address %s", *s3opt.filer, filerAddress.ToGrpcAddress())
  186. break
  187. }
  188. }
  189. go stats_collect.LoopPushingMetric("s3", stats_collect.SourceName(uint32(*s3opt.port)), metricsAddress, metricsIntervalSec)
  190. router := mux.NewRouter().SkipClean(true)
  191. var localFilerSocket string
  192. if s3opt.localFilerSocket != nil {
  193. localFilerSocket = *s3opt.localFilerSocket
  194. }
  195. s3ApiServer, s3ApiServer_err := s3api.NewS3ApiServer(router, &s3api.S3ApiServerOption{
  196. Filer: filerAddress,
  197. Port: *s3opt.port,
  198. Config: *s3opt.config,
  199. DomainName: *s3opt.domainName,
  200. AllowedOrigins: strings.Split(*s3opt.allowedOrigins, ","),
  201. BucketsPath: filerBucketsPath,
  202. GrpcDialOption: grpcDialOption,
  203. AllowEmptyFolder: *s3opt.allowEmptyFolder,
  204. AllowDeleteBucketNotEmpty: *s3opt.allowDeleteBucketNotEmpty,
  205. LocalFilerSocket: localFilerSocket,
  206. DataCenter: *s3opt.dataCenter,
  207. FilerGroup: filerGroup,
  208. })
  209. if s3ApiServer_err != nil {
  210. glog.Fatalf("S3 API Server startup error: %v", s3ApiServer_err)
  211. }
  212. httpS := &http.Server{Handler: router}
  213. if *s3opt.portGrpc == 0 {
  214. *s3opt.portGrpc = 10000 + *s3opt.port
  215. }
  216. if *s3opt.bindIp == "" {
  217. *s3opt.bindIp = "localhost"
  218. }
  219. if runtime.GOOS != "windows" {
  220. localSocket := *s3opt.localSocket
  221. if localSocket == "" {
  222. localSocket = fmt.Sprintf("/tmp/seaweedfs-s3-%d.sock", *s3opt.port)
  223. }
  224. if err := os.Remove(localSocket); err != nil && !os.IsNotExist(err) {
  225. glog.Fatalf("Failed to remove %s, error: %s", localSocket, err.Error())
  226. }
  227. go func() {
  228. // start on local unix socket
  229. s3SocketListener, err := net.Listen("unix", localSocket)
  230. if err != nil {
  231. glog.Fatalf("Failed to listen on %s: %v", localSocket, err)
  232. }
  233. httpS.Serve(s3SocketListener)
  234. }()
  235. }
  236. listenAddress := fmt.Sprintf("%s:%d", *s3opt.bindIp, *s3opt.port)
  237. s3ApiListener, s3ApiLocalListener, err := util.NewIpAndLocalListeners(*s3opt.bindIp, *s3opt.port, time.Duration(10)*time.Second)
  238. if err != nil {
  239. glog.Fatalf("S3 API Server listener on %s error: %v", listenAddress, err)
  240. }
  241. if len(*s3opt.auditLogConfig) > 0 {
  242. s3err.InitAuditLog(*s3opt.auditLogConfig)
  243. if s3err.Logger != nil {
  244. defer s3err.Logger.Close()
  245. }
  246. }
  247. // starting grpc server
  248. grpcPort := *s3opt.portGrpc
  249. grpcL, grpcLocalL, err := util.NewIpAndLocalListeners(*s3opt.bindIp, grpcPort, 0)
  250. if err != nil {
  251. glog.Fatalf("s3 failed to listen on grpc port %d: %v", grpcPort, err)
  252. }
  253. grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.s3"))
  254. s3_pb.RegisterSeaweedS3Server(grpcS, s3ApiServer)
  255. reflection.Register(grpcS)
  256. if grpcLocalL != nil {
  257. go grpcS.Serve(grpcLocalL)
  258. }
  259. go grpcS.Serve(grpcL)
  260. if *s3opt.tlsPrivateKey != "" {
  261. pemfileOptions := pemfile.Options{
  262. CertFile: *s3opt.tlsCertificate,
  263. KeyFile: *s3opt.tlsPrivateKey,
  264. RefreshDuration: security.CredRefreshingInterval,
  265. }
  266. if s3opt.certProvider, err = pemfile.NewProvider(pemfileOptions); err != nil {
  267. glog.Fatalf("pemfile.NewProvider(%v) failed: %v", pemfileOptions, err)
  268. }
  269. caCertPool := x509.NewCertPool()
  270. if *s3Options.tlsCACertificate != "" {
  271. // load CA certificate file and add it to list of client CAs
  272. caCertFile, err := ioutil.ReadFile(*s3opt.tlsCACertificate)
  273. if err != nil {
  274. glog.Fatalf("error reading CA certificate: %v", err)
  275. }
  276. caCertPool.AppendCertsFromPEM(caCertFile)
  277. }
  278. clientAuth := tls.NoClientCert
  279. if *s3Options.tlsVerifyClientCert {
  280. clientAuth = tls.RequireAndVerifyClientCert
  281. }
  282. httpS.TLSConfig = &tls.Config{
  283. GetCertificate: s3opt.GetCertificateWithUpdate,
  284. ClientAuth: clientAuth,
  285. ClientCAs: caCertPool,
  286. }
  287. if *s3opt.portHttps == 0 {
  288. glog.V(0).Infof("Start Seaweed S3 API Server %s at https port %d", util.Version(), *s3opt.port)
  289. if s3ApiLocalListener != nil {
  290. go func() {
  291. if err = httpS.ServeTLS(s3ApiLocalListener, "", ""); err != nil {
  292. glog.Fatalf("S3 API Server Fail to serve: %v", err)
  293. }
  294. }()
  295. }
  296. if err = httpS.ServeTLS(s3ApiListener, "", ""); err != nil {
  297. glog.Fatalf("S3 API Server Fail to serve: %v", err)
  298. }
  299. } else {
  300. glog.V(0).Infof("Start Seaweed S3 API Server %s at https port %d", util.Version(), *s3opt.portHttps)
  301. s3ApiListenerHttps, s3ApiLocalListenerHttps, _ := util.NewIpAndLocalListeners(
  302. *s3opt.bindIp, *s3opt.portHttps, time.Duration(10)*time.Second)
  303. if s3ApiLocalListenerHttps != nil {
  304. go func() {
  305. if err = httpS.ServeTLS(s3ApiLocalListenerHttps, "", ""); err != nil {
  306. glog.Fatalf("S3 API Server Fail to serve: %v", err)
  307. }
  308. }()
  309. }
  310. go func() {
  311. if err = httpS.ServeTLS(s3ApiListenerHttps, "", ""); err != nil {
  312. glog.Fatalf("S3 API Server Fail to serve: %v", err)
  313. }
  314. }()
  315. }
  316. }
  317. if *s3opt.tlsPrivateKey == "" || *s3opt.portHttps > 0 {
  318. glog.V(0).Infof("Start Seaweed S3 API Server %s at http port %d", util.Version(), *s3opt.port)
  319. if s3ApiLocalListener != nil {
  320. go func() {
  321. if err = httpS.Serve(s3ApiLocalListener); err != nil {
  322. glog.Fatalf("S3 API Server Fail to serve: %v", err)
  323. }
  324. }()
  325. }
  326. if err = httpS.Serve(s3ApiListener); err != nil {
  327. glog.Fatalf("S3 API Server Fail to serve: %v", err)
  328. }
  329. }
  330. return true
  331. }