filer_server.go 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "net/http"
  6. "os"
  7. "sync"
  8. "time"
  9. "github.com/seaweedfs/seaweedfs/weed/stats"
  10. "google.golang.org/grpc"
  11. "github.com/seaweedfs/seaweedfs/weed/util/grace"
  12. "github.com/seaweedfs/seaweedfs/weed/operation"
  13. "github.com/seaweedfs/seaweedfs/weed/pb"
  14. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  15. "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
  16. "github.com/seaweedfs/seaweedfs/weed/util"
  17. "github.com/seaweedfs/seaweedfs/weed/filer"
  18. _ "github.com/seaweedfs/seaweedfs/weed/filer/arangodb"
  19. _ "github.com/seaweedfs/seaweedfs/weed/filer/cassandra"
  20. _ "github.com/seaweedfs/seaweedfs/weed/filer/elastic/v7"
  21. _ "github.com/seaweedfs/seaweedfs/weed/filer/etcd"
  22. _ "github.com/seaweedfs/seaweedfs/weed/filer/hbase"
  23. _ "github.com/seaweedfs/seaweedfs/weed/filer/leveldb"
  24. _ "github.com/seaweedfs/seaweedfs/weed/filer/leveldb2"
  25. _ "github.com/seaweedfs/seaweedfs/weed/filer/leveldb3"
  26. _ "github.com/seaweedfs/seaweedfs/weed/filer/mongodb"
  27. _ "github.com/seaweedfs/seaweedfs/weed/filer/mysql"
  28. _ "github.com/seaweedfs/seaweedfs/weed/filer/mysql2"
  29. _ "github.com/seaweedfs/seaweedfs/weed/filer/postgres"
  30. _ "github.com/seaweedfs/seaweedfs/weed/filer/postgres2"
  31. _ "github.com/seaweedfs/seaweedfs/weed/filer/redis"
  32. _ "github.com/seaweedfs/seaweedfs/weed/filer/redis2"
  33. _ "github.com/seaweedfs/seaweedfs/weed/filer/redis3"
  34. _ "github.com/seaweedfs/seaweedfs/weed/filer/sqlite"
  35. _ "github.com/seaweedfs/seaweedfs/weed/filer/ydb"
  36. "github.com/seaweedfs/seaweedfs/weed/glog"
  37. "github.com/seaweedfs/seaweedfs/weed/notification"
  38. _ "github.com/seaweedfs/seaweedfs/weed/notification/aws_sqs"
  39. _ "github.com/seaweedfs/seaweedfs/weed/notification/gocdk_pub_sub"
  40. _ "github.com/seaweedfs/seaweedfs/weed/notification/google_pub_sub"
  41. _ "github.com/seaweedfs/seaweedfs/weed/notification/kafka"
  42. _ "github.com/seaweedfs/seaweedfs/weed/notification/log"
  43. "github.com/seaweedfs/seaweedfs/weed/security"
  44. )
  45. type FilerOption struct {
  46. Masters *pb.ServerDiscovery
  47. FilerGroup string
  48. Collection string
  49. DefaultReplication string
  50. DisableDirListing bool
  51. MaxMB int
  52. DirListingLimit int
  53. DataCenter string
  54. Rack string
  55. DataNode string
  56. DefaultLevelDbDir string
  57. DisableHttp bool
  58. Host pb.ServerAddress
  59. recursiveDelete bool
  60. Cipher bool
  61. SaveToFilerLimit int64
  62. ConcurrentUploadLimit int64
  63. ShowUIDirectoryDelete bool
  64. DownloadMaxBytesPs int64
  65. DiskType string
  66. }
  67. type FilerServer struct {
  68. inFlightDataSize int64
  69. inFlightDataLimitCond *sync.Cond
  70. filer_pb.UnimplementedSeaweedFilerServer
  71. option *FilerOption
  72. secret security.SigningKey
  73. filer *filer.Filer
  74. filerGuard *security.Guard
  75. grpcDialOption grpc.DialOption
  76. // metrics read from the master
  77. metricsAddress string
  78. metricsIntervalSec int
  79. // notifying clients
  80. listenersLock sync.Mutex
  81. listenersCond *sync.Cond
  82. // track known metadata listeners
  83. knownListenersLock sync.Mutex
  84. knownListeners map[int32]int32
  85. }
  86. func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) {
  87. v := util.GetViper()
  88. signingKey := v.GetString("jwt.filer_signing.key")
  89. v.SetDefault("jwt.filer_signing.expires_after_seconds", 10)
  90. expiresAfterSec := v.GetInt("jwt.filer_signing.expires_after_seconds")
  91. readSigningKey := v.GetString("jwt.filer_signing.read.key")
  92. v.SetDefault("jwt.filer_signing.read.expires_after_seconds", 60)
  93. readExpiresAfterSec := v.GetInt("jwt.filer_signing.read.expires_after_seconds")
  94. fs = &FilerServer{
  95. option: option,
  96. grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
  97. knownListeners: make(map[int32]int32),
  98. inFlightDataLimitCond: sync.NewCond(new(sync.Mutex)),
  99. }
  100. fs.listenersCond = sync.NewCond(&fs.listenersLock)
  101. option.Masters.RefreshBySrvIfAvailable()
  102. if len(option.Masters.GetInstances()) == 0 {
  103. glog.Fatal("master list is required!")
  104. }
  105. fs.filer = filer.NewFiler(*option.Masters, fs.grpcDialOption, option.Host, option.FilerGroup, option.Collection, option.DefaultReplication, option.DataCenter, func() {
  106. fs.listenersCond.Broadcast()
  107. })
  108. fs.filer.Cipher = option.Cipher
  109. // we do not support IP whitelist right now
  110. fs.filerGuard = security.NewGuard([]string{}, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec)
  111. fs.checkWithMaster()
  112. go stats.LoopPushingMetric("filer", string(fs.option.Host), fs.metricsAddress, fs.metricsIntervalSec)
  113. go fs.filer.KeepMasterClientConnected()
  114. if !util.LoadConfiguration("filer", false) {
  115. v.SetDefault("leveldb2.enabled", true)
  116. v.SetDefault("leveldb2.dir", option.DefaultLevelDbDir)
  117. _, err := os.Stat(option.DefaultLevelDbDir)
  118. if os.IsNotExist(err) {
  119. os.MkdirAll(option.DefaultLevelDbDir, 0755)
  120. }
  121. glog.V(0).Infof("default to create filer store dir in %s", option.DefaultLevelDbDir)
  122. } else {
  123. glog.Warningf("skipping default store dir in %s", option.DefaultLevelDbDir)
  124. }
  125. util.LoadConfiguration("notification", false)
  126. fs.option.recursiveDelete = v.GetBool("filer.options.recursive_delete")
  127. v.SetDefault("filer.options.buckets_folder", "/buckets")
  128. fs.filer.DirBucketsPath = v.GetString("filer.options.buckets_folder")
  129. // TODO deprecated, will be be removed after 2020-12-31
  130. // replaced by https://github.com/seaweedfs/seaweedfs/wiki/Path-Specific-Configuration
  131. // fs.filer.FsyncBuckets = v.GetStringSlice("filer.options.buckets_fsync")
  132. isFresh := fs.filer.LoadConfiguration(v)
  133. notification.LoadConfiguration(v, "notification.")
  134. handleStaticResources(defaultMux)
  135. if !option.DisableHttp {
  136. defaultMux.HandleFunc("/", fs.filerHandler)
  137. }
  138. if defaultMux != readonlyMux {
  139. handleStaticResources(readonlyMux)
  140. readonlyMux.HandleFunc("/", fs.readonlyFilerHandler)
  141. }
  142. existingNodes := fs.filer.ListExistingPeerUpdates()
  143. startFromTime := time.Now().Add(-filer.LogFlushInterval)
  144. if isFresh {
  145. glog.V(0).Infof("%s bootstrap from peers %+v", option.Host, existingNodes)
  146. if err := fs.filer.MaybeBootstrapFromPeers(option.Host, existingNodes, startFromTime); err != nil {
  147. glog.Fatalf("%s bootstrap from %+v", option.Host, existingNodes)
  148. }
  149. }
  150. fs.filer.AggregateFromPeers(option.Host, existingNodes, startFromTime)
  151. fs.filer.LoadFilerConf()
  152. fs.filer.LoadRemoteStorageConfAndMapping()
  153. grace.OnInterrupt(func() {
  154. fs.filer.Shutdown()
  155. })
  156. fs.filer.Dlm.LockRing.SetTakeSnapshotCallback(fs.OnDlmChangeSnapshot)
  157. return fs, nil
  158. }
  159. func (fs *FilerServer) checkWithMaster() {
  160. isConnected := false
  161. for !isConnected {
  162. fs.option.Masters.RefreshBySrvIfAvailable()
  163. for _, master := range fs.option.Masters.GetInstances() {
  164. readErr := operation.WithMasterServerClient(false, master, fs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
  165. resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
  166. if err != nil {
  167. return fmt.Errorf("get master %s configuration: %v", master, err)
  168. }
  169. fs.metricsAddress, fs.metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSeconds)
  170. return nil
  171. })
  172. if readErr == nil {
  173. isConnected = true
  174. } else {
  175. time.Sleep(7 * time.Second)
  176. }
  177. }
  178. }
  179. }