topology_vacuum.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196
  1. package topology
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "net/url"
  6. "time"
  7. "fmt"
  8. "github.com/chrislusf/seaweedfs/weed/glog"
  9. "github.com/chrislusf/seaweedfs/weed/storage"
  10. "github.com/chrislusf/seaweedfs/weed/util"
  11. )
  12. func batchVacuumVolumeCheck(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList, garbageThreshold string) bool {
  13. ch := make(chan bool, locationlist.Length())
  14. for index, dn := range locationlist.list {
  15. go func(index int, url string, vid storage.VolumeId) {
  16. //glog.V(0).Infoln(index, "Check vacuuming", vid, "on", dn.Url())
  17. if e, ret := vacuumVolume_Check(url, vid, garbageThreshold); e != nil {
  18. //glog.V(0).Infoln(index, "Error when checking vacuuming", vid, "on", url, e)
  19. ch <- false
  20. } else {
  21. //glog.V(0).Infoln(index, "Checked vacuuming", vid, "on", url, "needVacuum", ret)
  22. ch <- ret
  23. }
  24. }(index, dn.Url(), vid)
  25. }
  26. isCheckSuccess := true
  27. for _ = range locationlist.list {
  28. select {
  29. case canVacuum := <-ch:
  30. isCheckSuccess = isCheckSuccess && canVacuum
  31. case <-time.After(30 * time.Minute):
  32. isCheckSuccess = false
  33. break
  34. }
  35. }
  36. return isCheckSuccess
  37. }
  38. func batchVacuumVolumeCompact(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList, preallocate int64) bool {
  39. vl.removeFromWritable(vid)
  40. ch := make(chan bool, locationlist.Length())
  41. for index, dn := range locationlist.list {
  42. go func(index int, url string, vid storage.VolumeId) {
  43. glog.V(0).Infoln(index, "Start vacuuming", vid, "on", url)
  44. if e := vacuumVolume_Compact(url, vid, preallocate); e != nil {
  45. glog.V(0).Infoln(index, "Error when vacuuming", vid, "on", url, e)
  46. ch <- false
  47. } else {
  48. glog.V(0).Infoln(index, "Complete vacuuming", vid, "on", url)
  49. ch <- true
  50. }
  51. }(index, dn.Url(), vid)
  52. }
  53. isVacuumSuccess := true
  54. for _ = range locationlist.list {
  55. select {
  56. case canCommit := <-ch:
  57. isVacuumSuccess = isVacuumSuccess && canCommit
  58. case <-time.After(30 * time.Minute):
  59. isVacuumSuccess = false
  60. break
  61. }
  62. }
  63. return isVacuumSuccess
  64. }
  65. func batchVacuumVolumeCommit(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) bool {
  66. isCommitSuccess := true
  67. for _, dn := range locationlist.list {
  68. glog.V(0).Infoln("Start Commiting vacuum", vid, "on", dn.Url())
  69. if e := vacuumVolume_Commit(dn.Url(), vid); e != nil {
  70. glog.V(0).Infoln("Error when committing vacuum", vid, "on", dn.Url(), e)
  71. isCommitSuccess = false
  72. } else {
  73. glog.V(0).Infoln("Complete Commiting vacuum", vid, "on", dn.Url())
  74. }
  75. if isCommitSuccess {
  76. vl.SetVolumeAvailable(dn, vid)
  77. }
  78. }
  79. return isCommitSuccess
  80. }
  81. func batchVacuumVolumeCleanup(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) {
  82. for _, dn := range locationlist.list {
  83. glog.V(0).Infoln("Start cleaning up", vid, "on", dn.Url())
  84. if e := vacuumVolume_Cleanup(dn.Url(), vid); e != nil {
  85. glog.V(0).Infoln("Error when cleaning up", vid, "on", dn.Url(), e)
  86. } else {
  87. glog.V(0).Infoln("Complete cleaning up", vid, "on", dn.Url())
  88. }
  89. }
  90. }
  91. func (t *Topology) Vacuum(garbageThreshold string, preallocate int64) int {
  92. glog.V(0).Infof("Start vacuum on demand with threshold:%s", garbageThreshold)
  93. for _, col := range t.collectionMap.Items() {
  94. c := col.(*Collection)
  95. for _, vl := range c.storageType2VolumeLayout.Items() {
  96. if vl != nil {
  97. volumeLayout := vl.(*VolumeLayout)
  98. for vid, locationlist := range volumeLayout.vid2location {
  99. volumeLayout.accessLock.RLock()
  100. isReadOnly, hasValue := volumeLayout.readonlyVolumes[vid]
  101. volumeLayout.accessLock.RUnlock()
  102. if hasValue && isReadOnly {
  103. continue
  104. }
  105. glog.V(0).Infof("check vacuum on collection:%s volume:%d", c.Name, vid)
  106. if batchVacuumVolumeCheck(volumeLayout, vid, locationlist, garbageThreshold) {
  107. if batchVacuumVolumeCompact(volumeLayout, vid, locationlist, preallocate) {
  108. batchVacuumVolumeCommit(volumeLayout, vid, locationlist)
  109. }
  110. }
  111. }
  112. }
  113. }
  114. }
  115. return 0
  116. }
  117. type VacuumVolumeResult struct {
  118. Result bool
  119. Error string
  120. }
  121. func vacuumVolume_Check(urlLocation string, vid storage.VolumeId, garbageThreshold string) (error, bool) {
  122. values := make(url.Values)
  123. values.Add("volume", vid.String())
  124. values.Add("garbageThreshold", garbageThreshold)
  125. jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum/check", values)
  126. if err != nil {
  127. glog.V(0).Infoln("parameters:", values)
  128. return err, false
  129. }
  130. var ret VacuumVolumeResult
  131. if err := json.Unmarshal(jsonBlob, &ret); err != nil {
  132. return err, false
  133. }
  134. if ret.Error != "" {
  135. return errors.New(ret.Error), false
  136. }
  137. return nil, ret.Result
  138. }
  139. func vacuumVolume_Compact(urlLocation string, vid storage.VolumeId, preallocate int64) error {
  140. values := make(url.Values)
  141. values.Add("volume", vid.String())
  142. values.Add("preallocate", fmt.Sprintf("%d", preallocate))
  143. jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum/compact", values)
  144. if err != nil {
  145. return err
  146. }
  147. var ret VacuumVolumeResult
  148. if err := json.Unmarshal(jsonBlob, &ret); err != nil {
  149. return err
  150. }
  151. if ret.Error != "" {
  152. return errors.New(ret.Error)
  153. }
  154. return nil
  155. }
  156. func vacuumVolume_Commit(urlLocation string, vid storage.VolumeId) error {
  157. values := make(url.Values)
  158. values.Add("volume", vid.String())
  159. jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum/commit", values)
  160. if err != nil {
  161. return err
  162. }
  163. var ret VacuumVolumeResult
  164. if err := json.Unmarshal(jsonBlob, &ret); err != nil {
  165. return err
  166. }
  167. if ret.Error != "" {
  168. return errors.New(ret.Error)
  169. }
  170. return nil
  171. }
  172. func vacuumVolume_Cleanup(urlLocation string, vid storage.VolumeId) error {
  173. values := make(url.Values)
  174. values.Add("volume", vid.String())
  175. jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum/cleanup", values)
  176. if err != nil {
  177. return err
  178. }
  179. var ret VacuumVolumeResult
  180. if err := json.Unmarshal(jsonBlob, &ret); err != nil {
  181. return err
  182. }
  183. if ret.Error != "" {
  184. return errors.New(ret.Error)
  185. }
  186. return nil
  187. }