filer_server.go 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  6. "net/http"
  7. "os"
  8. "sync"
  9. "time"
  10. "github.com/chrislusf/seaweedfs/weed/stats"
  11. "google.golang.org/grpc"
  12. "github.com/chrislusf/seaweedfs/weed/util/grace"
  13. "github.com/chrislusf/seaweedfs/weed/operation"
  14. "github.com/chrislusf/seaweedfs/weed/pb"
  15. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  16. "github.com/chrislusf/seaweedfs/weed/util"
  17. "github.com/chrislusf/seaweedfs/weed/filer"
  18. _ "github.com/chrislusf/seaweedfs/weed/filer/cassandra"
  19. _ "github.com/chrislusf/seaweedfs/weed/filer/elastic/v7"
  20. _ "github.com/chrislusf/seaweedfs/weed/filer/etcd"
  21. _ "github.com/chrislusf/seaweedfs/weed/filer/hbase"
  22. _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb"
  23. _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb2"
  24. _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb3"
  25. _ "github.com/chrislusf/seaweedfs/weed/filer/mongodb"
  26. _ "github.com/chrislusf/seaweedfs/weed/filer/mysql"
  27. _ "github.com/chrislusf/seaweedfs/weed/filer/mysql2"
  28. _ "github.com/chrislusf/seaweedfs/weed/filer/postgres"
  29. _ "github.com/chrislusf/seaweedfs/weed/filer/postgres2"
  30. _ "github.com/chrislusf/seaweedfs/weed/filer/redis"
  31. _ "github.com/chrislusf/seaweedfs/weed/filer/redis2"
  32. _ "github.com/chrislusf/seaweedfs/weed/filer/redis3"
  33. _ "github.com/chrislusf/seaweedfs/weed/filer/sqlite"
  34. "github.com/chrislusf/seaweedfs/weed/glog"
  35. "github.com/chrislusf/seaweedfs/weed/notification"
  36. _ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs"
  37. _ "github.com/chrislusf/seaweedfs/weed/notification/gocdk_pub_sub"
  38. _ "github.com/chrislusf/seaweedfs/weed/notification/google_pub_sub"
  39. _ "github.com/chrislusf/seaweedfs/weed/notification/kafka"
  40. _ "github.com/chrislusf/seaweedfs/weed/notification/log"
  41. "github.com/chrislusf/seaweedfs/weed/security"
  42. )
  43. type FilerOption struct {
  44. Masters []pb.ServerAddress
  45. Collection string
  46. DefaultReplication string
  47. DisableDirListing bool
  48. MaxMB int
  49. DirListingLimit int
  50. DataCenter string
  51. Rack string
  52. DataNode string
  53. DefaultLevelDbDir string
  54. DisableHttp bool
  55. Host pb.ServerAddress
  56. recursiveDelete bool
  57. Cipher bool
  58. SaveToFilerLimit int64
  59. ConcurrentUploadLimit int64
  60. }
  61. type FilerServer struct {
  62. filer_pb.UnimplementedSeaweedFilerServer
  63. option *FilerOption
  64. secret security.SigningKey
  65. filer *filer.Filer
  66. filerGuard *security.Guard
  67. grpcDialOption grpc.DialOption
  68. // metrics read from the master
  69. metricsAddress string
  70. metricsIntervalSec int
  71. // notifying clients
  72. listenersLock sync.Mutex
  73. listenersCond *sync.Cond
  74. // track known metadata listeners
  75. knownListenersLock sync.Mutex
  76. knownListeners map[int32]struct{}
  77. brokers map[string]map[string]bool
  78. brokersLock sync.Mutex
  79. inFlightDataSize int64
  80. inFlightDataLimitCond *sync.Cond
  81. }
  82. func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) {
  83. v := util.GetViper()
  84. signingKey := v.GetString("jwt.filer_signing.key")
  85. v.SetDefault("jwt.filer_signing.expires_after_seconds", 10)
  86. expiresAfterSec := v.GetInt("jwt.filer_signing.expires_after_seconds")
  87. readSigningKey := v.GetString("jwt.filer_signing.read.key")
  88. v.SetDefault("jwt.filer_signing.read.expires_after_seconds", 60)
  89. readExpiresAfterSec := v.GetInt("jwt.filer_signing.read.expires_after_seconds")
  90. fs = &FilerServer{
  91. option: option,
  92. grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
  93. knownListeners: make(map[int32]struct{}),
  94. brokers: make(map[string]map[string]bool),
  95. inFlightDataLimitCond: sync.NewCond(new(sync.Mutex)),
  96. }
  97. fs.listenersCond = sync.NewCond(&fs.listenersLock)
  98. if len(option.Masters) == 0 {
  99. glog.Fatal("master list is required!")
  100. }
  101. fs.filer = filer.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.Collection, option.DefaultReplication, option.DataCenter, func() {
  102. fs.listenersCond.Broadcast()
  103. })
  104. fs.filer.Cipher = option.Cipher
  105. // we do not support IP whitelist right now
  106. fs.filerGuard = security.NewGuard([]string{}, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec)
  107. fs.checkWithMaster()
  108. go stats.LoopPushingMetric("filer", string(fs.option.Host), fs.metricsAddress, fs.metricsIntervalSec)
  109. go fs.filer.KeepMasterClientConnected()
  110. if !util.LoadConfiguration("filer", false) {
  111. v.Set("leveldb2.enabled", true)
  112. v.Set("leveldb2.dir", option.DefaultLevelDbDir)
  113. _, err := os.Stat(option.DefaultLevelDbDir)
  114. if os.IsNotExist(err) {
  115. os.MkdirAll(option.DefaultLevelDbDir, 0755)
  116. }
  117. glog.V(0).Infof("default to create filer store dir in %s", option.DefaultLevelDbDir)
  118. } else {
  119. glog.Warningf("skipping default store dir in %s", option.DefaultLevelDbDir)
  120. }
  121. util.LoadConfiguration("notification", false)
  122. fs.option.recursiveDelete = v.GetBool("filer.options.recursive_delete")
  123. v.SetDefault("filer.options.buckets_folder", "/buckets")
  124. fs.filer.DirBucketsPath = v.GetString("filer.options.buckets_folder")
  125. // TODO deprecated, will be be removed after 2020-12-31
  126. // replaced by https://github.com/chrislusf/seaweedfs/wiki/Path-Specific-Configuration
  127. // fs.filer.FsyncBuckets = v.GetStringSlice("filer.options.buckets_fsync")
  128. fs.filer.LoadConfiguration(v)
  129. notification.LoadConfiguration(v, "notification.")
  130. handleStaticResources(defaultMux)
  131. if !option.DisableHttp {
  132. defaultMux.HandleFunc("/", fs.filerHandler)
  133. }
  134. if defaultMux != readonlyMux {
  135. handleStaticResources(readonlyMux)
  136. readonlyMux.HandleFunc("/", fs.readonlyFilerHandler)
  137. }
  138. fs.filer.AggregateFromPeers(option.Host)
  139. fs.filer.LoadBuckets()
  140. fs.filer.LoadFilerConf()
  141. fs.filer.LoadRemoteStorageConfAndMapping()
  142. grace.OnInterrupt(func() {
  143. fs.filer.Shutdown()
  144. })
  145. return fs, nil
  146. }
  147. func (fs *FilerServer) checkWithMaster() {
  148. isConnected := false
  149. for !isConnected {
  150. for _, master := range fs.option.Masters {
  151. readErr := operation.WithMasterServerClient(false, master, fs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
  152. resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
  153. if err != nil {
  154. return fmt.Errorf("get master %s configuration: %v", master, err)
  155. }
  156. fs.metricsAddress, fs.metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSeconds)
  157. if fs.option.DefaultReplication == "" {
  158. fs.option.DefaultReplication = resp.DefaultReplication
  159. }
  160. return nil
  161. })
  162. if readErr == nil {
  163. isConnected = true
  164. } else {
  165. time.Sleep(7 * time.Second)
  166. }
  167. }
  168. }
  169. }