volume_server.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. package weed_server
  2. import (
  3. "github.com/chrislusf/seaweedfs/weed/pb"
  4. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  5. "github.com/chrislusf/seaweedfs/weed/storage/types"
  6. "net/http"
  7. "sync"
  8. "google.golang.org/grpc"
  9. "github.com/chrislusf/seaweedfs/weed/stats"
  10. "github.com/chrislusf/seaweedfs/weed/util"
  11. "github.com/chrislusf/seaweedfs/weed/glog"
  12. "github.com/chrislusf/seaweedfs/weed/security"
  13. "github.com/chrislusf/seaweedfs/weed/storage"
  14. )
  15. type VolumeServer struct {
  16. volume_server_pb.UnimplementedVolumeServerServer
  17. inFlightUploadDataSize int64
  18. inFlightDownloadDataSize int64
  19. concurrentUploadLimit int64
  20. concurrentDownloadLimit int64
  21. inFlightUploadDataLimitCond *sync.Cond
  22. inFlightDownloadDataLimitCond *sync.Cond
  23. SeedMasterNodes []pb.ServerAddress
  24. currentMaster pb.ServerAddress
  25. pulseSeconds int
  26. dataCenter string
  27. rack string
  28. store *storage.Store
  29. guard *security.Guard
  30. grpcDialOption grpc.DialOption
  31. needleMapKind storage.NeedleMapKind
  32. FixJpgOrientation bool
  33. ReadMode string
  34. compactionBytePerSecond int64
  35. metricsAddress string
  36. metricsIntervalSec int
  37. fileSizeLimitBytes int64
  38. isHeartbeating bool
  39. stopChan chan bool
  40. }
  41. func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
  42. port int, grpcPort int, publicUrl string,
  43. folders []string, maxCounts []int, minFreeSpaces []util.MinFreeSpace, diskTypes []types.DiskType,
  44. idxFolder string,
  45. needleMapKind storage.NeedleMapKind,
  46. masterNodes []pb.ServerAddress, pulseSeconds int,
  47. dataCenter string, rack string,
  48. whiteList []string,
  49. fixJpgOrientation bool,
  50. readMode string,
  51. compactionMBPerSecond int,
  52. fileSizeLimitMB int,
  53. concurrentUploadLimit int64,
  54. concurrentDownloadLimit int64,
  55. ) *VolumeServer {
  56. v := util.GetViper()
  57. signingKey := v.GetString("jwt.signing.key")
  58. v.SetDefault("jwt.signing.expires_after_seconds", 10)
  59. expiresAfterSec := v.GetInt("jwt.signing.expires_after_seconds")
  60. enableUiAccess := v.GetBool("access.ui")
  61. readSigningKey := v.GetString("jwt.signing.read.key")
  62. v.SetDefault("jwt.signing.read.expires_after_seconds", 60)
  63. readExpiresAfterSec := v.GetInt("jwt.signing.read.expires_after_seconds")
  64. vs := &VolumeServer{
  65. pulseSeconds: pulseSeconds,
  66. dataCenter: dataCenter,
  67. rack: rack,
  68. needleMapKind: needleMapKind,
  69. FixJpgOrientation: fixJpgOrientation,
  70. ReadMode: readMode,
  71. grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.volume"),
  72. compactionBytePerSecond: int64(compactionMBPerSecond) * 1024 * 1024,
  73. fileSizeLimitBytes: int64(fileSizeLimitMB) * 1024 * 1024,
  74. isHeartbeating: true,
  75. stopChan: make(chan bool),
  76. inFlightUploadDataLimitCond: sync.NewCond(new(sync.Mutex)),
  77. inFlightDownloadDataLimitCond: sync.NewCond(new(sync.Mutex)),
  78. concurrentUploadLimit: concurrentUploadLimit,
  79. concurrentDownloadLimit: concurrentDownloadLimit,
  80. }
  81. vs.SeedMasterNodes = masterNodes
  82. vs.checkWithMaster()
  83. vs.store = storage.NewStore(vs.grpcDialOption, ip, port, grpcPort, publicUrl, folders, maxCounts, minFreeSpaces, idxFolder, vs.needleMapKind, diskTypes)
  84. vs.guard = security.NewGuard(whiteList, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec)
  85. handleStaticResources(adminMux)
  86. adminMux.HandleFunc("/status", vs.statusHandler)
  87. if signingKey == "" || enableUiAccess {
  88. // only expose the volume server details for safe environments
  89. adminMux.HandleFunc("/ui/index.html", vs.uiStatusHandler)
  90. /*
  91. adminMux.HandleFunc("/stats/counter", vs.guard.WhiteList(statsCounterHandler))
  92. adminMux.HandleFunc("/stats/memory", vs.guard.WhiteList(statsMemoryHandler))
  93. adminMux.HandleFunc("/stats/disk", vs.guard.WhiteList(vs.statsDiskHandler))
  94. */
  95. }
  96. adminMux.HandleFunc("/", vs.privateStoreHandler)
  97. if publicMux != adminMux {
  98. // separated admin and public port
  99. handleStaticResources(publicMux)
  100. publicMux.HandleFunc("/", vs.publicReadOnlyHandler)
  101. }
  102. go vs.heartbeat()
  103. go stats.LoopPushingMetric("volumeServer", util.JoinHostPort(ip, port), vs.metricsAddress, vs.metricsIntervalSec)
  104. return vs
  105. }
  106. func (vs *VolumeServer) SetStopping() {
  107. glog.V(0).Infoln("Stopping volume server...")
  108. vs.store.SetStopping()
  109. }
  110. func (vs *VolumeServer) Shutdown() {
  111. glog.V(0).Infoln("Shutting down volume server...")
  112. vs.store.Close()
  113. glog.V(0).Infoln("Shut down successfully!")
  114. }