filer_server.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "net/http"
  6. "os"
  7. "sync"
  8. "time"
  9. "github.com/chrislusf/seaweedfs/weed/stats"
  10. "google.golang.org/grpc"
  11. "github.com/chrislusf/seaweedfs/weed/util/grace"
  12. "github.com/chrislusf/seaweedfs/weed/operation"
  13. "github.com/chrislusf/seaweedfs/weed/pb"
  14. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  15. "github.com/chrislusf/seaweedfs/weed/util"
  16. "github.com/chrislusf/seaweedfs/weed/filer"
  17. _ "github.com/chrislusf/seaweedfs/weed/filer/cassandra"
  18. _ "github.com/chrislusf/seaweedfs/weed/filer/elastic/v7"
  19. _ "github.com/chrislusf/seaweedfs/weed/filer/etcd"
  20. _ "github.com/chrislusf/seaweedfs/weed/filer/hbase"
  21. _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb"
  22. _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb2"
  23. _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb3"
  24. _ "github.com/chrislusf/seaweedfs/weed/filer/mongodb"
  25. _ "github.com/chrislusf/seaweedfs/weed/filer/mysql"
  26. _ "github.com/chrislusf/seaweedfs/weed/filer/mysql2"
  27. _ "github.com/chrislusf/seaweedfs/weed/filer/postgres"
  28. _ "github.com/chrislusf/seaweedfs/weed/filer/postgres2"
  29. _ "github.com/chrislusf/seaweedfs/weed/filer/redis"
  30. _ "github.com/chrislusf/seaweedfs/weed/filer/redis2"
  31. _ "github.com/chrislusf/seaweedfs/weed/filer/sqlite"
  32. "github.com/chrislusf/seaweedfs/weed/glog"
  33. "github.com/chrislusf/seaweedfs/weed/notification"
  34. _ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs"
  35. _ "github.com/chrislusf/seaweedfs/weed/notification/gocdk_pub_sub"
  36. _ "github.com/chrislusf/seaweedfs/weed/notification/google_pub_sub"
  37. _ "github.com/chrislusf/seaweedfs/weed/notification/kafka"
  38. _ "github.com/chrislusf/seaweedfs/weed/notification/log"
  39. "github.com/chrislusf/seaweedfs/weed/security"
  40. )
  41. type FilerOption struct {
  42. Masters []string
  43. Collection string
  44. DefaultReplication string
  45. DisableDirListing bool
  46. MaxMB int
  47. DirListingLimit int
  48. DataCenter string
  49. Rack string
  50. DefaultLevelDbDir string
  51. DisableHttp bool
  52. Host string
  53. Port uint32
  54. recursiveDelete bool
  55. Cipher bool
  56. SaveToFilerLimit int64
  57. Filers []string
  58. ConcurrentUploadLimit int64
  59. }
  60. type FilerServer struct {
  61. option *FilerOption
  62. secret security.SigningKey
  63. filer *filer.Filer
  64. grpcDialOption grpc.DialOption
  65. // metrics read from the master
  66. metricsAddress string
  67. metricsIntervalSec int
  68. // notifying clients
  69. listenersLock sync.Mutex
  70. listenersCond *sync.Cond
  71. brokers map[string]map[string]bool
  72. brokersLock sync.Mutex
  73. inFlightDataSize int64
  74. inFlightDataLimitCond *sync.Cond
  75. }
  76. func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) {
  77. fs = &FilerServer{
  78. option: option,
  79. grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
  80. brokers: make(map[string]map[string]bool),
  81. inFlightDataLimitCond: sync.NewCond(new(sync.Mutex)),
  82. }
  83. fs.listenersCond = sync.NewCond(&fs.listenersLock)
  84. if len(option.Masters) == 0 {
  85. glog.Fatal("master list is required!")
  86. }
  87. fs.filer = filer.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.Port, option.Collection, option.DefaultReplication, option.DataCenter, func() {
  88. fs.listenersCond.Broadcast()
  89. })
  90. fs.filer.Cipher = option.Cipher
  91. fs.checkWithMaster()
  92. go stats.LoopPushingMetric("filer", stats.SourceName(fs.option.Port), fs.metricsAddress, fs.metricsIntervalSec)
  93. go fs.filer.KeepConnectedToMaster()
  94. v := util.GetViper()
  95. if !util.LoadConfiguration("filer", false) {
  96. v.Set("leveldb2.enabled", true)
  97. v.Set("leveldb2.dir", option.DefaultLevelDbDir)
  98. _, err := os.Stat(option.DefaultLevelDbDir)
  99. if os.IsNotExist(err) {
  100. os.MkdirAll(option.DefaultLevelDbDir, 0755)
  101. }
  102. glog.V(0).Infof("default to create filer store dir in %s", option.DefaultLevelDbDir)
  103. } else {
  104. glog.Warningf("skipping default store dir in %s", option.DefaultLevelDbDir)
  105. }
  106. util.LoadConfiguration("notification", false)
  107. fs.option.recursiveDelete = v.GetBool("filer.options.recursive_delete")
  108. v.SetDefault("filer.options.buckets_folder", "/buckets")
  109. fs.filer.DirBucketsPath = v.GetString("filer.options.buckets_folder")
  110. // TODO deprecated, will be be removed after 2020-12-31
  111. // replaced by https://github.com/chrislusf/seaweedfs/wiki/Path-Specific-Configuration
  112. // fs.filer.FsyncBuckets = v.GetStringSlice("filer.options.buckets_fsync")
  113. fs.filer.LoadConfiguration(v)
  114. notification.LoadConfiguration(v, "notification.")
  115. handleStaticResources(defaultMux)
  116. if !option.DisableHttp {
  117. defaultMux.HandleFunc("/", fs.filerHandler)
  118. }
  119. if defaultMux != readonlyMux {
  120. handleStaticResources(readonlyMux)
  121. readonlyMux.HandleFunc("/", fs.readonlyFilerHandler)
  122. }
  123. fs.filer.AggregateFromPeers(fmt.Sprintf("%s:%d", option.Host, option.Port), option.Filers)
  124. fs.filer.LoadBuckets()
  125. fs.filer.LoadFilerConf()
  126. fs.filer.LoadRemoteStorageConfAndMapping()
  127. grace.OnInterrupt(func() {
  128. fs.filer.Shutdown()
  129. })
  130. return fs, nil
  131. }
  132. func (fs *FilerServer) checkWithMaster() {
  133. for _, master := range fs.option.Masters {
  134. _, err := pb.ParseServerToGrpcAddress(master)
  135. if err != nil {
  136. glog.Fatalf("invalid master address %s: %v", master, err)
  137. }
  138. }
  139. isConnected := false
  140. for !isConnected {
  141. for _, master := range fs.option.Masters {
  142. readErr := operation.WithMasterServerClient(master, fs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
  143. resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
  144. if err != nil {
  145. return fmt.Errorf("get master %s configuration: %v", master, err)
  146. }
  147. fs.metricsAddress, fs.metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSeconds)
  148. if fs.option.DefaultReplication == "" {
  149. fs.option.DefaultReplication = resp.DefaultReplication
  150. }
  151. return nil
  152. })
  153. if readErr == nil {
  154. isConnected = true
  155. } else {
  156. time.Sleep(7 * time.Second)
  157. }
  158. }
  159. }
  160. }