filer_server.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "net/http"
  6. "os"
  7. "sync"
  8. "time"
  9. "google.golang.org/grpc"
  10. "github.com/chrislusf/seaweedfs/weed/util/grace"
  11. "github.com/chrislusf/seaweedfs/weed/operation"
  12. "github.com/chrislusf/seaweedfs/weed/pb"
  13. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  14. "github.com/chrislusf/seaweedfs/weed/stats"
  15. "github.com/chrislusf/seaweedfs/weed/util"
  16. "github.com/chrislusf/seaweedfs/weed/filer2"
  17. _ "github.com/chrislusf/seaweedfs/weed/filer2/cassandra"
  18. _ "github.com/chrislusf/seaweedfs/weed/filer2/etcd"
  19. _ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb"
  20. _ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb2"
  21. _ "github.com/chrislusf/seaweedfs/weed/filer2/mongodb"
  22. _ "github.com/chrislusf/seaweedfs/weed/filer2/mysql"
  23. _ "github.com/chrislusf/seaweedfs/weed/filer2/postgres"
  24. _ "github.com/chrislusf/seaweedfs/weed/filer2/redis"
  25. _ "github.com/chrislusf/seaweedfs/weed/filer2/redis2"
  26. "github.com/chrislusf/seaweedfs/weed/glog"
  27. "github.com/chrislusf/seaweedfs/weed/notification"
  28. _ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs"
  29. _ "github.com/chrislusf/seaweedfs/weed/notification/gocdk_pub_sub"
  30. _ "github.com/chrislusf/seaweedfs/weed/notification/google_pub_sub"
  31. _ "github.com/chrislusf/seaweedfs/weed/notification/kafka"
  32. _ "github.com/chrislusf/seaweedfs/weed/notification/log"
  33. "github.com/chrislusf/seaweedfs/weed/security"
  34. )
  35. type FilerOption struct {
  36. Masters []string
  37. Collection string
  38. DefaultReplication string
  39. DisableDirListing bool
  40. MaxMB int
  41. DirListingLimit int
  42. DataCenter string
  43. DefaultLevelDbDir string
  44. DisableHttp bool
  45. Host string
  46. Port uint32
  47. recursiveDelete bool
  48. Cipher bool
  49. Filers []string
  50. }
  51. type FilerServer struct {
  52. option *FilerOption
  53. secret security.SigningKey
  54. filer *filer2.Filer
  55. grpcDialOption grpc.DialOption
  56. // notifying clients
  57. listenersLock sync.Mutex
  58. listenersCond *sync.Cond
  59. brokers map[string]map[string]bool
  60. brokersLock sync.Mutex
  61. }
  62. func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) {
  63. fs = &FilerServer{
  64. option: option,
  65. grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
  66. brokers: make(map[string]map[string]bool),
  67. }
  68. fs.listenersCond = sync.NewCond(&fs.listenersLock)
  69. if len(option.Masters) == 0 {
  70. glog.Fatal("master list is required!")
  71. }
  72. fs.filer = filer2.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.Port, option.Collection, option.DefaultReplication, func() {
  73. fs.listenersCond.Broadcast()
  74. })
  75. fs.filer.Cipher = option.Cipher
  76. maybeStartMetrics(fs, option)
  77. go fs.filer.KeepConnectedToMaster()
  78. v := util.GetViper()
  79. if !util.LoadConfiguration("filer", false) {
  80. v.Set("leveldb2.enabled", true)
  81. v.Set("leveldb2.dir", option.DefaultLevelDbDir)
  82. _, err := os.Stat(option.DefaultLevelDbDir)
  83. if os.IsNotExist(err) {
  84. os.MkdirAll(option.DefaultLevelDbDir, 0755)
  85. }
  86. glog.V(0).Infof("default to create filer store dir in %s", option.DefaultLevelDbDir)
  87. }
  88. util.LoadConfiguration("notification", false)
  89. fs.option.recursiveDelete = v.GetBool("filer.options.recursive_delete")
  90. v.SetDefault("filer.options.buckets_folder", "/buckets")
  91. fs.filer.DirBucketsPath = v.GetString("filer.options.buckets_folder")
  92. fs.filer.FsyncBuckets = v.GetStringSlice("filer.options.buckets_fsync")
  93. fs.filer.LoadConfiguration(v)
  94. notification.LoadConfiguration(v, "notification.")
  95. handleStaticResources(defaultMux)
  96. if !option.DisableHttp {
  97. defaultMux.HandleFunc("/", fs.filerHandler)
  98. }
  99. if defaultMux != readonlyMux {
  100. readonlyMux.HandleFunc("/", fs.readonlyFilerHandler)
  101. }
  102. fs.filer.AggregateFromPeers(fmt.Sprintf("%s:%d", option.Host, option.Port), option.Filers)
  103. fs.filer.LoadBuckets()
  104. grace.OnInterrupt(func() {
  105. fs.filer.Shutdown()
  106. })
  107. return fs, nil
  108. }
  109. func maybeStartMetrics(fs *FilerServer, option *FilerOption) {
  110. for _, master := range option.Masters {
  111. _, err := pb.ParseFilerGrpcAddress(master)
  112. if err != nil {
  113. glog.Fatalf("invalid master address %s: %v", master, err)
  114. }
  115. }
  116. isConnected := false
  117. var metricsAddress string
  118. var metricsIntervalSec int
  119. var readErr error
  120. for !isConnected {
  121. for _, master := range option.Masters {
  122. metricsAddress, metricsIntervalSec, readErr = readFilerConfiguration(fs.grpcDialOption, master)
  123. if readErr == nil {
  124. isConnected = true
  125. } else {
  126. time.Sleep(7 * time.Second)
  127. }
  128. }
  129. }
  130. if metricsAddress == "" && metricsIntervalSec <= 0 {
  131. return
  132. }
  133. go stats.LoopPushingMetric("filer", stats.SourceName(option.Port), stats.FilerGather,
  134. func() (addr string, intervalSeconds int) {
  135. return metricsAddress, metricsIntervalSec
  136. })
  137. }
  138. func readFilerConfiguration(grpcDialOption grpc.DialOption, masterAddress string) (metricsAddress string, metricsIntervalSec int, err error) {
  139. err = operation.WithMasterServerClient(masterAddress, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
  140. resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
  141. if err != nil {
  142. return fmt.Errorf("get master %s configuration: %v", masterAddress, err)
  143. }
  144. metricsAddress, metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSeconds)
  145. return nil
  146. })
  147. return
  148. }