filer_server.go 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "net/http"
  6. "os"
  7. "sync"
  8. "time"
  9. "github.com/seaweedfs/seaweedfs/weed/stats"
  10. "google.golang.org/grpc"
  11. "github.com/seaweedfs/seaweedfs/weed/util/grace"
  12. "github.com/seaweedfs/seaweedfs/weed/operation"
  13. "github.com/seaweedfs/seaweedfs/weed/pb"
  14. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  15. "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
  16. "github.com/seaweedfs/seaweedfs/weed/util"
  17. "github.com/seaweedfs/seaweedfs/weed/filer"
  18. _ "github.com/seaweedfs/seaweedfs/weed/filer/arangodb"
  19. _ "github.com/seaweedfs/seaweedfs/weed/filer/cassandra"
  20. _ "github.com/seaweedfs/seaweedfs/weed/filer/elastic/v7"
  21. _ "github.com/seaweedfs/seaweedfs/weed/filer/etcd"
  22. _ "github.com/seaweedfs/seaweedfs/weed/filer/hbase"
  23. _ "github.com/seaweedfs/seaweedfs/weed/filer/leveldb"
  24. _ "github.com/seaweedfs/seaweedfs/weed/filer/leveldb2"
  25. _ "github.com/seaweedfs/seaweedfs/weed/filer/leveldb3"
  26. _ "github.com/seaweedfs/seaweedfs/weed/filer/mongodb"
  27. _ "github.com/seaweedfs/seaweedfs/weed/filer/mysql"
  28. _ "github.com/seaweedfs/seaweedfs/weed/filer/mysql2"
  29. _ "github.com/seaweedfs/seaweedfs/weed/filer/postgres"
  30. _ "github.com/seaweedfs/seaweedfs/weed/filer/postgres2"
  31. _ "github.com/seaweedfs/seaweedfs/weed/filer/redis"
  32. _ "github.com/seaweedfs/seaweedfs/weed/filer/redis2"
  33. _ "github.com/seaweedfs/seaweedfs/weed/filer/redis3"
  34. _ "github.com/seaweedfs/seaweedfs/weed/filer/sqlite"
  35. _ "github.com/seaweedfs/seaweedfs/weed/filer/ydb"
  36. "github.com/seaweedfs/seaweedfs/weed/glog"
  37. "github.com/seaweedfs/seaweedfs/weed/notification"
  38. _ "github.com/seaweedfs/seaweedfs/weed/notification/aws_sqs"
  39. _ "github.com/seaweedfs/seaweedfs/weed/notification/gocdk_pub_sub"
  40. _ "github.com/seaweedfs/seaweedfs/weed/notification/google_pub_sub"
  41. _ "github.com/seaweedfs/seaweedfs/weed/notification/kafka"
  42. _ "github.com/seaweedfs/seaweedfs/weed/notification/log"
  43. "github.com/seaweedfs/seaweedfs/weed/security"
  44. )
  45. type FilerOption struct {
  46. Masters map[string]pb.ServerAddress
  47. FilerGroup string
  48. Collection string
  49. DefaultReplication string
  50. DisableDirListing bool
  51. MaxMB int
  52. DirListingLimit int
  53. DataCenter string
  54. Rack string
  55. DataNode string
  56. DefaultLevelDbDir string
  57. DisableHttp bool
  58. Host pb.ServerAddress
  59. recursiveDelete bool
  60. Cipher bool
  61. SaveToFilerLimit int64
  62. ConcurrentUploadLimit int64
  63. ShowUIDirectoryDelete bool
  64. DownloadMaxBytesPs int64
  65. }
  66. type FilerServer struct {
  67. inFlightDataSize int64
  68. inFlightDataLimitCond *sync.Cond
  69. filer_pb.UnimplementedSeaweedFilerServer
  70. option *FilerOption
  71. secret security.SigningKey
  72. filer *filer.Filer
  73. filerGuard *security.Guard
  74. grpcDialOption grpc.DialOption
  75. // metrics read from the master
  76. metricsAddress string
  77. metricsIntervalSec int
  78. // notifying clients
  79. listenersLock sync.Mutex
  80. listenersCond *sync.Cond
  81. // track known metadata listeners
  82. knownListenersLock sync.Mutex
  83. knownListeners map[int32]int32
  84. }
  85. func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) {
  86. v := util.GetViper()
  87. signingKey := v.GetString("jwt.filer_signing.key")
  88. v.SetDefault("jwt.filer_signing.expires_after_seconds", 10)
  89. expiresAfterSec := v.GetInt("jwt.filer_signing.expires_after_seconds")
  90. readSigningKey := v.GetString("jwt.filer_signing.read.key")
  91. v.SetDefault("jwt.filer_signing.read.expires_after_seconds", 60)
  92. readExpiresAfterSec := v.GetInt("jwt.filer_signing.read.expires_after_seconds")
  93. fs = &FilerServer{
  94. option: option,
  95. grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
  96. knownListeners: make(map[int32]int32),
  97. inFlightDataLimitCond: sync.NewCond(new(sync.Mutex)),
  98. }
  99. fs.listenersCond = sync.NewCond(&fs.listenersLock)
  100. if len(option.Masters) == 0 {
  101. glog.Fatal("master list is required!")
  102. }
  103. fs.filer = filer.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.FilerGroup, option.Collection, option.DefaultReplication, option.DataCenter, func() {
  104. fs.listenersCond.Broadcast()
  105. })
  106. fs.filer.Cipher = option.Cipher
  107. // we do not support IP whitelist right now
  108. fs.filerGuard = security.NewGuard([]string{}, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec)
  109. fs.checkWithMaster()
  110. go stats.LoopPushingMetric("filer", string(fs.option.Host), fs.metricsAddress, fs.metricsIntervalSec)
  111. go fs.filer.KeepMasterClientConnected()
  112. if !util.LoadConfiguration("filer", false) {
  113. v.SetDefault("leveldb2.enabled", true)
  114. v.SetDefault("leveldb2.dir", option.DefaultLevelDbDir)
  115. _, err := os.Stat(option.DefaultLevelDbDir)
  116. if os.IsNotExist(err) {
  117. os.MkdirAll(option.DefaultLevelDbDir, 0755)
  118. }
  119. glog.V(0).Infof("default to create filer store dir in %s", option.DefaultLevelDbDir)
  120. } else {
  121. glog.Warningf("skipping default store dir in %s", option.DefaultLevelDbDir)
  122. }
  123. util.LoadConfiguration("notification", false)
  124. fs.option.recursiveDelete = v.GetBool("filer.options.recursive_delete")
  125. v.SetDefault("filer.options.buckets_folder", "/buckets")
  126. fs.filer.DirBucketsPath = v.GetString("filer.options.buckets_folder")
  127. // TODO deprecated, will be be removed after 2020-12-31
  128. // replaced by https://github.com/seaweedfs/seaweedfs/wiki/Path-Specific-Configuration
  129. // fs.filer.FsyncBuckets = v.GetStringSlice("filer.options.buckets_fsync")
  130. isFresh := fs.filer.LoadConfiguration(v)
  131. notification.LoadConfiguration(v, "notification.")
  132. handleStaticResources(defaultMux)
  133. if !option.DisableHttp {
  134. defaultMux.HandleFunc("/", fs.filerHandler)
  135. }
  136. if defaultMux != readonlyMux {
  137. handleStaticResources(readonlyMux)
  138. readonlyMux.HandleFunc("/", fs.readonlyFilerHandler)
  139. }
  140. existingNodes := fs.filer.ListExistingPeerUpdates()
  141. startFromTime := time.Now().Add(-filer.LogFlushInterval)
  142. if isFresh {
  143. glog.V(0).Infof("%s bootstrap from peers %+v", option.Host, existingNodes)
  144. if err := fs.filer.MaybeBootstrapFromPeers(option.Host, existingNodes, startFromTime); err != nil {
  145. glog.Fatalf("%s bootstrap from %+v", option.Host, existingNodes)
  146. }
  147. }
  148. fs.filer.AggregateFromPeers(option.Host, existingNodes, startFromTime)
  149. fs.filer.LoadFilerConf()
  150. fs.filer.LoadRemoteStorageConfAndMapping()
  151. grace.OnInterrupt(func() {
  152. fs.filer.Shutdown()
  153. })
  154. return fs, nil
  155. }
  156. func (fs *FilerServer) checkWithMaster() {
  157. isConnected := false
  158. for !isConnected {
  159. for _, master := range fs.option.Masters {
  160. readErr := operation.WithMasterServerClient(false, master, fs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
  161. resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
  162. if err != nil {
  163. return fmt.Errorf("get master %s configuration: %v", master, err)
  164. }
  165. fs.metricsAddress, fs.metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSeconds)
  166. if fs.option.DefaultReplication == "" {
  167. fs.option.DefaultReplication = resp.DefaultReplication
  168. }
  169. return nil
  170. })
  171. if readErr == nil {
  172. isConnected = true
  173. } else {
  174. time.Sleep(7 * time.Second)
  175. }
  176. }
  177. }
  178. }