volume_server.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. package weed_server
  2. import (
  3. "net/http"
  4. "sync"
  5. "time"
  6. "github.com/seaweedfs/seaweedfs/weed/pb"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
  8. "github.com/seaweedfs/seaweedfs/weed/storage/types"
  9. "google.golang.org/grpc"
  10. "github.com/seaweedfs/seaweedfs/weed/stats"
  11. "github.com/seaweedfs/seaweedfs/weed/util"
  12. "github.com/seaweedfs/seaweedfs/weed/glog"
  13. "github.com/seaweedfs/seaweedfs/weed/security"
  14. "github.com/seaweedfs/seaweedfs/weed/storage"
  15. )
  16. type VolumeServer struct {
  17. volume_server_pb.UnimplementedVolumeServerServer
  18. inFlightUploadDataSize int64
  19. inFlightDownloadDataSize int64
  20. concurrentUploadLimit int64
  21. concurrentDownloadLimit int64
  22. inFlightUploadDataLimitCond *sync.Cond
  23. inFlightDownloadDataLimitCond *sync.Cond
  24. inflightUploadDataTimeout time.Duration
  25. hasSlowRead bool
  26. readBufferSizeMB int
  27. SeedMasterNodes []pb.ServerAddress
  28. currentMaster pb.ServerAddress
  29. pulseSeconds int
  30. dataCenter string
  31. rack string
  32. store *storage.Store
  33. guard *security.Guard
  34. grpcDialOption grpc.DialOption
  35. needleMapKind storage.NeedleMapKind
  36. ldbTimout int64
  37. FixJpgOrientation bool
  38. ReadMode string
  39. compactionBytePerSecond int64
  40. metricsAddress string
  41. metricsIntervalSec int
  42. fileSizeLimitBytes int64
  43. isHeartbeating bool
  44. stopChan chan bool
  45. }
  46. func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
  47. port int, grpcPort int, publicUrl string,
  48. folders []string, maxCounts []int32, minFreeSpaces []util.MinFreeSpace, diskTypes []types.DiskType,
  49. idxFolder string,
  50. needleMapKind storage.NeedleMapKind,
  51. masterNodes []pb.ServerAddress, pulseSeconds int,
  52. dataCenter string, rack string,
  53. whiteList []string,
  54. fixJpgOrientation bool,
  55. readMode string,
  56. compactionMBPerSecond int,
  57. fileSizeLimitMB int,
  58. concurrentUploadLimit int64,
  59. concurrentDownloadLimit int64,
  60. inflightUploadDataTimeout time.Duration,
  61. hasSlowRead bool,
  62. readBufferSizeMB int,
  63. ldbTimeout int64,
  64. ) *VolumeServer {
  65. v := util.GetViper()
  66. signingKey := v.GetString("jwt.signing.key")
  67. v.SetDefault("jwt.signing.expires_after_seconds", 10)
  68. expiresAfterSec := v.GetInt("jwt.signing.expires_after_seconds")
  69. enableUiAccess := v.GetBool("access.ui")
  70. readSigningKey := v.GetString("jwt.signing.read.key")
  71. v.SetDefault("jwt.signing.read.expires_after_seconds", 60)
  72. readExpiresAfterSec := v.GetInt("jwt.signing.read.expires_after_seconds")
  73. vs := &VolumeServer{
  74. pulseSeconds: pulseSeconds,
  75. dataCenter: dataCenter,
  76. rack: rack,
  77. needleMapKind: needleMapKind,
  78. FixJpgOrientation: fixJpgOrientation,
  79. ReadMode: readMode,
  80. grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.volume"),
  81. compactionBytePerSecond: int64(compactionMBPerSecond) * 1024 * 1024,
  82. fileSizeLimitBytes: int64(fileSizeLimitMB) * 1024 * 1024,
  83. isHeartbeating: true,
  84. stopChan: make(chan bool),
  85. inFlightUploadDataLimitCond: sync.NewCond(new(sync.Mutex)),
  86. inFlightDownloadDataLimitCond: sync.NewCond(new(sync.Mutex)),
  87. concurrentUploadLimit: concurrentUploadLimit,
  88. concurrentDownloadLimit: concurrentDownloadLimit,
  89. inflightUploadDataTimeout: inflightUploadDataTimeout,
  90. hasSlowRead: hasSlowRead,
  91. readBufferSizeMB: readBufferSizeMB,
  92. ldbTimout: ldbTimeout,
  93. }
  94. vs.SeedMasterNodes = masterNodes
  95. vs.checkWithMaster()
  96. vs.store = storage.NewStore(vs.grpcDialOption, ip, port, grpcPort, publicUrl, folders, maxCounts, minFreeSpaces, idxFolder, vs.needleMapKind, diskTypes, ldbTimeout)
  97. vs.guard = security.NewGuard(whiteList, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec)
  98. handleStaticResources(adminMux)
  99. adminMux.HandleFunc("/status", vs.statusHandler)
  100. adminMux.HandleFunc("/healthz", vs.healthzHandler)
  101. if signingKey == "" || enableUiAccess {
  102. // only expose the volume server details for safe environments
  103. adminMux.HandleFunc("/ui/index.html", vs.uiStatusHandler)
  104. /*
  105. adminMux.HandleFunc("/stats/counter", vs.guard.WhiteList(statsCounterHandler))
  106. adminMux.HandleFunc("/stats/memory", vs.guard.WhiteList(statsMemoryHandler))
  107. adminMux.HandleFunc("/stats/disk", vs.guard.WhiteList(vs.statsDiskHandler))
  108. */
  109. }
  110. adminMux.HandleFunc("/", vs.privateStoreHandler)
  111. if publicMux != adminMux {
  112. // separated admin and public port
  113. handleStaticResources(publicMux)
  114. publicMux.HandleFunc("/", vs.publicReadOnlyHandler)
  115. }
  116. go vs.heartbeat()
  117. go stats.LoopPushingMetric("volumeServer", util.JoinHostPort(ip, port), vs.metricsAddress, vs.metricsIntervalSec)
  118. return vs
  119. }
  120. func (vs *VolumeServer) SetStopping() {
  121. glog.V(0).Infoln("Stopping volume server...")
  122. vs.store.SetStopping()
  123. }
  124. func (vs *VolumeServer) LoadNewVolumes() {
  125. glog.V(0).Infoln(" Loading new volume ids ...")
  126. vs.store.LoadNewVolumes()
  127. }
  128. func (vs *VolumeServer) Shutdown() {
  129. glog.V(0).Infoln("Shutting down volume server...")
  130. vs.store.Close()
  131. glog.V(0).Infoln("Shut down successfully!")
  132. }