topology_vacuum.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364
  1. package topology
  2. import (
  3. "context"
  4. "io"
  5. "sync"
  6. "sync/atomic"
  7. "time"
  8. "github.com/seaweedfs/seaweedfs/weed/util"
  9. "github.com/seaweedfs/seaweedfs/weed/pb"
  10. "google.golang.org/grpc"
  11. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  12. "github.com/seaweedfs/seaweedfs/weed/glog"
  13. "github.com/seaweedfs/seaweedfs/weed/operation"
  14. "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
  15. )
  16. func (t *Topology) batchVacuumVolumeCheck(grpcDialOption grpc.DialOption, vid needle.VolumeId,
  17. locationlist *VolumeLocationList, garbageThreshold float64) (*VolumeLocationList, bool) {
  18. ch := make(chan int, locationlist.Length())
  19. errCount := int32(0)
  20. for index, dn := range locationlist.list {
  21. go func(index int, url pb.ServerAddress, vid needle.VolumeId) {
  22. err := operation.WithVolumeServerClient(false, url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  23. resp, err := volumeServerClient.VacuumVolumeCheck(context.Background(), &volume_server_pb.VacuumVolumeCheckRequest{
  24. VolumeId: uint32(vid),
  25. })
  26. if err != nil {
  27. atomic.AddInt32(&errCount, 1)
  28. ch <- -1
  29. return err
  30. }
  31. if resp.GarbageRatio >= garbageThreshold {
  32. ch <- index
  33. } else {
  34. ch <- -1
  35. }
  36. return nil
  37. })
  38. if err != nil {
  39. glog.V(0).Infof("Checking vacuuming %d on %s: %v", vid, url, err)
  40. }
  41. }(index, dn.ServerAddress(), vid)
  42. }
  43. vacuumLocationList := NewVolumeLocationList()
  44. waitTimeout := time.NewTimer(time.Minute * time.Duration(t.volumeSizeLimit/1024/1024/1000+1))
  45. defer waitTimeout.Stop()
  46. for range locationlist.list {
  47. select {
  48. case index := <-ch:
  49. if index != -1 {
  50. vacuumLocationList.list = append(vacuumLocationList.list, locationlist.list[index])
  51. }
  52. case <-waitTimeout.C:
  53. return vacuumLocationList, false
  54. }
  55. }
  56. return vacuumLocationList, errCount == 0 && len(vacuumLocationList.list) > 0
  57. }
  58. func (t *Topology) batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId,
  59. locationlist *VolumeLocationList, preallocate int64) bool {
  60. vl.accessLock.Lock()
  61. vl.removeFromWritable(vid)
  62. vl.accessLock.Unlock()
  63. ch := make(chan bool, locationlist.Length())
  64. for index, dn := range locationlist.list {
  65. go func(index int, url pb.ServerAddress, vid needle.VolumeId) {
  66. glog.V(0).Infoln(index, "Start vacuuming", vid, "on", url)
  67. err := operation.WithVolumeServerClient(true, url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  68. stream, err := volumeServerClient.VacuumVolumeCompact(context.Background(), &volume_server_pb.VacuumVolumeCompactRequest{
  69. VolumeId: uint32(vid),
  70. Preallocate: preallocate,
  71. })
  72. if err != nil {
  73. return err
  74. }
  75. for {
  76. resp, recvErr := stream.Recv()
  77. if recvErr != nil {
  78. if recvErr == io.EOF {
  79. break
  80. } else {
  81. return recvErr
  82. }
  83. }
  84. glog.V(0).Infof("%d vacuum %d on %s processed %d bytes, loadAvg %.02f%%",
  85. index, vid, url, resp.ProcessedBytes, resp.LoadAvg_1M*100)
  86. }
  87. return nil
  88. })
  89. if err != nil {
  90. glog.Errorf("Error when vacuuming %d on %s: %v", vid, url, err)
  91. ch <- false
  92. } else {
  93. glog.V(0).Infof("Complete vacuuming %d on %s", vid, url)
  94. ch <- true
  95. }
  96. }(index, dn.ServerAddress(), vid)
  97. }
  98. isVacuumSuccess := true
  99. waitTimeout := time.NewTimer(3 * time.Minute * time.Duration(t.volumeSizeLimit/1024/1024/1000+1))
  100. defer waitTimeout.Stop()
  101. for range locationlist.list {
  102. select {
  103. case canCommit := <-ch:
  104. isVacuumSuccess = isVacuumSuccess && canCommit
  105. case <-waitTimeout.C:
  106. return false
  107. }
  108. }
  109. return isVacuumSuccess
  110. }
  111. func (t *Topology) batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, vacuumLocationList, locationList *VolumeLocationList) bool {
  112. isCommitSuccess := true
  113. isReadOnly := false
  114. isFullCapacity := false
  115. for _, dn := range vacuumLocationList.list {
  116. glog.V(0).Infoln("Start Committing vacuum", vid, "on", dn.Url())
  117. err := operation.WithVolumeServerClient(false, dn.ServerAddress(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  118. resp, err := volumeServerClient.VacuumVolumeCommit(context.Background(), &volume_server_pb.VacuumVolumeCommitRequest{
  119. VolumeId: uint32(vid),
  120. })
  121. if resp != nil {
  122. if resp.IsReadOnly {
  123. isReadOnly = true
  124. }
  125. if resp.VolumeSize > t.volumeSizeLimit {
  126. isFullCapacity = true
  127. }
  128. }
  129. return err
  130. })
  131. if err != nil {
  132. glog.Errorf("Error when committing vacuum %d on %s: %v", vid, dn.Url(), err)
  133. isCommitSuccess = false
  134. } else {
  135. glog.V(0).Infof("Complete Committing vacuum %d on %s", vid, dn.Url())
  136. }
  137. }
  138. //we should check the status of all replicas
  139. if len(locationList.list) > len(vacuumLocationList.list) {
  140. for _, dn := range locationList.list {
  141. isFound := false
  142. for _, dnVaccum := range vacuumLocationList.list {
  143. if dn.id == dnVaccum.id {
  144. isFound = true
  145. break
  146. }
  147. }
  148. if !isFound {
  149. err := operation.WithVolumeServerClient(false, dn.ServerAddress(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  150. resp, err := volumeServerClient.VolumeStatus(context.Background(), &volume_server_pb.VolumeStatusRequest{
  151. VolumeId: uint32(vid),
  152. })
  153. if resp != nil {
  154. if resp.IsReadOnly {
  155. isReadOnly = true
  156. }
  157. if resp.VolumeSize > t.volumeSizeLimit {
  158. isFullCapacity = true
  159. }
  160. }
  161. return err
  162. })
  163. if err != nil {
  164. glog.Errorf("Error when checking volume %d status on %s: %v", vid, dn.Url(), err)
  165. //we mark volume read-only, since the volume state is unknown
  166. isReadOnly = true
  167. }
  168. }
  169. }
  170. }
  171. if isCommitSuccess {
  172. //record vacuum time of volume
  173. vl.accessLock.Lock()
  174. vl.vacuumedVolumes[vid] = time.Now()
  175. vl.accessLock.Unlock()
  176. for _, dn := range vacuumLocationList.list {
  177. vl.SetVolumeAvailable(dn, vid, isReadOnly, isFullCapacity)
  178. }
  179. }
  180. return isCommitSuccess
  181. }
  182. func (t *Topology) batchVacuumVolumeCleanup(grpcDialOption grpc.DialOption, vl *VolumeLayout, vid needle.VolumeId, locationlist *VolumeLocationList) {
  183. for _, dn := range locationlist.list {
  184. glog.V(0).Infoln("Start cleaning up", vid, "on", dn.Url())
  185. err := operation.WithVolumeServerClient(false, dn.ServerAddress(), grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  186. _, err := volumeServerClient.VacuumVolumeCleanup(context.Background(), &volume_server_pb.VacuumVolumeCleanupRequest{
  187. VolumeId: uint32(vid),
  188. })
  189. return err
  190. })
  191. if err != nil {
  192. glog.Errorf("Error when cleaning up vacuum %d on %s: %v", vid, dn.Url(), err)
  193. } else {
  194. glog.V(0).Infof("Complete cleaning up vacuum %d on %s", vid, dn.Url())
  195. }
  196. }
  197. }
  198. func (t *Topology) Vacuum(grpcDialOption grpc.DialOption, garbageThreshold float64, maxParallelVacuumPerServer int, volumeId uint32, collection string, preallocate int64, automatic bool) {
  199. // if there is vacuum going on, return immediately
  200. swapped := atomic.CompareAndSwapInt64(&t.vacuumLockCounter, 0, 1)
  201. if !swapped {
  202. glog.V(0).Infof("Vacuum is already running")
  203. return
  204. }
  205. defer atomic.StoreInt64(&t.vacuumLockCounter, 0)
  206. // now only one vacuum process going on
  207. glog.V(1).Infof("Start vacuum on demand with threshold: %f collection: %s volumeId: %d",
  208. garbageThreshold, collection, volumeId)
  209. for _, col := range t.collectionMap.Items() {
  210. c := col.(*Collection)
  211. if collection != "" && collection != c.Name {
  212. continue
  213. }
  214. for _, vl := range c.storageType2VolumeLayout.Items() {
  215. if vl != nil {
  216. volumeLayout := vl.(*VolumeLayout)
  217. if volumeId > 0 {
  218. vid := needle.VolumeId(volumeId)
  219. volumeLayout.accessLock.RLock()
  220. locationList, ok := volumeLayout.vid2location[vid]
  221. volumeLayout.accessLock.RUnlock()
  222. if ok {
  223. t.vacuumOneVolumeId(grpcDialOption, volumeLayout, c, garbageThreshold, locationList, vid, preallocate)
  224. }
  225. } else {
  226. t.vacuumOneVolumeLayout(grpcDialOption, volumeLayout, c, garbageThreshold, maxParallelVacuumPerServer, preallocate, automatic)
  227. }
  228. }
  229. if automatic && t.isDisableVacuum {
  230. break
  231. }
  232. }
  233. if automatic && t.isDisableVacuum {
  234. glog.V(0).Infof("Vacuum is disabled")
  235. break
  236. }
  237. }
  238. }
  239. func (t *Topology) vacuumOneVolumeLayout(grpcDialOption grpc.DialOption, volumeLayout *VolumeLayout, c *Collection, garbageThreshold float64, maxParallelVacuumPerServer int, preallocate int64, automatic bool) {
  240. volumeLayout.accessLock.RLock()
  241. todoVolumeMap := make(map[needle.VolumeId]*VolumeLocationList)
  242. for vid, locationList := range volumeLayout.vid2location {
  243. todoVolumeMap[vid] = locationList.Copy()
  244. }
  245. volumeLayout.accessLock.RUnlock()
  246. // limiter for each volume server
  247. limiter := make(map[NodeId]int)
  248. var limiterLock sync.Mutex
  249. for _, locationList := range todoVolumeMap {
  250. for _, dn := range locationList.list {
  251. if _, ok := limiter[dn.Id()]; !ok {
  252. limiter[dn.Id()] = maxParallelVacuumPerServer
  253. }
  254. }
  255. }
  256. executor := util.NewLimitedConcurrentExecutor(100)
  257. var wg sync.WaitGroup
  258. for len(todoVolumeMap) > 0 {
  259. pendingVolumeMap := make(map[needle.VolumeId]*VolumeLocationList)
  260. for vid, locationList := range todoVolumeMap {
  261. hasEnoughQuota := true
  262. for _, dn := range locationList.list {
  263. limiterLock.Lock()
  264. quota := limiter[dn.Id()]
  265. limiterLock.Unlock()
  266. if quota <= 0 {
  267. hasEnoughQuota = false
  268. break
  269. }
  270. }
  271. if !hasEnoughQuota {
  272. pendingVolumeMap[vid] = locationList
  273. continue
  274. }
  275. // debit the quota
  276. for _, dn := range locationList.list {
  277. limiterLock.Lock()
  278. limiter[dn.Id()]--
  279. limiterLock.Unlock()
  280. }
  281. wg.Add(1)
  282. executor.Execute(func() {
  283. defer wg.Done()
  284. t.vacuumOneVolumeId(grpcDialOption, volumeLayout, c, garbageThreshold, locationList, vid, preallocate)
  285. // credit the quota
  286. for _, dn := range locationList.list {
  287. limiterLock.Lock()
  288. limiter[dn.Id()]++
  289. limiterLock.Unlock()
  290. }
  291. })
  292. if automatic && t.isDisableVacuum {
  293. break
  294. }
  295. }
  296. if automatic && t.isDisableVacuum {
  297. break
  298. }
  299. if len(todoVolumeMap) == len(pendingVolumeMap) {
  300. time.Sleep(10 * time.Second)
  301. }
  302. todoVolumeMap = pendingVolumeMap
  303. }
  304. wg.Wait()
  305. }
  306. func (t *Topology) vacuumOneVolumeId(grpcDialOption grpc.DialOption, volumeLayout *VolumeLayout, c *Collection, garbageThreshold float64, locationList *VolumeLocationList, vid needle.VolumeId, preallocate int64) {
  307. volumeLayout.accessLock.RLock()
  308. isReadOnly := volumeLayout.readonlyVolumes.IsTrue(vid)
  309. isEnoughCopies := volumeLayout.enoughCopies(vid)
  310. volumeLayout.accessLock.RUnlock()
  311. if isReadOnly {
  312. return
  313. }
  314. if !isEnoughCopies {
  315. glog.Warningf("skip vacuuming: not enough copies for volume:%d", vid)
  316. return
  317. }
  318. glog.V(1).Infof("check vacuum on collection:%s volume:%d", c.Name, vid)
  319. if vacuumLocationList, needVacuum := t.batchVacuumVolumeCheck(
  320. grpcDialOption, vid, locationList, garbageThreshold); needVacuum {
  321. if t.batchVacuumVolumeCompact(grpcDialOption, volumeLayout, vid, vacuumLocationList, preallocate) {
  322. t.batchVacuumVolumeCommit(grpcDialOption, volumeLayout, vid, vacuumLocationList, locationList)
  323. } else {
  324. t.batchVacuumVolumeCleanup(grpcDialOption, volumeLayout, vid, vacuumLocationList)
  325. }
  326. }
  327. }