s3.go 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272
  1. package command
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
  6. "google.golang.org/grpc/reflection"
  7. "net/http"
  8. "time"
  9. "github.com/seaweedfs/seaweedfs/weed/pb"
  10. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  11. "github.com/seaweedfs/seaweedfs/weed/pb/s3_pb"
  12. "github.com/seaweedfs/seaweedfs/weed/security"
  13. "github.com/gorilla/mux"
  14. "github.com/seaweedfs/seaweedfs/weed/glog"
  15. "github.com/seaweedfs/seaweedfs/weed/s3api"
  16. stats_collect "github.com/seaweedfs/seaweedfs/weed/stats"
  17. "github.com/seaweedfs/seaweedfs/weed/util"
  18. )
  19. var (
  20. s3StandaloneOptions S3Options
  21. )
  22. type S3Options struct {
  23. filer *string
  24. bindIp *string
  25. port *int
  26. portGrpc *int
  27. config *string
  28. domainName *string
  29. tlsPrivateKey *string
  30. tlsCertificate *string
  31. metricsHttpPort *int
  32. allowEmptyFolder *bool
  33. allowDeleteBucketNotEmpty *bool
  34. auditLogConfig *string
  35. localFilerSocket *string
  36. dataCenter *string
  37. }
  38. func init() {
  39. cmdS3.Run = runS3 // break init cycle
  40. s3StandaloneOptions.filer = cmdS3.Flag.String("filer", "localhost:8888", "filer server address")
  41. s3StandaloneOptions.bindIp = cmdS3.Flag.String("ip.bind", "", "ip address to bind to. Default to localhost.")
  42. s3StandaloneOptions.port = cmdS3.Flag.Int("port", 8333, "s3 server http listen port")
  43. s3StandaloneOptions.portGrpc = cmdS3.Flag.Int("port.grpc", 0, "s3 server grpc listen port")
  44. s3StandaloneOptions.domainName = cmdS3.Flag.String("domainName", "", "suffix of the host name in comma separated list, {bucket}.{domainName}")
  45. s3StandaloneOptions.dataCenter = cmdS3.Flag.String("dataCenter", "", "prefer to read and write to volumes in this data center")
  46. s3StandaloneOptions.config = cmdS3.Flag.String("config", "", "path to the config file")
  47. s3StandaloneOptions.auditLogConfig = cmdS3.Flag.String("auditLogConfig", "", "path to the audit log config file")
  48. s3StandaloneOptions.tlsPrivateKey = cmdS3.Flag.String("key.file", "", "path to the TLS private key file")
  49. s3StandaloneOptions.tlsCertificate = cmdS3.Flag.String("cert.file", "", "path to the TLS certificate file")
  50. s3StandaloneOptions.metricsHttpPort = cmdS3.Flag.Int("metricsPort", 0, "Prometheus metrics listen port")
  51. s3StandaloneOptions.allowEmptyFolder = cmdS3.Flag.Bool("allowEmptyFolder", true, "allow empty folders")
  52. s3StandaloneOptions.allowDeleteBucketNotEmpty = cmdS3.Flag.Bool("allowDeleteBucketNotEmpty", true, "allow recursive deleting all entries along with bucket")
  53. s3StandaloneOptions.localFilerSocket = cmdS3.Flag.String("localFilerSocket", "", "local filer socket path")
  54. }
  55. var cmdS3 = &Command{
  56. UsageLine: "s3 [-port=8333] [-filer=<ip:port>] [-config=</path/to/config.json>]",
  57. Short: "start a s3 API compatible server that is backed by a filer",
  58. Long: `start a s3 API compatible server that is backed by a filer.
  59. By default, you can use any access key and secret key to access the S3 APIs.
  60. To enable credential based access, create a config.json file similar to this:
  61. {
  62. "identities": [
  63. {
  64. "name": "anonymous",
  65. "actions": [
  66. "Read"
  67. ]
  68. },
  69. {
  70. "name": "some_admin_user",
  71. "credentials": [
  72. {
  73. "accessKey": "some_access_key1",
  74. "secretKey": "some_secret_key1"
  75. }
  76. ],
  77. "actions": [
  78. "Admin",
  79. "Read",
  80. "List",
  81. "Tagging",
  82. "Write"
  83. ]
  84. },
  85. {
  86. "name": "some_read_only_user",
  87. "credentials": [
  88. {
  89. "accessKey": "some_access_key2",
  90. "secretKey": "some_secret_key2"
  91. }
  92. ],
  93. "actions": [
  94. "Read"
  95. ]
  96. },
  97. {
  98. "name": "some_normal_user",
  99. "credentials": [
  100. {
  101. "accessKey": "some_access_key3",
  102. "secretKey": "some_secret_key3"
  103. }
  104. ],
  105. "actions": [
  106. "Read",
  107. "List",
  108. "Tagging",
  109. "Write"
  110. ]
  111. },
  112. {
  113. "name": "user_limited_to_bucket1",
  114. "credentials": [
  115. {
  116. "accessKey": "some_access_key4",
  117. "secretKey": "some_secret_key4"
  118. }
  119. ],
  120. "actions": [
  121. "Read:bucket1",
  122. "List:bucket1",
  123. "Tagging:bucket1",
  124. "Write:bucket1"
  125. ]
  126. }
  127. ]
  128. }
  129. `,
  130. }
  131. func runS3(cmd *Command, args []string) bool {
  132. util.LoadConfiguration("security", false)
  133. go stats_collect.StartMetricsServer(*s3StandaloneOptions.bindIp, *s3StandaloneOptions.metricsHttpPort)
  134. return s3StandaloneOptions.startS3Server()
  135. }
  136. func (s3opt *S3Options) startS3Server() bool {
  137. filerAddress := pb.ServerAddress(*s3opt.filer)
  138. filerBucketsPath := "/buckets"
  139. grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
  140. // metrics read from the filer
  141. var metricsAddress string
  142. var metricsIntervalSec int
  143. for {
  144. err := pb.WithGrpcFilerClient(false, 0, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  145. resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
  146. if err != nil {
  147. return fmt.Errorf("get filer %s configuration: %v", filerAddress, err)
  148. }
  149. filerBucketsPath = resp.DirBuckets
  150. metricsAddress, metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSec)
  151. glog.V(0).Infof("S3 read filer buckets dir: %s", filerBucketsPath)
  152. return nil
  153. })
  154. if err != nil {
  155. glog.V(0).Infof("wait to connect to filer %s grpc address %s", *s3opt.filer, filerAddress.ToGrpcAddress())
  156. time.Sleep(time.Second)
  157. } else {
  158. glog.V(0).Infof("connected to filer %s grpc address %s", *s3opt.filer, filerAddress.ToGrpcAddress())
  159. break
  160. }
  161. }
  162. go stats_collect.LoopPushingMetric("s3", stats_collect.SourceName(uint32(*s3opt.port)), metricsAddress, metricsIntervalSec)
  163. router := mux.NewRouter().SkipClean(true)
  164. var localFilerSocket string
  165. if s3opt.localFilerSocket != nil {
  166. localFilerSocket = *s3opt.localFilerSocket
  167. }
  168. s3ApiServer, s3ApiServer_err := s3api.NewS3ApiServer(router, &s3api.S3ApiServerOption{
  169. Filer: filerAddress,
  170. Port: *s3opt.port,
  171. Config: *s3opt.config,
  172. DomainName: *s3opt.domainName,
  173. BucketsPath: filerBucketsPath,
  174. GrpcDialOption: grpcDialOption,
  175. AllowEmptyFolder: *s3opt.allowEmptyFolder,
  176. AllowDeleteBucketNotEmpty: *s3opt.allowDeleteBucketNotEmpty,
  177. LocalFilerSocket: localFilerSocket,
  178. DataCenter: *s3opt.dataCenter,
  179. })
  180. if s3ApiServer_err != nil {
  181. glog.Fatalf("S3 API Server startup error: %v", s3ApiServer_err)
  182. }
  183. httpS := &http.Server{Handler: router}
  184. if *s3opt.portGrpc == 0 {
  185. *s3opt.portGrpc = 10000 + *s3opt.port
  186. }
  187. if *s3opt.bindIp == "" {
  188. *s3opt.bindIp = "localhost"
  189. }
  190. listenAddress := fmt.Sprintf("%s:%d", *s3opt.bindIp, *s3opt.port)
  191. s3ApiListener, s3ApiLocalListener, err := util.NewIpAndLocalListeners(*s3opt.bindIp, *s3opt.port, time.Duration(10)*time.Second)
  192. if err != nil {
  193. glog.Fatalf("S3 API Server listener on %s error: %v", listenAddress, err)
  194. }
  195. if len(*s3opt.auditLogConfig) > 0 {
  196. s3err.InitAuditLog(*s3opt.auditLogConfig)
  197. if s3err.Logger != nil {
  198. defer s3err.Logger.Close()
  199. }
  200. }
  201. // starting grpc server
  202. grpcPort := *s3opt.portGrpc
  203. grpcL, grpcLocalL, err := util.NewIpAndLocalListeners(*s3opt.bindIp, grpcPort, 0)
  204. if err != nil {
  205. glog.Fatalf("s3 failed to listen on grpc port %d: %v", grpcPort, err)
  206. }
  207. grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.s3"))
  208. s3_pb.RegisterSeaweedS3Server(grpcS, s3ApiServer)
  209. reflection.Register(grpcS)
  210. if grpcLocalL != nil {
  211. go grpcS.Serve(grpcLocalL)
  212. }
  213. go grpcS.Serve(grpcL)
  214. if *s3opt.tlsPrivateKey != "" {
  215. glog.V(0).Infof("Start Seaweed S3 API Server %s at https port %d", util.Version(), *s3opt.port)
  216. if s3ApiLocalListener != nil {
  217. go func() {
  218. if err = httpS.ServeTLS(s3ApiLocalListener, *s3opt.tlsCertificate, *s3opt.tlsPrivateKey); err != nil {
  219. glog.Fatalf("S3 API Server Fail to serve: %v", err)
  220. }
  221. }()
  222. }
  223. if err = httpS.ServeTLS(s3ApiListener, *s3opt.tlsCertificate, *s3opt.tlsPrivateKey); err != nil {
  224. glog.Fatalf("S3 API Server Fail to serve: %v", err)
  225. }
  226. } else {
  227. glog.V(0).Infof("Start Seaweed S3 API Server %s at http port %d", util.Version(), *s3opt.port)
  228. if s3ApiLocalListener != nil {
  229. go func() {
  230. if err = httpS.Serve(s3ApiLocalListener); err != nil {
  231. glog.Fatalf("S3 API Server Fail to serve: %v", err)
  232. }
  233. }()
  234. }
  235. if err = httpS.Serve(s3ApiListener); err != nil {
  236. glog.Fatalf("S3 API Server Fail to serve: %v", err)
  237. }
  238. }
  239. return true
  240. }