filer.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324
  1. package command
  2. import (
  3. "fmt"
  4. "net"
  5. "net/http"
  6. "os"
  7. "runtime"
  8. "sort"
  9. "strings"
  10. "time"
  11. "google.golang.org/grpc/reflection"
  12. "github.com/seaweedfs/seaweedfs/weed/filer"
  13. "github.com/seaweedfs/seaweedfs/weed/glog"
  14. "github.com/seaweedfs/seaweedfs/weed/pb"
  15. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  16. "github.com/seaweedfs/seaweedfs/weed/security"
  17. weed_server "github.com/seaweedfs/seaweedfs/weed/server"
  18. stats_collect "github.com/seaweedfs/seaweedfs/weed/stats"
  19. "github.com/seaweedfs/seaweedfs/weed/util"
  20. )
  21. var (
  22. f FilerOptions
  23. filerStartS3 *bool
  24. filerS3Options S3Options
  25. filerStartWebDav *bool
  26. filerWebDavOptions WebDavOption
  27. filerStartIam *bool
  28. filerIamOptions IamOptions
  29. )
  30. type FilerOptions struct {
  31. masters map[string]pb.ServerAddress
  32. mastersString *string
  33. ip *string
  34. bindIp *string
  35. port *int
  36. portGrpc *int
  37. publicPort *int
  38. filerGroup *string
  39. collection *string
  40. defaultReplicaPlacement *string
  41. disableDirListing *bool
  42. maxMB *int
  43. dirListingLimit *int
  44. dataCenter *string
  45. rack *string
  46. enableNotification *bool
  47. disableHttp *bool
  48. cipher *bool
  49. metricsHttpPort *int
  50. saveToFilerLimit *int
  51. defaultLevelDbDirectory *string
  52. concurrentUploadLimitMB *int
  53. debug *bool
  54. debugPort *int
  55. localSocket *string
  56. showUIDirectoryDelete *bool
  57. downloadMaxMBps *int
  58. }
  59. func init() {
  60. cmdFiler.Run = runFiler // break init cycle
  61. f.mastersString = cmdFiler.Flag.String("master", "localhost:9333", "comma-separated master servers")
  62. f.filerGroup = cmdFiler.Flag.String("filerGroup", "", "share metadata with other filers in the same filerGroup")
  63. f.collection = cmdFiler.Flag.String("collection", "", "all data will be stored in this default collection")
  64. f.ip = cmdFiler.Flag.String("ip", util.DetectedHostAddress(), "filer server http listen ip address")
  65. f.bindIp = cmdFiler.Flag.String("ip.bind", "", "ip address to bind to. If empty, default to same as -ip option.")
  66. f.port = cmdFiler.Flag.Int("port", 8888, "filer server http listen port")
  67. f.portGrpc = cmdFiler.Flag.Int("port.grpc", 0, "filer server grpc listen port")
  68. f.publicPort = cmdFiler.Flag.Int("port.readonly", 0, "readonly port opened to public")
  69. f.defaultReplicaPlacement = cmdFiler.Flag.String("defaultReplicaPlacement", "", "default replication type. If not specified, use master setting.")
  70. f.disableDirListing = cmdFiler.Flag.Bool("disableDirListing", false, "turn off directory listing")
  71. f.maxMB = cmdFiler.Flag.Int("maxMB", 4, "split files larger than the limit")
  72. f.dirListingLimit = cmdFiler.Flag.Int("dirListLimit", 100000, "limit sub dir listing size")
  73. f.dataCenter = cmdFiler.Flag.String("dataCenter", "", "prefer to read and write to volumes in this data center")
  74. f.rack = cmdFiler.Flag.String("rack", "", "prefer to write to volumes in this rack")
  75. f.disableHttp = cmdFiler.Flag.Bool("disableHttp", false, "disable http request, only gRpc operations are allowed")
  76. f.cipher = cmdFiler.Flag.Bool("encryptVolumeData", false, "encrypt data on volume servers")
  77. f.metricsHttpPort = cmdFiler.Flag.Int("metricsPort", 0, "Prometheus metrics listen port")
  78. f.saveToFilerLimit = cmdFiler.Flag.Int("saveToFilerLimit", 0, "files smaller than this limit will be saved in filer store")
  79. f.defaultLevelDbDirectory = cmdFiler.Flag.String("defaultStoreDir", ".", "if filer.toml is empty, use an embedded filer store in the directory")
  80. f.concurrentUploadLimitMB = cmdFiler.Flag.Int("concurrentUploadLimitMB", 128, "limit total concurrent upload size")
  81. f.debug = cmdFiler.Flag.Bool("debug", false, "serves runtime profiling data, e.g., http://localhost:<debug.port>/debug/pprof/goroutine?debug=2")
  82. f.debugPort = cmdFiler.Flag.Int("debug.port", 6060, "http port for debugging")
  83. f.localSocket = cmdFiler.Flag.String("localSocket", "", "default to /tmp/seaweedfs-filer-<port>.sock")
  84. f.showUIDirectoryDelete = cmdFiler.Flag.Bool("ui.deleteDir", true, "enable filer UI show delete directory button")
  85. f.downloadMaxMBps = cmdFiler.Flag.Int("downloadMaxMBps", 0, "download max speed for each download request, in MB per second")
  86. // start s3 on filer
  87. filerStartS3 = cmdFiler.Flag.Bool("s3", false, "whether to start S3 gateway")
  88. filerS3Options.port = cmdFiler.Flag.Int("s3.port", 8333, "s3 server http listen port")
  89. filerS3Options.portGrpc = cmdFiler.Flag.Int("s3.port.grpc", 0, "s3 server grpc listen port")
  90. filerS3Options.domainName = cmdFiler.Flag.String("s3.domainName", "", "suffix of the host name in comma separated list, {bucket}.{domainName}")
  91. filerS3Options.dataCenter = cmdFiler.Flag.String("s3.dataCenter", "", "prefer to read and write to volumes in this data center")
  92. filerS3Options.tlsPrivateKey = cmdFiler.Flag.String("s3.key.file", "", "path to the TLS private key file")
  93. filerS3Options.tlsCertificate = cmdFiler.Flag.String("s3.cert.file", "", "path to the TLS certificate file")
  94. filerS3Options.config = cmdFiler.Flag.String("s3.config", "", "path to the config file")
  95. filerS3Options.auditLogConfig = cmdFiler.Flag.String("s3.auditLogConfig", "", "path to the audit log config file")
  96. filerS3Options.allowEmptyFolder = cmdFiler.Flag.Bool("s3.allowEmptyFolder", true, "allow empty folders")
  97. filerS3Options.allowDeleteBucketNotEmpty = cmdFiler.Flag.Bool("s3.allowDeleteBucketNotEmpty", true, "allow recursive deleting all entries along with bucket")
  98. // start webdav on filer
  99. filerStartWebDav = cmdFiler.Flag.Bool("webdav", false, "whether to start webdav gateway")
  100. filerWebDavOptions.port = cmdFiler.Flag.Int("webdav.port", 7333, "webdav server http listen port")
  101. filerWebDavOptions.collection = cmdFiler.Flag.String("webdav.collection", "", "collection to create the files")
  102. filerWebDavOptions.replication = cmdFiler.Flag.String("webdav.replication", "", "replication to create the files")
  103. filerWebDavOptions.disk = cmdFiler.Flag.String("webdav.disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag")
  104. filerWebDavOptions.tlsPrivateKey = cmdFiler.Flag.String("webdav.key.file", "", "path to the TLS private key file")
  105. filerWebDavOptions.tlsCertificate = cmdFiler.Flag.String("webdav.cert.file", "", "path to the TLS certificate file")
  106. filerWebDavOptions.cacheDir = cmdFiler.Flag.String("webdav.cacheDir", os.TempDir(), "local cache directory for file chunks")
  107. filerWebDavOptions.cacheSizeMB = cmdFiler.Flag.Int64("webdav.cacheCapacityMB", 0, "local cache capacity in MB")
  108. filerWebDavOptions.filerRootPath = cmdFiler.Flag.String("webdav.filer.path", "/", "use this remote path from filer server")
  109. // start iam on filer
  110. filerStartIam = cmdFiler.Flag.Bool("iam", false, "whether to start IAM service")
  111. filerIamOptions.ip = cmdFiler.Flag.String("iam.ip", *f.ip, "iam server http listen ip address")
  112. filerIamOptions.port = cmdFiler.Flag.Int("iam.port", 8111, "iam server http listen port")
  113. }
  114. func filerLongDesc() string {
  115. desc := `start a file server which accepts REST operation for any files.
  116. //create or overwrite the file, the directories /path/to will be automatically created
  117. POST /path/to/file
  118. //get the file content
  119. GET /path/to/file
  120. //create or overwrite the file, the filename in the multipart request will be used
  121. POST /path/to/
  122. //return a json format subdirectory and files listing
  123. GET /path/to/
  124. The configuration file "filer.toml" is read from ".", "$HOME/.seaweedfs/", "/usr/local/etc/seaweedfs/", or "/etc/seaweedfs/", in that order.
  125. If the "filer.toml" is not found, an embedded filer store will be created under "-defaultStoreDir".
  126. The example filer.toml configuration file can be generated by "weed scaffold -config=filer"
  127. Supported Filer Stores:
  128. `
  129. storeNames := make([]string, len(filer.Stores))
  130. for i, store := range filer.Stores {
  131. storeNames[i] = "\t" + store.GetName()
  132. }
  133. sort.Strings(storeNames)
  134. storeList := strings.Join(storeNames, "\n")
  135. return desc + storeList
  136. }
  137. var cmdFiler = &Command{
  138. UsageLine: "filer -port=8888 -master=<ip:port>[,<ip:port>]*",
  139. Short: "start a file server that points to a master server, or a list of master servers",
  140. Long: filerLongDesc(),
  141. }
  142. func runFiler(cmd *Command, args []string) bool {
  143. if *f.debug {
  144. go http.ListenAndServe(fmt.Sprintf(":%d", *f.debugPort), nil)
  145. }
  146. util.LoadConfiguration("security", false)
  147. go stats_collect.StartMetricsServer(*f.bindIp, *f.metricsHttpPort)
  148. filerAddress := util.JoinHostPort(*f.ip, *f.port)
  149. startDelay := time.Duration(2)
  150. if *filerStartS3 {
  151. filerS3Options.filer = &filerAddress
  152. filerS3Options.bindIp = f.bindIp
  153. filerS3Options.localFilerSocket = f.localSocket
  154. if *f.dataCenter != "" && *filerS3Options.dataCenter == "" {
  155. filerS3Options.dataCenter = f.dataCenter
  156. }
  157. go func(delay time.Duration) {
  158. time.Sleep(delay * time.Second)
  159. filerS3Options.startS3Server()
  160. }(startDelay)
  161. startDelay++
  162. }
  163. if *filerStartWebDav {
  164. filerWebDavOptions.filer = &filerAddress
  165. go func(delay time.Duration) {
  166. time.Sleep(delay * time.Second)
  167. filerWebDavOptions.startWebDav()
  168. }(startDelay)
  169. startDelay++
  170. }
  171. if *filerStartIam {
  172. filerIamOptions.filer = &filerAddress
  173. filerIamOptions.masters = f.mastersString
  174. go func(delay time.Duration) {
  175. time.Sleep(delay * time.Second)
  176. filerIamOptions.startIamServer()
  177. }(startDelay)
  178. }
  179. f.masters = pb.ServerAddresses(*f.mastersString).ToAddressMap()
  180. f.startFiler()
  181. return true
  182. }
  183. func (fo *FilerOptions) startFiler() {
  184. defaultMux := http.NewServeMux()
  185. publicVolumeMux := defaultMux
  186. if *fo.publicPort != 0 {
  187. publicVolumeMux = http.NewServeMux()
  188. }
  189. if *fo.portGrpc == 0 {
  190. *fo.portGrpc = 10000 + *fo.port
  191. }
  192. if *fo.bindIp == "" {
  193. *fo.bindIp = *fo.ip
  194. }
  195. defaultLevelDbDirectory := util.ResolvePath(*fo.defaultLevelDbDirectory + "/filerldb2")
  196. filerAddress := pb.NewServerAddress(*fo.ip, *fo.port, *fo.portGrpc)
  197. fs, nfs_err := weed_server.NewFilerServer(defaultMux, publicVolumeMux, &weed_server.FilerOption{
  198. Masters: fo.masters,
  199. FilerGroup: *fo.filerGroup,
  200. Collection: *fo.collection,
  201. DefaultReplication: *fo.defaultReplicaPlacement,
  202. DisableDirListing: *fo.disableDirListing,
  203. MaxMB: *fo.maxMB,
  204. DirListingLimit: *fo.dirListingLimit,
  205. DataCenter: *fo.dataCenter,
  206. Rack: *fo.rack,
  207. DefaultLevelDbDir: defaultLevelDbDirectory,
  208. DisableHttp: *fo.disableHttp,
  209. Host: filerAddress,
  210. Cipher: *fo.cipher,
  211. SaveToFilerLimit: int64(*fo.saveToFilerLimit),
  212. ConcurrentUploadLimit: int64(*fo.concurrentUploadLimitMB) * 1024 * 1024,
  213. ShowUIDirectoryDelete: *fo.showUIDirectoryDelete,
  214. DownloadMaxBytesPs: int64(*fo.downloadMaxMBps) * 1024 * 1024,
  215. })
  216. if nfs_err != nil {
  217. glog.Fatalf("Filer startup error: %v", nfs_err)
  218. }
  219. if *fo.publicPort != 0 {
  220. publicListeningAddress := util.JoinHostPort(*fo.bindIp, *fo.publicPort)
  221. glog.V(0).Infoln("Start Seaweed filer server", util.Version(), "public at", publicListeningAddress)
  222. publicListener, localPublicListener, e := util.NewIpAndLocalListeners(*fo.bindIp, *fo.publicPort, 0)
  223. if e != nil {
  224. glog.Fatalf("Filer server public listener error on port %d:%v", *fo.publicPort, e)
  225. }
  226. go func() {
  227. if e := http.Serve(publicListener, publicVolumeMux); e != nil {
  228. glog.Fatalf("Volume server fail to serve public: %v", e)
  229. }
  230. }()
  231. if localPublicListener != nil {
  232. go func() {
  233. if e := http.Serve(localPublicListener, publicVolumeMux); e != nil {
  234. glog.Errorf("Volume server fail to serve public: %v", e)
  235. }
  236. }()
  237. }
  238. }
  239. glog.V(0).Infof("Start Seaweed Filer %s at %s:%d", util.Version(), *fo.ip, *fo.port)
  240. filerListener, filerLocalListener, e := util.NewIpAndLocalListeners(
  241. *fo.bindIp, *fo.port,
  242. time.Duration(10)*time.Second,
  243. )
  244. if e != nil {
  245. glog.Fatalf("Filer listener error: %v", e)
  246. }
  247. // starting grpc server
  248. grpcPort := *fo.portGrpc
  249. grpcL, grpcLocalL, err := util.NewIpAndLocalListeners(*fo.bindIp, grpcPort, 0)
  250. if err != nil {
  251. glog.Fatalf("failed to listen on grpc port %d: %v", grpcPort, err)
  252. }
  253. grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.filer"))
  254. filer_pb.RegisterSeaweedFilerServer(grpcS, fs)
  255. reflection.Register(grpcS)
  256. if grpcLocalL != nil {
  257. go grpcS.Serve(grpcLocalL)
  258. }
  259. go grpcS.Serve(grpcL)
  260. httpS := &http.Server{Handler: defaultMux}
  261. if runtime.GOOS != "windows" {
  262. localSocket := *fo.localSocket
  263. if localSocket == "" {
  264. localSocket = fmt.Sprintf("/tmp/seaweedfs-filer-%d.sock", *fo.port)
  265. }
  266. if err := os.Remove(localSocket); err != nil && !os.IsNotExist(err) {
  267. glog.Fatalf("Failed to remove %s, error: %s", localSocket, err.Error())
  268. }
  269. go func() {
  270. // start on local unix socket
  271. filerSocketListener, err := net.Listen("unix", localSocket)
  272. if err != nil {
  273. glog.Fatalf("Failed to listen on %s: %v", localSocket, err)
  274. }
  275. httpS.Serve(filerSocketListener)
  276. }()
  277. }
  278. if filerLocalListener != nil {
  279. go func() {
  280. if err := httpS.Serve(filerLocalListener); err != nil {
  281. glog.Errorf("Filer Fail to serve: %v", e)
  282. }
  283. }()
  284. }
  285. if err := httpS.Serve(filerListener); err != nil {
  286. glog.Fatalf("Filer Fail to serve: %v", e)
  287. }
  288. }