topology_vacuum.go

package topology

import (
	"context"
	"io"
	"sync"
	"sync/atomic"
	"time"

	"google.golang.org/grpc"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/operation"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
	"github.com/seaweedfs/seaweedfs/weed/util"
)
// batchVacuumVolumeCheck asks every replica of vid for its garbage ratio and
// collects the replicas whose ratio meets garbageThreshold. It returns that
// list, plus whether the volume should be vacuumed: true only if no replica
// errored and at least one replica qualifies.
func (t *Topology) batchVacuumVolumeCheck(grpcDialOption grpc.DialOption, vid needle.VolumeId,
	locationlist *VolumeLocationList, garbageThreshold float64) (*VolumeLocationList, bool) {
	ch := make(chan int, locationlist.Length())
	errCount := int32(0)
	for index, dn := range locationlist.list {
		go func(index int, url pb.ServerAddress, vid needle.VolumeId) {
			err := operation.WithVolumeServerClient(false, url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
				resp, err := volumeServerClient.VacuumVolumeCheck(context.Background(), &volume_server_pb.VacuumVolumeCheckRequest{
					VolumeId: uint32(vid),
				})
				if err != nil {
					atomic.AddInt32(&errCount, 1)
					ch <- -1
					return err
				}
				if resp.GarbageRatio >= garbageThreshold {
					ch <- index
				} else {
					ch <- -1
				}
				return nil
			})
			if err != nil {
				glog.V(0).Infof("Error when checking volume %d on %s: %v", vid, url, err)
			}
		}(index, dn.ServerAddress(), vid)
	}
	vacuumLocationList := NewVolumeLocationList()
	// scale the wait with the volume size limit: roughly one minute per GB, plus one
	waitTimeout := time.NewTimer(time.Minute * time.Duration(t.volumeSizeLimit/1024/1024/1000+1))
	defer waitTimeout.Stop()
	for range locationlist.list {
		select {
		case index := <-ch:
			if index != -1 {
				vacuumLocationList.list = append(vacuumLocationList.list, locationlist.list[index])
			}
		case <-waitTimeout.C:
			return vacuumLocationList, false
		}
	}
	return vacuumLocationList, errCount == 0 && len(vacuumLocationList.list) > 0
}
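
// The check above fans out one goroutine per replica and collects exactly one
// answer per replica (its index, or -1) through a channel buffered to
// locationlist.Length(), so stragglers can still send after the collector has
// timed out and moved on. A minimal sketch of the same fan-out/fan-in shape;
// all names here are illustrative, not taken from this package:
//
//	ch := make(chan int, len(replicas)) // buffered: senders never block
//	for i := range replicas {
//		go func(i int) { ch <- check(i) }(i) // check returns i, or -1 on "no"
//	}
//	deadline := time.NewTimer(perVolumeTimeout)
//	defer deadline.Stop()
//	for range replicas {
//		select {
//		case idx := <-ch:
//			_ = idx // record a qualifying replica
//		case <-deadline.C:
//			return // give up; the buffer lets late senders finish
//		}
//	}
//
// The timer scales with t.volumeSizeLimit (bytes): for example, a 30 GB limit
// gives 30*1024 MB / 1000 = 30, so the check waits up to 31 minutes.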

// batchVacuumVolumeCompact removes vid from the writable list, then asks every
// replica to compact in parallel, streaming progress back. It returns true
// only if every replica compacted successfully.
func (t *Topology) batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId,
	locationlist *VolumeLocationList, preallocate int64) bool {
	vl.accessLock.Lock()
	vl.removeFromWritable(vid)
	vl.accessLock.Unlock()
	ch := make(chan bool, locationlist.Length())
	for index, dn := range locationlist.list {
		go func(index int, url pb.ServerAddress, vid needle.VolumeId) {
			glog.V(0).Infoln(index, "Start vacuuming", vid, "on", url)
			err := operation.WithVolumeServerClient(true, url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
				stream, err := volumeServerClient.VacuumVolumeCompact(context.Background(), &volume_server_pb.VacuumVolumeCompactRequest{
					VolumeId:    uint32(vid),
					Preallocate: preallocate,
				})
				if err != nil {
					return err
				}
				for {
					resp, recvErr := stream.Recv()
					if recvErr != nil {
						if recvErr == io.EOF {
							break
						} else {
							return recvErr
						}
					}
					glog.V(0).Infof("%d vacuum %d on %s processed %d bytes, loadAvg %.02f%%",
						index, vid, url, resp.ProcessedBytes, resp.LoadAvg_1M*100)
				}
				return nil
			})
			if err != nil {
				glog.Errorf("Error when vacuuming %d on %s: %v", vid, url, err)
				ch <- false
			} else {
				glog.V(0).Infof("Complete vacuuming %d on %s", vid, url)
				ch <- true
			}
		}(index, dn.ServerAddress(), vid)
	}
	isVacuumSuccess := true
	// compaction rewrites data, so allow three times the check timeout
	waitTimeout := time.NewTimer(3 * time.Minute * time.Duration(t.volumeSizeLimit/1024/1024/1000+1))
	defer waitTimeout.Stop()
	for range locationlist.list {
		select {
		case canCommit := <-ch:
			isVacuumSuccess = isVacuumSuccess && canCommit
		case <-waitTimeout.C:
			return false
		}
	}
	return isVacuumSuccess
}
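
// VacuumVolumeCompact is a server-streaming RPC: the volume server pushes
// progress messages until it ends the stream with io.EOF. The receive loop
// above follows the standard gRPC streaming pattern, sketched here with
// illustrative names:
//
//	for {
//		resp, err := stream.Recv()
//		if err == io.EOF {
//			break // server finished compacting
//		}
//		if err != nil {
//			return err // transport or server-side failure
//		}
//		glog.V(0).Infof("processed %d bytes", resp.ProcessedBytes)
//	}
//
// The compact timer is three times the check timer, presumably to budget for
// rewriting the volume's data rather than just sampling its garbage ratio.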

// batchVacuumVolumeCommit commits the compacted data on every vacuumed replica
// of vid. It also probes replicas that were not vacuumed, so that a read-only
// or over-capacity state on any replica is still observed before the volume is
// marked available again.
func (t *Topology) batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, vacuumLocationList, locationList *VolumeLocationList) bool {
	isCommitSuccess := true
	isReadOnly := false
	isFullCapacity := false
	for _, dn := range vacuumLocationList.list {
		glog.V(0).Infoln("Start committing vacuum", vid, "on", dn.Url())
		err := operation.WithVolumeServerClient(false, dn.ServerAddress(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
			resp, err := volumeServerClient.VacuumVolumeCommit(context.Background(), &volume_server_pb.VacuumVolumeCommitRequest{
				VolumeId: uint32(vid),
			})
			if resp != nil {
				if resp.IsReadOnly {
					isReadOnly = true
				}
				if resp.VolumeSize > t.volumeSizeLimit {
					isFullCapacity = true
				}
			}
			return err
		})
		if err != nil {
			glog.Errorf("Error when committing vacuum %d on %s: %v", vid, dn.Url(), err)
			isCommitSuccess = false
		} else {
			glog.V(0).Infof("Complete committing vacuum %d on %s", vid, dn.Url())
		}
	}
	// also check the status of replicas that were not vacuumed
	if len(locationList.list) > len(vacuumLocationList.list) {
		for _, dn := range locationList.list {
			isFound := false
			for _, dnVacuum := range vacuumLocationList.list {
				if dn.id == dnVacuum.id {
					isFound = true
					break
				}
			}
			if !isFound {
				err := operation.WithVolumeServerClient(false, dn.ServerAddress(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
					resp, err := volumeServerClient.VolumeStatus(context.Background(), &volume_server_pb.VolumeStatusRequest{
						VolumeId: uint32(vid),
					})
					if resp != nil {
						if resp.IsReadOnly {
							isReadOnly = true
						}
						if resp.VolumeSize > t.volumeSizeLimit {
							isFullCapacity = true
						}
					}
					return err
				})
				if err != nil {
					glog.Errorf("Error when checking volume %d status on %s: %v", vid, dn.Url(), err)
					// mark the volume read-only, since its state is unknown
					isReadOnly = true
				}
			}
		}
	}
	if isCommitSuccess {
		// record the vacuum time of the volume
		vl.accessLock.Lock()
		vl.vacuumedVolumes[vid] = time.Now()
		vl.accessLock.Unlock()
		for _, dn := range vacuumLocationList.list {
			vl.SetVolumeAvailable(dn, vid, isReadOnly, isFullCapacity)
		}
	}
	return isCommitSuccess
}

// batchVacuumVolumeCleanup removes the leftover compacted copy on every
// replica after a failed compaction.
func (t *Topology) batchVacuumVolumeCleanup(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList) {
	for _, dn := range locationlist.list {
		glog.V(0).Infoln("Start cleaning up", vid, "on", dn.Url())
		err := operation.WithVolumeServerClient(false, dn.ServerAddress(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
			_, err := volumeServerClient.VacuumVolumeCleanup(context.Background(), &volume_server_pb.VacuumVolumeCleanupRequest{
				VolumeId: uint32(vid),
			})
			return err
		})
		if err != nil {
			glog.Errorf("Error when cleaning up vacuum %d on %s: %v", vid, dn.Url(), err)
		} else {
			glog.V(0).Infof("Complete cleaning up vacuum %d on %s", vid, dn.Url())
		}
	}
}

// Vacuum scans the topology, optionally restricted to one collection and/or a
// single volume id, and vacuums every volume whose garbage ratio reaches
// garbageThreshold. At most one vacuum pass runs at a time.
func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float64, maxParallelVacuumPerServer int, volumeId uint32, collection string, preallocate int64) {
	// if there is a vacuum going on, return immediately
	swapped := atomic.CompareAndSwapInt64(&t.vacuumLockCounter, 0, 1)
	if !swapped {
		return
	}
	defer atomic.StoreInt64(&t.vacuumLockCounter, 0)
	// now only one vacuum process is going on
	glog.V(1).Infof("Start vacuum on demand with threshold: %f collection: %s volumeId: %d",
		garbageThreshold, collection, volumeId)
	for _, col := range t.collectionMap.Items() {
		c := col.(*Collection)
		if collection != "" && collection != c.Name {
			continue
		}
		for _, vl := range c.storageType2VolumeLayout.Items() {
			if vl != nil {
				volumeLayout := vl.(*VolumeLayout)
				if volumeId > 0 {
					vid := needle.VolumeId(volumeId)
					volumeLayout.accessLock.RLock()
					locationList, ok := volumeLayout.vid2location[vid]
					volumeLayout.accessLock.RUnlock()
					if ok {
						t.vacuumOneVolumeId(grpcDialOption, volumeLayout, c, garbageThreshold, locationList, vid, preallocate)
					}
				} else {
					t.vacuumOneVolumeLayout(grpcDialOption, volumeLayout, c, garbageThreshold, maxParallelVacuumPerServer, preallocate)
				}
			}
		}
	}
}
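
// Illustrative call site (the values are hypothetical, not the project's
// configured defaults): vacuum any volume whose garbage ratio reaches 30%,
// allowing at most 2 concurrent vacuums per volume server, across all
// collections:
//
//	topo.Vacuum(grpcDialOption, 0.3, 2, 0, "", 0)
//
// Passing volumeId == 0 with collection == "" scans everything; a non-zero
// volumeId restricts the pass to that single volume.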

// vacuumOneVolumeLayout vacuums every volume in the layout, bounding
// concurrency to maxParallelVacuumPerServer per volume server: a volume starts
// only when every server holding one of its replicas still has quota.
func (t *Topology) vacuumOneVolumeLayout(grpcDialOption grpc.DialOption, volumeLayout *VolumeLayout, c *Collection, garbageThreshold float64, maxParallelVacuumPerServer int, preallocate int64) {
	volumeLayout.accessLock.RLock()
	todoVolumeMap := make(map[needle.VolumeId]*VolumeLocationList)
	for vid, locationList := range volumeLayout.vid2location {
		todoVolumeMap[vid] = locationList.Copy()
	}
	volumeLayout.accessLock.RUnlock()
	// per-volume-server concurrency limiter
	limiter := make(map[NodeId]int)
	var limiterLock sync.Mutex
	for _, locationList := range todoVolumeMap {
		for _, dn := range locationList.list {
			if _, ok := limiter[dn.Id()]; !ok {
				limiter[dn.Id()] = maxParallelVacuumPerServer
			}
		}
	}
	executor := util.NewLimitedConcurrentExecutor(100)
	var wg sync.WaitGroup
	for len(todoVolumeMap) > 0 {
		pendingVolumeMap := make(map[needle.VolumeId]*VolumeLocationList)
		for vid, locationList := range todoVolumeMap {
			hasEnoughQuota := true
			for _, dn := range locationList.list {
				limiterLock.Lock()
				quota := limiter[dn.Id()]
				limiterLock.Unlock()
				if quota <= 0 {
					hasEnoughQuota = false
					break
				}
			}
			if !hasEnoughQuota {
				pendingVolumeMap[vid] = locationList
				continue
			}
			// debit the quota
			for _, dn := range locationList.list {
				limiterLock.Lock()
				limiter[dn.Id()]--
				limiterLock.Unlock()
			}
			wg.Add(1)
			executor.Execute(func() {
				defer wg.Done()
				t.vacuumOneVolumeId(grpcDialOption, volumeLayout, c, garbageThreshold, locationList, vid, preallocate)
				// credit the quota back
				for _, dn := range locationList.list {
					limiterLock.Lock()
					limiter[dn.Id()]++
					limiterLock.Unlock()
				}
			})
		}
		// nothing could be scheduled this round; back off before retrying
		if len(todoVolumeMap) == len(pendingVolumeMap) {
			time.Sleep(10 * time.Second)
		}
		todoVolumeMap = pendingVolumeMap
	}
	wg.Wait()
}
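
// The scheduler above admits a volume only when every replica's server still
// has quota, debits all of those servers together, and credits them back when
// the volume finishes. Because only this scheduling loop ever debits (workers
// only credit), checking and debiting under separate lock acquisitions cannot
// over-admit: quota can only grow between the check and the debit. A minimal
// sketch of that admission step, with illustrative names:
//
//	canStart := true
//	limiterLock.Lock()
//	for _, s := range servers {
//		if quota[s] <= 0 {
//			canStart = false
//			break
//		}
//	}
//	if canStart {
//		for _, s := range servers {
//			quota[s]-- // hold a slot on every server hosting a replica
//		}
//	}
//	limiterLock.Unlock()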

// vacuumOneVolumeId runs the full pipeline for a single volume: skip read-only
// or under-replicated volumes, check garbage ratios on all replicas, compact
// the qualifying ones, then commit on success or clean up on failure.
func (t *Topology) vacuumOneVolumeId(grpcDialOption grpc.DialOption, volumeLayout *VolumeLayout, c *Collection, garbageThreshold float64, locationList *VolumeLocationList, vid needle.VolumeId, preallocate int64) {
	volumeLayout.accessLock.RLock()
	isReadOnly := volumeLayout.readonlyVolumes.IsTrue(vid)
	isEnoughCopies := volumeLayout.enoughCopies(vid)
	volumeLayout.accessLock.RUnlock()
	if isReadOnly {
		return
	}
	if !isEnoughCopies {
		glog.Warningf("skip vacuuming: not enough copies for volume:%d", vid)
		return
	}
	glog.V(1).Infof("check vacuum on collection:%s volume:%d", c.Name, vid)
	if vacuumLocationList, needVacuum := t.batchVacuumVolumeCheck(
		grpcDialOption, vid, locationList, garbageThreshold); needVacuum {
		if t.batchVacuumVolumeCompact(grpcDialOption, volumeLayout, vid, vacuumLocationList, preallocate) {
			t.batchVacuumVolumeCommit(grpcDialOption, volumeLayout, vid, vacuumLocationList, locationList)
		} else {
			t.batchVacuumVolumeCleanup(grpcDialOption, volumeLayout, vid, vacuumLocationList)
		}
	}
}