filer_server.go 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "net/http"
  6. "os"
  7. "strings"
  8. "sync"
  9. "sync/atomic"
  10. "time"
  11. "github.com/seaweedfs/seaweedfs/weed/stats"
  12. "google.golang.org/grpc"
  13. "github.com/seaweedfs/seaweedfs/weed/util/grace"
  14. "github.com/seaweedfs/seaweedfs/weed/operation"
  15. "github.com/seaweedfs/seaweedfs/weed/pb"
  16. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  17. "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
  18. "github.com/seaweedfs/seaweedfs/weed/util"
  19. "github.com/seaweedfs/seaweedfs/weed/filer"
  20. _ "github.com/seaweedfs/seaweedfs/weed/filer/arangodb"
  21. _ "github.com/seaweedfs/seaweedfs/weed/filer/cassandra"
  22. _ "github.com/seaweedfs/seaweedfs/weed/filer/elastic/v7"
  23. _ "github.com/seaweedfs/seaweedfs/weed/filer/etcd"
  24. _ "github.com/seaweedfs/seaweedfs/weed/filer/hbase"
  25. _ "github.com/seaweedfs/seaweedfs/weed/filer/leveldb"
  26. _ "github.com/seaweedfs/seaweedfs/weed/filer/leveldb2"
  27. _ "github.com/seaweedfs/seaweedfs/weed/filer/leveldb3"
  28. _ "github.com/seaweedfs/seaweedfs/weed/filer/mongodb"
  29. _ "github.com/seaweedfs/seaweedfs/weed/filer/mysql"
  30. _ "github.com/seaweedfs/seaweedfs/weed/filer/mysql2"
  31. _ "github.com/seaweedfs/seaweedfs/weed/filer/postgres"
  32. _ "github.com/seaweedfs/seaweedfs/weed/filer/postgres2"
  33. _ "github.com/seaweedfs/seaweedfs/weed/filer/redis"
  34. _ "github.com/seaweedfs/seaweedfs/weed/filer/redis2"
  35. _ "github.com/seaweedfs/seaweedfs/weed/filer/redis3"
  36. _ "github.com/seaweedfs/seaweedfs/weed/filer/sqlite"
  37. _ "github.com/seaweedfs/seaweedfs/weed/filer/ydb"
  38. "github.com/seaweedfs/seaweedfs/weed/glog"
  39. "github.com/seaweedfs/seaweedfs/weed/notification"
  40. _ "github.com/seaweedfs/seaweedfs/weed/notification/aws_sqs"
  41. _ "github.com/seaweedfs/seaweedfs/weed/notification/gocdk_pub_sub"
  42. _ "github.com/seaweedfs/seaweedfs/weed/notification/google_pub_sub"
  43. _ "github.com/seaweedfs/seaweedfs/weed/notification/kafka"
  44. _ "github.com/seaweedfs/seaweedfs/weed/notification/log"
  45. "github.com/seaweedfs/seaweedfs/weed/security"
  46. )
  47. type FilerOption struct {
  48. Masters *pb.ServerDiscovery
  49. FilerGroup string
  50. Collection string
  51. DefaultReplication string
  52. DisableDirListing bool
  53. MaxMB int
  54. DirListingLimit int
  55. DataCenter string
  56. Rack string
  57. DataNode string
  58. DefaultLevelDbDir string
  59. DisableHttp bool
  60. Host pb.ServerAddress
  61. recursiveDelete bool
  62. Cipher bool
  63. SaveToFilerLimit int64
  64. ConcurrentUploadLimit int64
  65. ShowUIDirectoryDelete bool
  66. DownloadMaxBytesPs int64
  67. DiskType string
  68. AllowedOrigins []string
  69. ExposeDirectoryData bool
  70. }
  71. type FilerServer struct {
  72. inFlightDataSize int64
  73. listenersWaits int64
  74. // notifying clients
  75. listenersLock sync.Mutex
  76. listenersCond *sync.Cond
  77. inFlightDataLimitCond *sync.Cond
  78. filer_pb.UnimplementedSeaweedFilerServer
  79. option *FilerOption
  80. secret security.SigningKey
  81. filer *filer.Filer
  82. filerGuard *security.Guard
  83. volumeGuard *security.Guard
  84. grpcDialOption grpc.DialOption
  85. // metrics read from the master
  86. metricsAddress string
  87. metricsIntervalSec int
  88. // track known metadata listeners
  89. knownListenersLock sync.Mutex
  90. knownListeners map[int32]int32
  91. }
  92. func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) {
  93. v := util.GetViper()
  94. signingKey := v.GetString("jwt.filer_signing.key")
  95. v.SetDefault("jwt.filer_signing.expires_after_seconds", 10)
  96. expiresAfterSec := v.GetInt("jwt.filer_signing.expires_after_seconds")
  97. readSigningKey := v.GetString("jwt.filer_signing.read.key")
  98. v.SetDefault("jwt.filer_signing.read.expires_after_seconds", 60)
  99. readExpiresAfterSec := v.GetInt("jwt.filer_signing.read.expires_after_seconds")
  100. volumeSigningKey := v.GetString("jwt.signing.key")
  101. v.SetDefault("jwt.signing.expires_after_seconds", 10)
  102. volumeExpiresAfterSec := v.GetInt("jwt.signing.expires_after_seconds")
  103. volumeReadSigningKey := v.GetString("jwt.signing.read.key")
  104. v.SetDefault("jwt.signing.read.expires_after_seconds", 60)
  105. volumeReadExpiresAfterSec := v.GetInt("jwt.signing.read.expires_after_seconds")
  106. v.SetDefault("cors.allowed_origins.values", "*")
  107. allowedOrigins := v.GetString("cors.allowed_origins.values")
  108. domains := strings.Split(allowedOrigins, ",")
  109. option.AllowedOrigins = domains
  110. v.SetDefault("filer.expose_directory_metadata.enabled", true)
  111. returnDirMetadata := v.GetBool("filer.expose_directory_metadata.enabled")
  112. option.ExposeDirectoryData = returnDirMetadata
  113. fs = &FilerServer{
  114. option: option,
  115. grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
  116. knownListeners: make(map[int32]int32),
  117. inFlightDataLimitCond: sync.NewCond(new(sync.Mutex)),
  118. }
  119. fs.listenersCond = sync.NewCond(&fs.listenersLock)
  120. option.Masters.RefreshBySrvIfAvailable()
  121. if len(option.Masters.GetInstances()) == 0 {
  122. glog.Fatal("master list is required!")
  123. }
  124. v.SetDefault("filer.options.max_file_name_length", 255)
  125. maxFilenameLength := v.GetUint32("filer.options.max_file_name_length")
  126. fs.filer = filer.NewFiler(*option.Masters, fs.grpcDialOption, option.Host, option.FilerGroup, option.Collection, option.DefaultReplication, option.DataCenter, maxFilenameLength, func() {
  127. if atomic.LoadInt64(&fs.listenersWaits) > 0 {
  128. fs.listenersCond.Broadcast()
  129. }
  130. })
  131. fs.filer.Cipher = option.Cipher
  132. // we do not support IP whitelist right now
  133. fs.filerGuard = security.NewGuard([]string{}, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec)
  134. fs.volumeGuard = security.NewGuard([]string{}, volumeSigningKey, volumeExpiresAfterSec, volumeReadSigningKey, volumeReadExpiresAfterSec)
  135. fs.checkWithMaster()
  136. go stats.LoopPushingMetric("filer", string(fs.option.Host), fs.metricsAddress, fs.metricsIntervalSec)
  137. go fs.filer.KeepMasterClientConnected(context.Background())
  138. if !util.LoadConfiguration("filer", false) {
  139. v.SetDefault("leveldb2.enabled", true)
  140. v.SetDefault("leveldb2.dir", option.DefaultLevelDbDir)
  141. _, err := os.Stat(option.DefaultLevelDbDir)
  142. if os.IsNotExist(err) {
  143. os.MkdirAll(option.DefaultLevelDbDir, 0755)
  144. }
  145. glog.V(0).Infof("default to create filer store dir in %s", option.DefaultLevelDbDir)
  146. } else {
  147. glog.Warningf("skipping default store dir in %s", option.DefaultLevelDbDir)
  148. }
  149. util.LoadConfiguration("notification", false)
  150. fs.option.recursiveDelete = v.GetBool("filer.options.recursive_delete")
  151. v.SetDefault("filer.options.buckets_folder", "/buckets")
  152. fs.filer.DirBucketsPath = v.GetString("filer.options.buckets_folder")
  153. // TODO deprecated, will be removed after 2020-12-31
  154. // replaced by https://github.com/seaweedfs/seaweedfs/wiki/Path-Specific-Configuration
  155. // fs.filer.FsyncBuckets = v.GetStringSlice("filer.options.buckets_fsync")
  156. isFresh := fs.filer.LoadConfiguration(v)
  157. notification.LoadConfiguration(v, "notification.")
  158. handleStaticResources(defaultMux)
  159. if !option.DisableHttp {
  160. defaultMux.HandleFunc("/healthz", fs.filerHealthzHandler)
  161. defaultMux.HandleFunc("/", fs.filerHandler)
  162. }
  163. if defaultMux != readonlyMux {
  164. handleStaticResources(readonlyMux)
  165. readonlyMux.HandleFunc("/healthz", fs.filerHealthzHandler)
  166. readonlyMux.HandleFunc("/", fs.readonlyFilerHandler)
  167. }
  168. existingNodes := fs.filer.ListExistingPeerUpdates(context.Background())
  169. startFromTime := time.Now().Add(-filer.LogFlushInterval)
  170. if isFresh {
  171. glog.V(0).Infof("%s bootstrap from peers %+v", option.Host, existingNodes)
  172. if err := fs.filer.MaybeBootstrapFromOnePeer(option.Host, existingNodes, startFromTime); err != nil {
  173. glog.Fatalf("%s bootstrap from %+v: %v", option.Host, existingNodes, err)
  174. }
  175. }
  176. fs.filer.AggregateFromPeers(option.Host, existingNodes, startFromTime)
  177. fs.filer.LoadFilerConf()
  178. fs.filer.LoadRemoteStorageConfAndMapping()
  179. grace.OnInterrupt(func() {
  180. fs.filer.Shutdown()
  181. })
  182. fs.filer.Dlm.LockRing.SetTakeSnapshotCallback(fs.OnDlmChangeSnapshot)
  183. return fs, nil
  184. }
  185. func (fs *FilerServer) checkWithMaster() {
  186. isConnected := false
  187. for !isConnected {
  188. fs.option.Masters.RefreshBySrvIfAvailable()
  189. for _, master := range fs.option.Masters.GetInstances() {
  190. readErr := operation.WithMasterServerClient(false, master, fs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
  191. resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
  192. if err != nil {
  193. return fmt.Errorf("get master %s configuration: %v", master, err)
  194. }
  195. fs.metricsAddress, fs.metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSeconds)
  196. return nil
  197. })
  198. if readErr == nil {
  199. isConnected = true
  200. } else {
  201. time.Sleep(7 * time.Second)
  202. }
  203. }
  204. }
  205. }