filer_server.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "net/http"
  6. "os"
  7. "time"
  8. "google.golang.org/grpc"
  9. "github.com/chrislusf/seaweedfs/weed/operation"
  10. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  11. "github.com/chrislusf/seaweedfs/weed/stats"
  12. "github.com/chrislusf/seaweedfs/weed/util"
  13. "github.com/chrislusf/seaweedfs/weed/filer2"
  14. _ "github.com/chrislusf/seaweedfs/weed/filer2/cassandra"
  15. _ "github.com/chrislusf/seaweedfs/weed/filer2/etcd"
  16. _ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb"
  17. _ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb2"
  18. _ "github.com/chrislusf/seaweedfs/weed/filer2/mysql"
  19. _ "github.com/chrislusf/seaweedfs/weed/filer2/postgres"
  20. _ "github.com/chrislusf/seaweedfs/weed/filer2/redis"
  21. _ "github.com/chrislusf/seaweedfs/weed/filer2/tikv"
  22. "github.com/chrislusf/seaweedfs/weed/glog"
  23. "github.com/chrislusf/seaweedfs/weed/notification"
  24. _ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs"
  25. _ "github.com/chrislusf/seaweedfs/weed/notification/gocdk_pub_sub"
  26. _ "github.com/chrislusf/seaweedfs/weed/notification/google_pub_sub"
  27. _ "github.com/chrislusf/seaweedfs/weed/notification/kafka"
  28. _ "github.com/chrislusf/seaweedfs/weed/notification/log"
  29. "github.com/chrislusf/seaweedfs/weed/security"
  30. )
  31. type FilerOption struct {
  32. Masters []string
  33. Collection string
  34. DefaultReplication string
  35. DisableDirListing bool
  36. MaxMB int
  37. DirListingLimit int
  38. DataCenter string
  39. DefaultLevelDbDir string
  40. DisableHttp bool
  41. Port uint32
  42. recursiveDelete bool
  43. Cipher bool
  44. }
  45. type FilerServer struct {
  46. option *FilerOption
  47. secret security.SigningKey
  48. filer *filer2.Filer
  49. grpcDialOption grpc.DialOption
  50. }
  51. func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) {
  52. fs = &FilerServer{
  53. option: option,
  54. grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
  55. }
  56. if len(option.Masters) == 0 {
  57. glog.Fatal("master list is required!")
  58. }
  59. fs.filer = filer2.NewFiler(option.Masters, fs.grpcDialOption, option.Port+10000)
  60. fs.filer.Cipher = option.Cipher
  61. go fs.filer.KeepConnectedToMaster()
  62. v := util.GetViper()
  63. if !util.LoadConfiguration("filer", false) {
  64. v.Set("leveldb2.enabled", true)
  65. v.Set("leveldb2.dir", option.DefaultLevelDbDir)
  66. _, err := os.Stat(option.DefaultLevelDbDir)
  67. if os.IsNotExist(err) {
  68. os.MkdirAll(option.DefaultLevelDbDir, 0755)
  69. }
  70. }
  71. util.LoadConfiguration("notification", false)
  72. fs.option.recursiveDelete = v.GetBool("filer.options.recursive_delete")
  73. v.Set("filer.option.buckets_folder", "/buckets")
  74. v.Set("filer.option.queues_folder", "/queues")
  75. fs.filer.DirBucketsPath = v.GetString("filer.option.buckets_folder")
  76. fs.filer.DirQueuesPath = v.GetString("filer.option.queues_folder")
  77. fs.filer.LoadConfiguration(v)
  78. notification.LoadConfiguration(v, "notification.")
  79. handleStaticResources(defaultMux)
  80. if !option.DisableHttp {
  81. defaultMux.HandleFunc("/", fs.filerHandler)
  82. }
  83. if defaultMux != readonlyMux {
  84. readonlyMux.HandleFunc("/", fs.readonlyFilerHandler)
  85. }
  86. fs.filer.LoadBuckets(fs.filer.DirBucketsPath)
  87. maybeStartMetrics(fs, option)
  88. return fs, nil
  89. }
  90. func maybeStartMetrics(fs *FilerServer, option *FilerOption) {
  91. isConnected := false
  92. var metricsAddress string
  93. var metricsIntervalSec int
  94. var readErr error
  95. for !isConnected {
  96. metricsAddress, metricsIntervalSec, readErr = readFilerConfiguration(fs.grpcDialOption, option.Masters[0])
  97. if readErr == nil {
  98. isConnected = true
  99. } else {
  100. time.Sleep(7 * time.Second)
  101. }
  102. }
  103. if metricsAddress == "" && metricsIntervalSec <= 0 {
  104. return
  105. }
  106. go stats.LoopPushingMetric("filer", stats.SourceName(option.Port), stats.FilerGather,
  107. func() (addr string, intervalSeconds int) {
  108. return metricsAddress, metricsIntervalSec
  109. })
  110. }
  111. func readFilerConfiguration(grpcDialOption grpc.DialOption, masterGrpcAddress string) (metricsAddress string, metricsIntervalSec int, err error) {
  112. err = operation.WithMasterServerClient(masterGrpcAddress, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
  113. resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
  114. if err != nil {
  115. return fmt.Errorf("get master %s configuration: %v", masterGrpcAddress, err)
  116. }
  117. metricsAddress, metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSeconds)
  118. return nil
  119. })
  120. return
  121. }