s3.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. package command
  2. import (
  3. "context"
  4. "fmt"
  5. "net/http"
  6. "time"
  7. "github.com/chrislusf/seaweedfs/weed/pb"
  8. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  9. "github.com/chrislusf/seaweedfs/weed/security"
  10. "github.com/gorilla/mux"
  11. "github.com/chrislusf/seaweedfs/weed/glog"
  12. "github.com/chrislusf/seaweedfs/weed/s3api"
  13. stats_collect "github.com/chrislusf/seaweedfs/weed/stats"
  14. "github.com/chrislusf/seaweedfs/weed/util"
  15. )
  16. var (
  17. s3StandaloneOptions S3Options
  18. )
  19. type S3Options struct {
  20. filer *string
  21. port *int
  22. config *string
  23. domainName *string
  24. tlsPrivateKey *string
  25. tlsCertificate *string
  26. metricsHttpPort *int
  27. allowEmptyFolder *bool
  28. }
  29. func init() {
  30. cmdS3.Run = runS3 // break init cycle
  31. s3StandaloneOptions.filer = cmdS3.Flag.String("filer", "localhost:8888", "filer server address")
  32. s3StandaloneOptions.port = cmdS3.Flag.Int("port", 8333, "s3 server http listen port")
  33. s3StandaloneOptions.domainName = cmdS3.Flag.String("domainName", "", "suffix of the host name in comma separated list, {bucket}.{domainName}")
  34. s3StandaloneOptions.config = cmdS3.Flag.String("config", "", "path to the config file")
  35. s3StandaloneOptions.tlsPrivateKey = cmdS3.Flag.String("key.file", "", "path to the TLS private key file")
  36. s3StandaloneOptions.tlsCertificate = cmdS3.Flag.String("cert.file", "", "path to the TLS certificate file")
  37. s3StandaloneOptions.metricsHttpPort = cmdS3.Flag.Int("metricsPort", 0, "Prometheus metrics listen port")
  38. s3StandaloneOptions.allowEmptyFolder = cmdS3.Flag.Bool("allowEmptyFolder", false, "allow empty folders")
  39. }
  // cmdS3 describes the "s3" subcommand: its usage line, a short summary, and a
  // long help text that embeds a sample config.json listing identities with
  // their credentials and allowed actions. Its Run field is assigned in init().
  40. var cmdS3 = &Command{
  41. UsageLine: "s3 [-port=8333] [-filer=<ip:port>] [-config=</path/to/config.json>]",
  42. Short: "start a s3 API compatible server that is backed by a filer",
  43. Long: `start a s3 API compatible server that is backed by a filer.
  44. By default, you can use any access key and secret key to access the S3 APIs.
  45. To enable credential based access, create a config.json file similar to this:
  46. {
  47. "identities": [
  48. {
  49. "name": "anonymous",
  50. "actions": [
  51. "Read"
  52. ]
  53. },
  54. {
  55. "name": "some_admin_user",
  56. "credentials": [
  57. {
  58. "accessKey": "some_access_key1",
  59. "secretKey": "some_secret_key1"
  60. }
  61. ],
  62. "actions": [
  63. "Admin",
  64. "Read",
  65. "List",
  66. "Tagging",
  67. "Write"
  68. ]
  69. },
  70. {
  71. "name": "some_read_only_user",
  72. "credentials": [
  73. {
  74. "accessKey": "some_access_key2",
  75. "secretKey": "some_secret_key2"
  76. }
  77. ],
  78. "actions": [
  79. "Read"
  80. ]
  81. },
  82. {
  83. "name": "some_normal_user",
  84. "credentials": [
  85. {
  86. "accessKey": "some_access_key3",
  87. "secretKey": "some_secret_key3"
  88. }
  89. ],
  90. "actions": [
  91. "Read",
  92. "List",
  93. "Tagging",
  94. "Write"
  95. ]
  96. },
  97. {
  98. "name": "user_limited_to_bucket1",
  99. "credentials": [
  100. {
  101. "accessKey": "some_access_key4",
  102. "secretKey": "some_secret_key4"
  103. }
  104. ],
  105. "actions": [
  106. "Read:bucket1",
  107. "List:bucket1",
  108. "Tagging:bucket1",
  109. "Write:bucket1"
  110. ]
  111. }
  112. ]
  113. }
  114. `,
  115. }
  116. func runS3(cmd *Command, args []string) bool {
  117. util.LoadConfiguration("security", false)
  118. go stats_collect.StartMetricsServer(*s3StandaloneOptions.metricsHttpPort)
  119. return s3StandaloneOptions.startS3Server()
  120. }
  121. func (s3opt *S3Options) startS3Server() bool {
  122. filerGrpcAddress, err := pb.ParseServerToGrpcAddress(*s3opt.filer)
  123. if err != nil {
  124. glog.Fatal(err)
  125. return false
  126. }
  127. filerBucketsPath := "/buckets"
  128. grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
  129. // metrics read from the filer
  130. var metricsAddress string
  131. var metricsIntervalSec int
  132. for {
  133. err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  134. resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
  135. if err != nil {
  136. return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err)
  137. }
  138. filerBucketsPath = resp.DirBuckets
  139. metricsAddress, metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSec)
  140. glog.V(0).Infof("S3 read filer buckets dir: %s", filerBucketsPath)
  141. return nil
  142. })
  143. if err != nil {
  144. glog.V(0).Infof("wait to connect to filer %s grpc address %s", *s3opt.filer, filerGrpcAddress)
  145. time.Sleep(time.Second)
  146. } else {
  147. glog.V(0).Infof("connected to filer %s grpc address %s", *s3opt.filer, filerGrpcAddress)
  148. break
  149. }
  150. }
  151. go stats_collect.LoopPushingMetric("s3", stats_collect.SourceName(uint32(*s3opt.port)), metricsAddress, metricsIntervalSec)
  152. router := mux.NewRouter().SkipClean(true)
  153. _, s3ApiServer_err := s3api.NewS3ApiServer(router, &s3api.S3ApiServerOption{
  154. Filer: *s3opt.filer,
  155. Port: *s3opt.port,
  156. FilerGrpcAddress: filerGrpcAddress,
  157. Config: *s3opt.config,
  158. DomainName: *s3opt.domainName,
  159. BucketsPath: filerBucketsPath,
  160. GrpcDialOption: grpcDialOption,
  161. AllowEmptyFolder: *s3opt.allowEmptyFolder,
  162. })
  163. if s3ApiServer_err != nil {
  164. glog.Fatalf("S3 API Server startup error: %v", s3ApiServer_err)
  165. }
  166. httpS := &http.Server{Handler: router}
  167. listenAddress := fmt.Sprintf(":%d", *s3opt.port)
  168. s3ApiListener, err := util.NewListener(listenAddress, time.Duration(10)*time.Second)
  169. if err != nil {
  170. glog.Fatalf("S3 API Server listener on %s error: %v", listenAddress, err)
  171. }
  172. if *s3opt.tlsPrivateKey != "" {
  173. glog.V(0).Infof("Start Seaweed S3 API Server %s at https port %d", util.Version(), *s3opt.port)
  174. if err = httpS.ServeTLS(s3ApiListener, *s3opt.tlsCertificate, *s3opt.tlsPrivateKey); err != nil {
  175. glog.Fatalf("S3 API Server Fail to serve: %v", err)
  176. }
  177. } else {
  178. glog.V(0).Infof("Start Seaweed S3 API Server %s at http port %d", util.Version(), *s3opt.port)
  179. if err = httpS.Serve(s3ApiListener); err != nil {
  180. glog.Fatalf("S3 API Server Fail to serve: %v", err)
  181. }
  182. }
  183. return true
  184. }