volume_grpc_vacuum.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. package weed_server
  2. import (
  3. "context"
  4. "github.com/seaweedfs/seaweedfs/weed/stats"
  5. "strconv"
  6. "time"
  7. "github.com/prometheus/procfs"
  8. "github.com/seaweedfs/seaweedfs/weed/glog"
  9. "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
  10. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  11. "runtime"
  12. )
  13. var numCPU = runtime.NumCPU()
  14. func (vs *VolumeServer) VacuumVolumeCheck(ctx context.Context, req *volume_server_pb.VacuumVolumeCheckRequest) (*volume_server_pb.VacuumVolumeCheckResponse, error) {
  15. resp := &volume_server_pb.VacuumVolumeCheckResponse{}
  16. garbageRatio, err := vs.store.CheckCompactVolume(needle.VolumeId(req.VolumeId))
  17. resp.GarbageRatio = garbageRatio
  18. if err != nil {
  19. glog.V(3).Infof("check volume %d: %v", req.VolumeId, err)
  20. }
  21. return resp, err
  22. }
  23. func (vs *VolumeServer) VacuumVolumeCompact(req *volume_server_pb.VacuumVolumeCompactRequest, stream volume_server_pb.VolumeServer_VacuumVolumeCompactServer) error {
  24. start := time.Now()
  25. defer func(start time.Time) {
  26. stats.VolumeServerVacuumingHistogram.WithLabelValues("compact").Observe(time.Since(start).Seconds())
  27. }(start)
  28. resp := &volume_server_pb.VacuumVolumeCompactResponse{}
  29. reportInterval := int64(1024 * 1024 * 128)
  30. nextReportTarget := reportInterval
  31. fs, fsErr := procfs.NewDefaultFS()
  32. var sendErr error
  33. err := vs.store.CompactVolume(needle.VolumeId(req.VolumeId), req.Preallocate, vs.compactionBytePerSecond, func(processed int64) bool {
  34. if processed > nextReportTarget {
  35. resp.ProcessedBytes = processed
  36. if fsErr == nil && numCPU > 0 {
  37. if fsLa, err := fs.LoadAvg(); err == nil {
  38. resp.LoadAvg_1M = float32(fsLa.Load1 / float64(numCPU))
  39. }
  40. }
  41. if sendErr = stream.Send(resp); sendErr != nil {
  42. return false
  43. }
  44. nextReportTarget = processed + reportInterval
  45. }
  46. return true
  47. })
  48. stats.VolumeServerVacuumingCompactCounter.WithLabelValues(strconv.FormatBool(err == nil && sendErr == nil)).Inc()
  49. if err != nil {
  50. glog.Errorf("failed compact volume %d: %v", req.VolumeId, err)
  51. return err
  52. }
  53. if sendErr != nil {
  54. glog.Errorf("failed compact volume %d report progress: %v", req.VolumeId, sendErr)
  55. return sendErr
  56. }
  57. glog.V(1).Infof("compact volume %d", req.VolumeId)
  58. return nil
  59. }
  60. func (vs *VolumeServer) VacuumVolumeCommit(ctx context.Context, req *volume_server_pb.VacuumVolumeCommitRequest) (*volume_server_pb.VacuumVolumeCommitResponse, error) {
  61. start := time.Now()
  62. defer func(start time.Time) {
  63. stats.VolumeServerVacuumingHistogram.WithLabelValues("commit").Observe(time.Since(start).Seconds())
  64. }(start)
  65. resp := &volume_server_pb.VacuumVolumeCommitResponse{}
  66. readOnly, err := vs.store.CommitCompactVolume(needle.VolumeId(req.VolumeId))
  67. if err != nil {
  68. glog.Errorf("failed commit volume %d: %v", req.VolumeId, err)
  69. } else {
  70. glog.V(1).Infof("commit volume %d", req.VolumeId)
  71. }
  72. stats.VolumeServerVacuumingCommitCounter.WithLabelValues(strconv.FormatBool(err == nil)).Inc()
  73. resp.IsReadOnly = readOnly
  74. return resp, err
  75. }
  76. func (vs *VolumeServer) VacuumVolumeCleanup(ctx context.Context, req *volume_server_pb.VacuumVolumeCleanupRequest) (*volume_server_pb.VacuumVolumeCleanupResponse, error) {
  77. resp := &volume_server_pb.VacuumVolumeCleanupResponse{}
  78. err := vs.store.CommitCleanupVolume(needle.VolumeId(req.VolumeId))
  79. if err != nil {
  80. glog.Errorf("failed cleanup volume %d: %v", req.VolumeId, err)
  81. } else {
  82. glog.V(1).Infof("cleanup volume %d", req.VolumeId)
  83. }
  84. return resp, err
  85. }