filer_server.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. package weed_server
  2. import (
  3. "encoding/json"
  4. "math/rand"
  5. "net/http"
  6. "os"
  7. "strconv"
  8. "sync"
  9. "time"
  10. "github.com/chrislusf/seaweedfs/weed/filer"
  11. "github.com/chrislusf/seaweedfs/weed/filer/cassandra_store"
  12. "github.com/chrislusf/seaweedfs/weed/filer/embedded_filer"
  13. "github.com/chrislusf/seaweedfs/weed/filer/flat_namespace"
  14. "github.com/chrislusf/seaweedfs/weed/filer/mysql_store"
  15. "github.com/chrislusf/seaweedfs/weed/filer/postgres_store"
  16. "github.com/chrislusf/seaweedfs/weed/filer/redis_store"
  17. "github.com/chrislusf/seaweedfs/weed/glog"
  18. "github.com/chrislusf/seaweedfs/weed/security"
  19. "github.com/chrislusf/seaweedfs/weed/storage"
  20. "github.com/chrislusf/seaweedfs/weed/util"
  21. )
  22. type filerConf struct {
  23. MysqlConf []mysql_store.MySqlConf `json:"mysql"`
  24. mysql_store.ShardingConf
  25. PostgresConf *postgres_store.PostgresConf `json:"postgres"`
  26. }
  27. func parseConfFile(confPath string) (*filerConf, error) {
  28. var setting filerConf
  29. configFile, err := os.Open(confPath)
  30. defer configFile.Close()
  31. if err != nil {
  32. return nil, err
  33. }
  34. jsonParser := json.NewDecoder(configFile)
  35. if err = jsonParser.Decode(&setting); err != nil {
  36. return nil, err
  37. }
  38. return &setting, nil
  39. }
  40. type FilerServer struct {
  41. port string
  42. master string
  43. mnLock sync.RWMutex
  44. collection string
  45. defaultReplication string
  46. redirectOnRead bool
  47. disableDirListing bool
  48. secret security.Secret
  49. filer filer.Filer
  50. maxMB int
  51. masterNodes *storage.MasterNodes
  52. }
  53. func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, ip string, port int, master string, dir string, collection string,
  54. replication string, redirectOnRead bool, disableDirListing bool,
  55. confFile string,
  56. maxMB int,
  57. secret string,
  58. cassandra_server string, cassandra_keyspace string,
  59. redis_server string, redis_password string, redis_database int,
  60. ) (fs *FilerServer, err error) {
  61. fs = &FilerServer{
  62. master: master,
  63. collection: collection,
  64. defaultReplication: replication,
  65. redirectOnRead: redirectOnRead,
  66. disableDirListing: disableDirListing,
  67. maxMB: maxMB,
  68. port: ip + ":" + strconv.Itoa(port),
  69. }
  70. var setting *filerConf
  71. if confFile != "" {
  72. setting, err = parseConfFile(confFile)
  73. if err != nil {
  74. return nil, err
  75. }
  76. } else {
  77. setting = new(filerConf)
  78. }
  79. if setting.MysqlConf != nil && len(setting.MysqlConf) != 0 {
  80. mysql_store := mysql_store.NewMysqlStore(setting.MysqlConf, setting.IsSharding, setting.ShardCount)
  81. fs.filer = flat_namespace.NewFlatNamespaceFiler(master, mysql_store)
  82. } else if setting.PostgresConf != nil {
  83. fs.filer = postgres_store.NewPostgresStore(master, *setting.PostgresConf)
  84. } else if cassandra_server != "" {
  85. cassandra_store, err := cassandra_store.NewCassandraStore(cassandra_keyspace, cassandra_server)
  86. if err != nil {
  87. glog.Fatalf("Can not connect to cassandra server %s with keyspace %s: %v", cassandra_server, cassandra_keyspace, err)
  88. }
  89. fs.filer = flat_namespace.NewFlatNamespaceFiler(master, cassandra_store)
  90. } else if redis_server != "" {
  91. redis_store := redis_store.NewRedisStore(redis_server, redis_password, redis_database)
  92. fs.filer = flat_namespace.NewFlatNamespaceFiler(master, redis_store)
  93. } else {
  94. if fs.filer, err = embedded_filer.NewFilerEmbedded(master, dir); err != nil {
  95. glog.Fatalf("Can not start filer in dir %s : %v", dir, err)
  96. return
  97. }
  98. defaultMux.HandleFunc("/admin/mv", fs.moveHandler)
  99. }
  100. defaultMux.HandleFunc("/admin/register", fs.registerHandler)
  101. defaultMux.HandleFunc("/", fs.filerHandler)
  102. if defaultMux != readonlyMux {
  103. readonlyMux.HandleFunc("/", fs.readonlyFilerHandler)
  104. }
  105. go func() {
  106. connected := true
  107. fs.masterNodes = storage.NewMasterNodes(fs.master)
  108. glog.V(0).Infof("Filer server bootstraps with master %s", fs.getMasterNode())
  109. for {
  110. glog.V(4).Infof("Filer server sending to master %s", fs.getMasterNode())
  111. master, err := fs.detectHealthyMaster(fs.getMasterNode())
  112. if err == nil {
  113. if !connected {
  114. connected = true
  115. if fs.getMasterNode() != master {
  116. fs.setMasterNode(master)
  117. }
  118. glog.V(0).Infoln("Filer Server Connected with master at", master)
  119. }
  120. } else {
  121. glog.V(1).Infof("Filer Server Failed to talk with master %s: %v", fs.getMasterNode(), err)
  122. if connected {
  123. connected = false
  124. }
  125. }
  126. if connected {
  127. time.Sleep(time.Duration(float32(10*1e3)*(1+rand.Float32())) * time.Millisecond)
  128. } else {
  129. time.Sleep(time.Duration(float32(10*1e3)*0.25) * time.Millisecond)
  130. }
  131. }
  132. }()
  133. return fs, nil
  134. }
  135. func (fs *FilerServer) jwt(fileId string) security.EncodedJwt {
  136. return security.GenJwt(fs.secret, fileId)
  137. }
  138. func (fs *FilerServer) getMasterNode() string {
  139. fs.mnLock.RLock()
  140. defer fs.mnLock.RUnlock()
  141. return fs.master
  142. }
  143. func (fs *FilerServer) setMasterNode(masterNode string) {
  144. fs.mnLock.Lock()
  145. defer fs.mnLock.Unlock()
  146. fs.master = masterNode
  147. }
  148. func (fs *FilerServer) detectHealthyMaster(masterNode string) (master string, e error) {
  149. if e = checkMaster(masterNode); e != nil {
  150. fs.masterNodes.Reset()
  151. for i := 0; i <= 3; i++ {
  152. master, e = fs.masterNodes.FindMaster()
  153. if e != nil {
  154. continue
  155. } else {
  156. if e = checkMaster(master); e == nil {
  157. break
  158. }
  159. }
  160. }
  161. } else {
  162. master = masterNode
  163. }
  164. return
  165. }
  166. func checkMaster(masterNode string) error {
  167. statUrl := "http://" + masterNode + "/stats/health"
  168. glog.V(4).Infof("Connecting to %s ...", statUrl)
  169. _, e := util.Get(statUrl)
  170. return e
  171. }