filer_server.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "net/http"
  6. "os"
  7. "time"
  8. "github.com/chrislusf/seaweedfs/weed/operation"
  9. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  10. "github.com/chrislusf/seaweedfs/weed/stats"
  11. "github.com/chrislusf/seaweedfs/weed/util"
  12. "google.golang.org/grpc"
  13. "github.com/chrislusf/seaweedfs/weed/filer2"
  14. _ "github.com/chrislusf/seaweedfs/weed/filer2/cassandra"
  15. _ "github.com/chrislusf/seaweedfs/weed/filer2/etcd"
  16. _ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb"
  17. _ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb2"
  18. _ "github.com/chrislusf/seaweedfs/weed/filer2/memdb"
  19. _ "github.com/chrislusf/seaweedfs/weed/filer2/mysql"
  20. _ "github.com/chrislusf/seaweedfs/weed/filer2/postgres"
  21. _ "github.com/chrislusf/seaweedfs/weed/filer2/redis"
  22. _ "github.com/chrislusf/seaweedfs/weed/filer2/tikv"
  23. "github.com/chrislusf/seaweedfs/weed/glog"
  24. "github.com/chrislusf/seaweedfs/weed/notification"
  25. _ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs"
  26. _ "github.com/chrislusf/seaweedfs/weed/notification/gocdk_pub_sub"
  27. _ "github.com/chrislusf/seaweedfs/weed/notification/google_pub_sub"
  28. _ "github.com/chrislusf/seaweedfs/weed/notification/kafka"
  29. _ "github.com/chrislusf/seaweedfs/weed/notification/log"
  30. "github.com/chrislusf/seaweedfs/weed/security"
  31. "github.com/spf13/viper"
  32. )
  33. type FilerOption struct {
  34. Masters []string
  35. Collection string
  36. DefaultReplication string
  37. RedirectOnRead bool
  38. DisableDirListing bool
  39. MaxMB int
  40. DirListingLimit int
  41. DataCenter string
  42. DefaultLevelDbDir string
  43. DisableHttp bool
  44. Port int
  45. }
  46. type FilerServer struct {
  47. option *FilerOption
  48. secret security.SigningKey
  49. filer *filer2.Filer
  50. grpcDialOption grpc.DialOption
  51. }
  52. func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) {
  53. fs = &FilerServer{
  54. option: option,
  55. grpcDialOption: security.LoadClientTLS(viper.Sub("grpc"), "filer"),
  56. }
  57. if len(option.Masters) == 0 {
  58. glog.Fatal("master list is required!")
  59. }
  60. fs.filer = filer2.NewFiler(option.Masters, fs.grpcDialOption)
  61. go fs.filer.KeepConnectedToMaster()
  62. v := viper.GetViper()
  63. if !util.LoadConfiguration("filer", false) {
  64. v.Set("leveldb2.enabled", true)
  65. v.Set("leveldb2.dir", option.DefaultLevelDbDir)
  66. _, err := os.Stat(option.DefaultLevelDbDir)
  67. if os.IsNotExist(err) {
  68. os.MkdirAll(option.DefaultLevelDbDir, 0755)
  69. }
  70. }
  71. util.LoadConfiguration("notification", false)
  72. fs.filer.LoadConfiguration(v)
  73. notification.LoadConfiguration(v.Sub("notification"))
  74. handleStaticResources(defaultMux)
  75. if !option.DisableHttp {
  76. defaultMux.HandleFunc("/", fs.filerHandler)
  77. }
  78. if defaultMux != readonlyMux {
  79. readonlyMux.HandleFunc("/", fs.readonlyFilerHandler)
  80. }
  81. maybeStartMetrics(fs, option)
  82. return fs, nil
  83. }
  84. func maybeStartMetrics(fs *FilerServer, option *FilerOption) {
  85. isConnected := false
  86. var metricsAddress string
  87. var metricsIntervalSec int
  88. var readErr error
  89. for !isConnected {
  90. metricsAddress, metricsIntervalSec, readErr = readFilerConfiguration(fs.grpcDialOption, option.Masters[0])
  91. if readErr == nil {
  92. isConnected = true
  93. } else {
  94. time.Sleep(7 * time.Second)
  95. }
  96. }
  97. if metricsAddress == "" && metricsIntervalSec <= 0 {
  98. return
  99. }
  100. go stats.LoopPushingMetric("filer", stats.SourceName(option.Port), stats.FilerGather,
  101. func() (addr string, intervalSeconds int) {
  102. return metricsAddress, metricsIntervalSec
  103. })
  104. }
  105. func readFilerConfiguration(grpcDialOption grpc.DialOption, masterGrpcAddress string) (metricsAddress string, metricsIntervalSec int, err error) {
  106. err = operation.WithMasterServerClient(masterGrpcAddress, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
  107. resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
  108. if err != nil {
  109. return fmt.Errorf("get master %s configuration: %v", masterGrpcAddress, err)
  110. }
  111. metricsAddress, metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSeconds)
  112. return nil
  113. })
  114. return
  115. }