filer_server.go 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "net/http"
  6. "os"
  7. "strings"
  8. "sync"
  9. "time"
  10. "github.com/seaweedfs/seaweedfs/weed/stats"
  11. "google.golang.org/grpc"
  12. "github.com/seaweedfs/seaweedfs/weed/util/grace"
  13. "github.com/seaweedfs/seaweedfs/weed/operation"
  14. "github.com/seaweedfs/seaweedfs/weed/pb"
  15. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  16. "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
  17. "github.com/seaweedfs/seaweedfs/weed/util"
  18. "github.com/seaweedfs/seaweedfs/weed/filer"
  19. _ "github.com/seaweedfs/seaweedfs/weed/filer/arangodb"
  20. _ "github.com/seaweedfs/seaweedfs/weed/filer/cassandra"
  21. _ "github.com/seaweedfs/seaweedfs/weed/filer/elastic/v7"
  22. _ "github.com/seaweedfs/seaweedfs/weed/filer/etcd"
  23. _ "github.com/seaweedfs/seaweedfs/weed/filer/hbase"
  24. _ "github.com/seaweedfs/seaweedfs/weed/filer/leveldb"
  25. _ "github.com/seaweedfs/seaweedfs/weed/filer/leveldb2"
  26. _ "github.com/seaweedfs/seaweedfs/weed/filer/leveldb3"
  27. _ "github.com/seaweedfs/seaweedfs/weed/filer/mongodb"
  28. _ "github.com/seaweedfs/seaweedfs/weed/filer/mysql"
  29. _ "github.com/seaweedfs/seaweedfs/weed/filer/mysql2"
  30. _ "github.com/seaweedfs/seaweedfs/weed/filer/postgres"
  31. _ "github.com/seaweedfs/seaweedfs/weed/filer/postgres2"
  32. _ "github.com/seaweedfs/seaweedfs/weed/filer/redis"
  33. _ "github.com/seaweedfs/seaweedfs/weed/filer/redis2"
  34. _ "github.com/seaweedfs/seaweedfs/weed/filer/redis3"
  35. _ "github.com/seaweedfs/seaweedfs/weed/filer/sqlite"
  36. _ "github.com/seaweedfs/seaweedfs/weed/filer/ydb"
  37. "github.com/seaweedfs/seaweedfs/weed/glog"
  38. "github.com/seaweedfs/seaweedfs/weed/notification"
  39. _ "github.com/seaweedfs/seaweedfs/weed/notification/aws_sqs"
  40. _ "github.com/seaweedfs/seaweedfs/weed/notification/gocdk_pub_sub"
  41. _ "github.com/seaweedfs/seaweedfs/weed/notification/google_pub_sub"
  42. _ "github.com/seaweedfs/seaweedfs/weed/notification/kafka"
  43. _ "github.com/seaweedfs/seaweedfs/weed/notification/log"
  44. "github.com/seaweedfs/seaweedfs/weed/security"
  45. )
  46. type FilerOption struct {
  47. Masters *pb.ServerDiscovery
  48. FilerGroup string
  49. Collection string
  50. DefaultReplication string
  51. DisableDirListing bool
  52. MaxMB int
  53. DirListingLimit int
  54. DataCenter string
  55. Rack string
  56. DataNode string
  57. DefaultLevelDbDir string
  58. DisableHttp bool
  59. Host pb.ServerAddress
  60. recursiveDelete bool
  61. Cipher bool
  62. SaveToFilerLimit int64
  63. ConcurrentUploadLimit int64
  64. ShowUIDirectoryDelete bool
  65. DownloadMaxBytesPs int64
  66. DiskType string
  67. AllowedOrigins []string
  68. }
  69. type FilerServer struct {
  70. inFlightDataSize int64
  71. inFlightDataLimitCond *sync.Cond
  72. filer_pb.UnimplementedSeaweedFilerServer
  73. option *FilerOption
  74. secret security.SigningKey
  75. filer *filer.Filer
  76. filerGuard *security.Guard
  77. grpcDialOption grpc.DialOption
  78. // metrics read from the master
  79. metricsAddress string
  80. metricsIntervalSec int
  81. // notifying clients
  82. listenersLock sync.Mutex
  83. listenersCond *sync.Cond
  84. // track known metadata listeners
  85. knownListenersLock sync.Mutex
  86. knownListeners map[int32]int32
  87. }
  88. func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) {
  89. v := util.GetViper()
  90. signingKey := v.GetString("jwt.filer_signing.key")
  91. v.SetDefault("jwt.filer_signing.expires_after_seconds", 10)
  92. expiresAfterSec := v.GetInt("jwt.filer_signing.expires_after_seconds")
  93. readSigningKey := v.GetString("jwt.filer_signing.read.key")
  94. v.SetDefault("jwt.filer_signing.read.expires_after_seconds", 60)
  95. readExpiresAfterSec := v.GetInt("jwt.filer_signing.read.expires_after_seconds")
  96. v.SetDefault("cors.allowed_origins.values", "*")
  97. if (option.AllowedOrigins == nil) || (len(option.AllowedOrigins) == 0) {
  98. allowedOrigins := v.GetString("cors.allowed_origins.values")
  99. domains := strings.Split(allowedOrigins, ",")
  100. option.AllowedOrigins = domains
  101. }
  102. fs = &FilerServer{
  103. option: option,
  104. grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
  105. knownListeners: make(map[int32]int32),
  106. inFlightDataLimitCond: sync.NewCond(new(sync.Mutex)),
  107. }
  108. fs.listenersCond = sync.NewCond(&fs.listenersLock)
  109. option.Masters.RefreshBySrvIfAvailable()
  110. if len(option.Masters.GetInstances()) == 0 {
  111. glog.Fatal("master list is required!")
  112. }
  113. v.SetDefault("filer.options.max_file_name_length", 255)
  114. maxFilenameLength := v.GetUint32("filer.options.max_file_name_length")
  115. fs.filer = filer.NewFiler(*option.Masters, fs.grpcDialOption, option.Host, option.FilerGroup, option.Collection, option.DefaultReplication, option.DataCenter, maxFilenameLength, func() {
  116. fs.listenersCond.Broadcast()
  117. })
  118. fs.filer.Cipher = option.Cipher
  119. // we do not support IP whitelist right now
  120. fs.filerGuard = security.NewGuard([]string{}, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec)
  121. fs.checkWithMaster()
  122. go stats.LoopPushingMetric("filer", string(fs.option.Host), fs.metricsAddress, fs.metricsIntervalSec)
  123. go fs.filer.KeepMasterClientConnected()
  124. if !util.LoadConfiguration("filer", false) {
  125. v.SetDefault("leveldb2.enabled", true)
  126. v.SetDefault("leveldb2.dir", option.DefaultLevelDbDir)
  127. _, err := os.Stat(option.DefaultLevelDbDir)
  128. if os.IsNotExist(err) {
  129. os.MkdirAll(option.DefaultLevelDbDir, 0755)
  130. }
  131. glog.V(0).Infof("default to create filer store dir in %s", option.DefaultLevelDbDir)
  132. } else {
  133. glog.Warningf("skipping default store dir in %s", option.DefaultLevelDbDir)
  134. }
  135. util.LoadConfiguration("notification", false)
  136. fs.option.recursiveDelete = v.GetBool("filer.options.recursive_delete")
  137. v.SetDefault("filer.options.buckets_folder", "/buckets")
  138. fs.filer.DirBucketsPath = v.GetString("filer.options.buckets_folder")
  139. // TODO deprecated, will be be removed after 2020-12-31
  140. // replaced by https://github.com/seaweedfs/seaweedfs/wiki/Path-Specific-Configuration
  141. // fs.filer.FsyncBuckets = v.GetStringSlice("filer.options.buckets_fsync")
  142. isFresh := fs.filer.LoadConfiguration(v)
  143. notification.LoadConfiguration(v, "notification.")
  144. handleStaticResources(defaultMux)
  145. if !option.DisableHttp {
  146. defaultMux.HandleFunc("/healthz", fs.filerHealthzHandler)
  147. defaultMux.HandleFunc("/", fs.filerHandler)
  148. }
  149. if defaultMux != readonlyMux {
  150. handleStaticResources(readonlyMux)
  151. readonlyMux.HandleFunc("/", fs.readonlyFilerHandler)
  152. }
  153. existingNodes := fs.filer.ListExistingPeerUpdates()
  154. startFromTime := time.Now().Add(-filer.LogFlushInterval)
  155. if isFresh {
  156. glog.V(0).Infof("%s bootstrap from peers %+v", option.Host, existingNodes)
  157. if err := fs.filer.MaybeBootstrapFromPeers(option.Host, existingNodes, startFromTime); err != nil {
  158. glog.Fatalf("%s bootstrap from %+v: %v", option.Host, existingNodes, err)
  159. }
  160. }
  161. fs.filer.AggregateFromPeers(option.Host, existingNodes, startFromTime)
  162. fs.filer.LoadFilerConf()
  163. fs.filer.LoadRemoteStorageConfAndMapping()
  164. grace.OnInterrupt(func() {
  165. fs.filer.Shutdown()
  166. })
  167. fs.filer.Dlm.LockRing.SetTakeSnapshotCallback(fs.OnDlmChangeSnapshot)
  168. return fs, nil
  169. }
  170. func (fs *FilerServer) checkWithMaster() {
  171. isConnected := false
  172. for !isConnected {
  173. fs.option.Masters.RefreshBySrvIfAvailable()
  174. for _, master := range fs.option.Masters.GetInstances() {
  175. readErr := operation.WithMasterServerClient(false, master, fs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
  176. resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
  177. if err != nil {
  178. return fmt.Errorf("get master %s configuration: %v", master, err)
  179. }
  180. fs.metricsAddress, fs.metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSeconds)
  181. return nil
  182. })
  183. if readErr == nil {
  184. isConnected = true
  185. } else {
  186. time.Sleep(7 * time.Second)
  187. }
  188. }
  189. }
  190. }