123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155 |
- package topology
- import (
- "encoding/json"
- "errors"
- "net/url"
- "time"
- "github.com/chrislusf/weed-fs/go/glog"
- "github.com/chrislusf/weed-fs/go/storage"
- "github.com/chrislusf/weed-fs/go/util"
- )
- func batchVacuumVolumeCheck(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList, garbageThreshold string) bool {
- ch := make(chan bool, locationlist.Length())
- for index, dn := range locationlist.list {
- go func(index int, url string, vid storage.VolumeId) {
- //glog.V(0).Infoln(index, "Check vacuuming", vid, "on", dn.Url())
- if e, ret := vacuumVolume_Check(url, vid, garbageThreshold); e != nil {
- //glog.V(0).Infoln(index, "Error when checking vacuuming", vid, "on", url, e)
- ch <- false
- } else {
- //glog.V(0).Infoln(index, "Checked vacuuming", vid, "on", url, "needVacuum", ret)
- ch <- ret
- }
- }(index, dn.Url(), vid)
- }
- isCheckSuccess := true
- for _ = range locationlist.list {
- select {
- case canVacuum := <-ch:
- isCheckSuccess = isCheckSuccess && canVacuum
- case <-time.After(30 * time.Minute):
- isCheckSuccess = false
- break
- }
- }
- return isCheckSuccess
- }
- func batchVacuumVolumeCompact(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) bool {
- vl.removeFromWritable(vid)
- ch := make(chan bool, locationlist.Length())
- for index, dn := range locationlist.list {
- go func(index int, url string, vid storage.VolumeId) {
- glog.V(0).Infoln(index, "Start vacuuming", vid, "on", url)
- if e := vacuumVolume_Compact(url, vid); e != nil {
- glog.V(0).Infoln(index, "Error when vacuuming", vid, "on", url, e)
- ch <- false
- } else {
- glog.V(0).Infoln(index, "Complete vacuuming", vid, "on", url)
- ch <- true
- }
- }(index, dn.Url(), vid)
- }
- isVacuumSuccess := true
- for _ = range locationlist.list {
- select {
- case _ = <-ch:
- case <-time.After(30 * time.Minute):
- isVacuumSuccess = false
- break
- }
- }
- return isVacuumSuccess
- }
- func batchVacuumVolumeCommit(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) bool {
- isCommitSuccess := true
- for _, dn := range locationlist.list {
- glog.V(0).Infoln("Start Commiting vacuum", vid, "on", dn.Url())
- if e := vacuumVolume_Commit(dn.Url(), vid); e != nil {
- glog.V(0).Infoln("Error when committing vacuum", vid, "on", dn.Url(), e)
- isCommitSuccess = false
- } else {
- glog.V(0).Infoln("Complete Commiting vacuum", vid, "on", dn.Url())
- }
- if isCommitSuccess {
- vl.SetVolumeAvailable(dn, vid)
- }
- }
- return isCommitSuccess
- }
- func (t *Topology) Vacuum(garbageThreshold string) int {
- for _, col := range t.collectionMap.Items {
- c := col.(*Collection)
- for _, vl := range c.storageType2VolumeLayout.Items {
- if vl != nil {
- volumeLayout := vl.(*VolumeLayout)
- for vid, locationlist := range volumeLayout.vid2location {
- if batchVacuumVolumeCheck(volumeLayout, vid, locationlist, garbageThreshold) {
- if batchVacuumVolumeCompact(volumeLayout, vid, locationlist) {
- batchVacuumVolumeCommit(volumeLayout, vid, locationlist)
- }
- }
- }
- }
- }
- }
- return 0
- }
- type VacuumVolumeResult struct {
- Result bool
- Error string
- }
- func vacuumVolume_Check(urlLocation string, vid storage.VolumeId, garbageThreshold string) (error, bool) {
- values := make(url.Values)
- values.Add("volume", vid.String())
- values.Add("garbageThreshold", garbageThreshold)
- jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum_volume_check", values)
- if err != nil {
- glog.V(0).Infoln("parameters:", values)
- return err, false
- }
- var ret VacuumVolumeResult
- if err := json.Unmarshal(jsonBlob, &ret); err != nil {
- return err, false
- }
- if ret.Error != "" {
- return errors.New(ret.Error), false
- }
- return nil, ret.Result
- }
- func vacuumVolume_Compact(urlLocation string, vid storage.VolumeId) error {
- values := make(url.Values)
- values.Add("volume", vid.String())
- jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum_volume_compact", values)
- if err != nil {
- return err
- }
- var ret VacuumVolumeResult
- if err := json.Unmarshal(jsonBlob, &ret); err != nil {
- return err
- }
- if ret.Error != "" {
- return errors.New(ret.Error)
- }
- return nil
- }
- func vacuumVolume_Commit(urlLocation string, vid storage.VolumeId) error {
- values := make(url.Values)
- values.Add("volume", vid.String())
- jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum_volume_commit", values)
- if err != nil {
- return err
- }
- var ret VacuumVolumeResult
- if err := json.Unmarshal(jsonBlob, &ret); err != nil {
- return err
- }
- if ret.Error != "" {
- return errors.New(ret.Error)
- }
- return nil
- }
|