filer_server.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "net/http"
  6. "os"
  7. "sync"
  8. "time"
  9. "github.com/chrislusf/seaweedfs/weed/stats"
  10. "google.golang.org/grpc"
  11. "github.com/chrislusf/seaweedfs/weed/util/grace"
  12. "github.com/chrislusf/seaweedfs/weed/operation"
  13. "github.com/chrislusf/seaweedfs/weed/pb"
  14. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  15. "github.com/chrislusf/seaweedfs/weed/util"
  16. "github.com/chrislusf/seaweedfs/weed/filer"
  17. _ "github.com/chrislusf/seaweedfs/weed/filer/cassandra"
  18. _ "github.com/chrislusf/seaweedfs/weed/filer/elastic/v7"
  19. _ "github.com/chrislusf/seaweedfs/weed/filer/etcd"
  20. _ "github.com/chrislusf/seaweedfs/weed/filer/hbase"
  21. _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb"
  22. _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb2"
  23. _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb3"
  24. _ "github.com/chrislusf/seaweedfs/weed/filer/mongodb"
  25. _ "github.com/chrislusf/seaweedfs/weed/filer/mysql"
  26. _ "github.com/chrislusf/seaweedfs/weed/filer/mysql2"
  27. _ "github.com/chrislusf/seaweedfs/weed/filer/postgres"
  28. _ "github.com/chrislusf/seaweedfs/weed/filer/postgres2"
  29. _ "github.com/chrislusf/seaweedfs/weed/filer/redis"
  30. _ "github.com/chrislusf/seaweedfs/weed/filer/redis2"
  31. "github.com/chrislusf/seaweedfs/weed/glog"
  32. "github.com/chrislusf/seaweedfs/weed/notification"
  33. _ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs"
  34. _ "github.com/chrislusf/seaweedfs/weed/notification/gocdk_pub_sub"
  35. _ "github.com/chrislusf/seaweedfs/weed/notification/google_pub_sub"
  36. _ "github.com/chrislusf/seaweedfs/weed/notification/kafka"
  37. _ "github.com/chrislusf/seaweedfs/weed/notification/log"
  38. "github.com/chrislusf/seaweedfs/weed/security"
  39. )
  40. type FilerOption struct {
  41. Masters []string
  42. Collection string
  43. DefaultReplication string
  44. DisableDirListing bool
  45. MaxMB int
  46. DirListingLimit int
  47. DataCenter string
  48. Rack string
  49. DefaultLevelDbDir string
  50. DisableHttp bool
  51. Host string
  52. Port uint32
  53. recursiveDelete bool
  54. Cipher bool
  55. SaveToFilerLimit int
  56. Filers []string
  57. }
  58. type FilerServer struct {
  59. option *FilerOption
  60. secret security.SigningKey
  61. filer *filer.Filer
  62. grpcDialOption grpc.DialOption
  63. // metrics read from the master
  64. metricsAddress string
  65. metricsIntervalSec int
  66. // notifying clients
  67. listenersLock sync.Mutex
  68. listenersCond *sync.Cond
  69. brokers map[string]map[string]bool
  70. brokersLock sync.Mutex
  71. }
  72. func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) {
  73. fs = &FilerServer{
  74. option: option,
  75. grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
  76. brokers: make(map[string]map[string]bool),
  77. }
  78. fs.listenersCond = sync.NewCond(&fs.listenersLock)
  79. if len(option.Masters) == 0 {
  80. glog.Fatal("master list is required!")
  81. }
  82. fs.filer = filer.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.Port, option.Collection, option.DefaultReplication, option.DataCenter, func() {
  83. fs.listenersCond.Broadcast()
  84. })
  85. fs.filer.Cipher = option.Cipher
  86. fs.checkWithMaster()
  87. go stats.LoopPushingMetric("filer", stats.SourceName(fs.option.Port), fs.metricsAddress, fs.metricsIntervalSec)
  88. go fs.filer.KeepConnectedToMaster()
  89. v := util.GetViper()
  90. if !util.LoadConfiguration("filer", false) {
  91. v.Set("leveldb2.enabled", true)
  92. v.Set("leveldb2.dir", option.DefaultLevelDbDir)
  93. _, err := os.Stat(option.DefaultLevelDbDir)
  94. if os.IsNotExist(err) {
  95. os.MkdirAll(option.DefaultLevelDbDir, 0755)
  96. }
  97. glog.V(0).Infof("default to create filer store dir in %s", option.DefaultLevelDbDir)
  98. } else {
  99. glog.Warningf("skipping default store dir in %s", option.DefaultLevelDbDir)
  100. }
  101. util.LoadConfiguration("notification", false)
  102. fs.option.recursiveDelete = v.GetBool("filer.options.recursive_delete")
  103. v.SetDefault("filer.options.buckets_folder", "/buckets")
  104. fs.filer.DirBucketsPath = v.GetString("filer.options.buckets_folder")
  105. // TODO deprecated, will be be removed after 2020-12-31
  106. // replaced by https://github.com/chrislusf/seaweedfs/wiki/Path-Specific-Configuration
  107. fs.filer.FsyncBuckets = v.GetStringSlice("filer.options.buckets_fsync")
  108. fs.filer.LoadConfiguration(v)
  109. notification.LoadConfiguration(v, "notification.")
  110. handleStaticResources(defaultMux)
  111. if !option.DisableHttp {
  112. defaultMux.HandleFunc("/", fs.filerHandler)
  113. }
  114. if defaultMux != readonlyMux {
  115. handleStaticResources(readonlyMux)
  116. readonlyMux.HandleFunc("/", fs.readonlyFilerHandler)
  117. }
  118. fs.filer.AggregateFromPeers(fmt.Sprintf("%s:%d", option.Host, option.Port), option.Filers)
  119. fs.filer.LoadBuckets()
  120. fs.filer.LoadFilerConf()
  121. grace.OnInterrupt(func() {
  122. fs.filer.Shutdown()
  123. })
  124. return fs, nil
  125. }
  126. func (fs *FilerServer) checkWithMaster() {
  127. for _, master := range fs.option.Masters {
  128. _, err := pb.ParseFilerGrpcAddress(master)
  129. if err != nil {
  130. glog.Fatalf("invalid master address %s: %v", master, err)
  131. }
  132. }
  133. isConnected := false
  134. for !isConnected {
  135. for _, master := range fs.option.Masters {
  136. readErr := operation.WithMasterServerClient(master, fs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
  137. resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
  138. if err != nil {
  139. return fmt.Errorf("get master %s configuration: %v", master, err)
  140. }
  141. fs.metricsAddress, fs.metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSeconds)
  142. if fs.option.DefaultReplication == "" {
  143. fs.option.DefaultReplication = resp.DefaultReplication
  144. }
  145. return nil
  146. })
  147. if readErr == nil {
  148. isConnected = true
  149. } else {
  150. time.Sleep(7 * time.Second)
  151. }
  152. }
  153. }
  154. }