volume_server.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. package weed_server
  2. import (
  3. "fmt"
  4. "github.com/chrislusf/seaweedfs/weed/storage/types"
  5. "net/http"
  6. "sync"
  7. "google.golang.org/grpc"
  8. "github.com/chrislusf/seaweedfs/weed/stats"
  9. "github.com/chrislusf/seaweedfs/weed/util"
  10. "github.com/chrislusf/seaweedfs/weed/glog"
  11. "github.com/chrislusf/seaweedfs/weed/security"
  12. "github.com/chrislusf/seaweedfs/weed/storage"
  13. )
  14. type VolumeServer struct {
  15. inFlightDataSize int64
  16. concurrentUploadLimit int64
  17. inFlightDataLimitCond *sync.Cond
  18. SeedMasterNodes []string
  19. currentMaster string
  20. pulseSeconds int
  21. dataCenter string
  22. rack string
  23. store *storage.Store
  24. guard *security.Guard
  25. grpcDialOption grpc.DialOption
  26. needleMapKind storage.NeedleMapKind
  27. FixJpgOrientation bool
  28. ReadMode string
  29. compactionBytePerSecond int64
  30. metricsAddress string
  31. metricsIntervalSec int
  32. fileSizeLimitBytes int64
  33. isHeartbeating bool
  34. stopChan chan bool
  35. }
  36. func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
  37. port int, publicUrl string,
  38. folders []string, maxCounts []int, minFreeSpaces []util.MinFreeSpace, diskTypes []types.DiskType,
  39. idxFolder string,
  40. needleMapKind storage.NeedleMapKind,
  41. masterNodes []string, pulseSeconds int,
  42. dataCenter string, rack string,
  43. whiteList []string,
  44. fixJpgOrientation bool,
  45. readMode string,
  46. compactionMBPerSecond int,
  47. fileSizeLimitMB int,
  48. concurrentUploadLimit int64,
  49. ) *VolumeServer {
  50. v := util.GetViper()
  51. signingKey := v.GetString("jwt.signing.key")
  52. v.SetDefault("jwt.signing.expires_after_seconds", 10)
  53. expiresAfterSec := v.GetInt("jwt.signing.expires_after_seconds")
  54. enableUiAccess := v.GetBool("access.ui")
  55. readSigningKey := v.GetString("jwt.signing.read.key")
  56. v.SetDefault("jwt.signing.read.expires_after_seconds", 60)
  57. readExpiresAfterSec := v.GetInt("jwt.signing.read.expires_after_seconds")
  58. vs := &VolumeServer{
  59. pulseSeconds: pulseSeconds,
  60. dataCenter: dataCenter,
  61. rack: rack,
  62. needleMapKind: needleMapKind,
  63. FixJpgOrientation: fixJpgOrientation,
  64. ReadMode: readMode,
  65. grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.volume"),
  66. compactionBytePerSecond: int64(compactionMBPerSecond) * 1024 * 1024,
  67. fileSizeLimitBytes: int64(fileSizeLimitMB) * 1024 * 1024,
  68. isHeartbeating: true,
  69. stopChan: make(chan bool),
  70. inFlightDataLimitCond: sync.NewCond(new(sync.Mutex)),
  71. concurrentUploadLimit: concurrentUploadLimit,
  72. }
  73. vs.SeedMasterNodes = masterNodes
  74. vs.checkWithMaster()
  75. vs.store = storage.NewStore(vs.grpcDialOption, port, ip, publicUrl, folders, maxCounts, minFreeSpaces, idxFolder, vs.needleMapKind, diskTypes)
  76. vs.guard = security.NewGuard(whiteList, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec)
  77. handleStaticResources(adminMux)
  78. adminMux.HandleFunc("/status", vs.statusHandler)
  79. if signingKey == "" || enableUiAccess {
  80. // only expose the volume server details for safe environments
  81. adminMux.HandleFunc("/ui/index.html", vs.uiStatusHandler)
  82. /*
  83. adminMux.HandleFunc("/stats/counter", vs.guard.WhiteList(statsCounterHandler))
  84. adminMux.HandleFunc("/stats/memory", vs.guard.WhiteList(statsMemoryHandler))
  85. adminMux.HandleFunc("/stats/disk", vs.guard.WhiteList(vs.statsDiskHandler))
  86. */
  87. }
  88. adminMux.HandleFunc("/", vs.privateStoreHandler)
  89. if publicMux != adminMux {
  90. // separated admin and public port
  91. handleStaticResources(publicMux)
  92. publicMux.HandleFunc("/", vs.publicReadOnlyHandler)
  93. }
  94. go vs.heartbeat()
  95. go stats.LoopPushingMetric("volumeServer", fmt.Sprintf("%s:%d", ip, port), vs.metricsAddress, vs.metricsIntervalSec)
  96. return vs
  97. }
  98. func (vs *VolumeServer) SetStopping() {
  99. glog.V(0).Infoln("Stopping volume server...")
  100. vs.store.SetStopping()
  101. }
  102. func (vs *VolumeServer) Shutdown() {
  103. glog.V(0).Infoln("Shutting down volume server...")
  104. vs.store.Close()
  105. glog.V(0).Infoln("Shut down successfully!")
  106. }