Просмотр исходного кода

Correct the oversized state of volume after compaction

LIBA-S 4 года назад
Родитель
Коммит
0157798ebf
3 изменённых файла: 223 добавлено и 15 удалено
  1. 2 2
      weed/topology/topology_vacuum.go
  2. 105 13
      weed/topology/volume_layout.go
  3. 116 0
      weed/topology/volume_layout_test.go

+ 2 - 2
weed/topology/topology_vacuum.go

@@ -172,10 +172,10 @@ func vacuumOneVolumeLayout(grpcDialOption grpc.DialOption, volumeLayout *VolumeL
 	for vid, locationList := range tmpMap {
 
 		volumeLayout.accessLock.RLock()
-		isReadOnly, hasValue := volumeLayout.readonlyVolumes[vid]
+		isReadOnly := volumeLayout.readonlyVolumes.IsTrue(vid)
 		volumeLayout.accessLock.RUnlock()
 
-		if hasValue && isReadOnly {
+		if isReadOnly {
 			continue
 		}
 

+ 105 - 13
weed/topology/volume_layout.go

@@ -13,14 +13,100 @@ import (
 	"github.com/chrislusf/seaweedfs/weed/storage/super_block"
 )
 
+type copyState int // copyState grades a volume's tracked location list against the required copy count (see the copyState method)
+
+const (
+	noCopies copyState = 0 + iota // no location list is tracked for the volume
+	insufficientCopies            // a list exists but is shorter than rp.GetCopyCount()
+	enoughCopies                  // the list length is at least rp.GetCopyCount()
+)
+
+type volumeState string // volumeState is the label stored in volumesBinaryState.name
+
+const (
+	readOnlyState  volumeState = "ReadOnly"
+	oversizedState volumeState = "Oversized" // typed explicitly: a const spec that has its own value does not inherit the type of the previous spec
+)
+
+type stateIndicator func(copyState) bool // stateIndicator maps a copyState to whether the tracked state holds for a volume
+
+func ExistCopies() stateIndicator { // ExistCopies returns an indicator that is true for every state except noCopies.
+	return func(state copyState) bool { return state != noCopies }
+}
+
+func NoCopies() stateIndicator { // NoCopies returns an indicator that is true only for the noCopies state.
+	return func(state copyState) bool { return state == noCopies }
+}
+
+type volumesBinaryState struct { // volumesBinaryState tracks, per volume id, the data nodes recorded for one yes/no state
+	rp        *super_block.ReplicaPlacement // its GetCopyCount() is the threshold used by the copyState method
+	name      volumeState    // the name for volume state (e.g. "ReadOnly", "Oversized")
+	indicator stateIndicator // indicate whether the volumes should be marked as `name`
+	copyMap   map[needle.VolumeId]*VolumeLocationList // volume id -> locations recorded via Add and not yet removed
+}
+
+func NewVolumesBinaryState(name volumeState, rp *super_block.ReplicaPlacement, indicator stateIndicator) *volumesBinaryState { // NewVolumesBinaryState builds a tracker that starts with no volumes recorded.
+	return &volumesBinaryState{
+		rp:        rp,
+		name:      name,
+		indicator: indicator,
+		copyMap:   make(map[needle.VolumeId]*VolumeLocationList), // allocated up front so Add can assign into it
+	}
+}
+
+func (v *volumesBinaryState) Dump() (res []uint32) { // Dump returns the ids of tracked volumes for which the indicator holds.
+	for vid, list := range v.copyMap { // map iteration, so result order is unspecified; only vids present in copyMap are considered
+		if v.indicator(v.copyState(list)) {
+			res = append(res, uint32(vid))
+		}
+	}
+	return // res is still nil when nothing matched
+}
+
+func (v *volumesBinaryState) IsTrue(vid needle.VolumeId) bool { // IsTrue reports whether the indicator holds for vid.
+	list, _ := v.copyMap[vid] // an unknown vid yields a nil list, which copyState classifies as noCopies
+	return v.indicator(v.copyState(list))
+}
+
+func (v *volumesBinaryState) Add(vid needle.VolumeId, dn *DataNode) { // Add records dn as a location of vid.
+	list, _ := v.copyMap[vid]
+	if list != nil { // vid is already tracked: update its existing list in place
+		list.Set(dn)
+		return
+	}
+	list = NewVolumeLocationList() // first location for vid: create the list, then store it in the map
+	list.Set(dn)
+	v.copyMap[vid] = list
+}
+
+func (v *volumesBinaryState) Remove(vid needle.VolumeId, dn *DataNode) { // Remove drops dn from vid's recorded locations.
+	list, _ := v.copyMap[vid]
+	if list != nil { // nothing to do when vid is not tracked
+		list.Remove(dn)
+		if list.Length() == 0 { // last location gone: delete the entry so later lookups see a nil list
+			delete(v.copyMap, vid)
+		}
+	}
+}
+
+func (v *volumesBinaryState) copyState(list *VolumeLocationList) copyState { // copyState classifies list against the copy count of v.rp.
+	if list == nil { // checked first, so list.Length() below is never called on nil
+		return noCopies
+	}
+	if list.Length() < v.rp.GetCopyCount() { // NOTE(review): calls through v.rp; presumably rp is never nil — confirm against callers
+		return insufficientCopies
+	}
+	return enoughCopies
+}
+
 // mapping from volume to its locations, inverted from server to volume
 type VolumeLayout struct {
 	rp               *super_block.ReplicaPlacement
 	ttl              *needle.TTL
 	vid2location     map[needle.VolumeId]*VolumeLocationList
-	writables        []needle.VolumeId        // transient array of writable volume id
-	readonlyVolumes  map[needle.VolumeId]bool // transient set of readonly volumes
-	oversizedVolumes map[needle.VolumeId]bool // set of oversized volumes
+	writables        []needle.VolumeId   // transient array of writable volume id
+	readonlyVolumes  *volumesBinaryState // readonly volumes
+	oversizedVolumes *volumesBinaryState // oversized volumes
 	volumeSizeLimit  uint64
 	replicationAsMin bool
 	accessLock       sync.RWMutex
@@ -38,8 +124,8 @@ func NewVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL, volumeSi
 		ttl:              ttl,
 		vid2location:     make(map[needle.VolumeId]*VolumeLocationList),
 		writables:        *new([]needle.VolumeId),
-		readonlyVolumes:  make(map[needle.VolumeId]bool),
-		oversizedVolumes: make(map[needle.VolumeId]bool),
+		readonlyVolumes:  NewVolumesBinaryState(readOnlyState, rp, ExistCopies()),
+		oversizedVolumes: NewVolumesBinaryState(oversizedState, rp, ExistCopies()),
 		volumeSizeLimit:  volumeSizeLimit,
 		replicationAsMin: replicationAsMin,
 	}
@@ -54,7 +140,7 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
 	defer vl.accessLock.Unlock()
 
 	defer vl.ensureCorrectWritables(v)
-	defer vl.rememberOversizedVolume(v)
+	defer vl.rememberOversizedVolume(v, dn)
 
 	if _, ok := vl.vid2location[v.Id]; !ok {
 		vl.vid2location[v.Id] = NewVolumeLocationList()
@@ -66,24 +152,26 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
 			if vInfo.ReadOnly {
 				glog.V(1).Infof("vid %d removed from writable", v.Id)
 				vl.removeFromWritable(v.Id)
-				vl.readonlyVolumes[v.Id] = true
+				vl.readonlyVolumes.Add(v.Id, dn)
 				return
 			} else {
-				delete(vl.readonlyVolumes, v.Id)
+				vl.readonlyVolumes.Remove(v.Id, dn)
 			}
 		} else {
 			glog.V(1).Infof("vid %d removed from writable", v.Id)
 			vl.removeFromWritable(v.Id)
-			delete(vl.readonlyVolumes, v.Id)
+			vl.readonlyVolumes.Remove(v.Id, dn)
 			return
 		}
 	}
 
 }
 
-func (vl *VolumeLayout) rememberOversizedVolume(v *storage.VolumeInfo) {
+func (vl *VolumeLayout) rememberOversizedVolume(v *storage.VolumeInfo, dn *DataNode) { // records or clears dn's copy of v in oversizedVolumes
 	if vl.isOversized(v) {
-		vl.oversizedVolumes[v.Id] = true
+		vl.oversizedVolumes.Add(v.Id, dn)
+	} else {
+		vl.oversizedVolumes.Remove(v.Id, dn) // clears the record once v no longer reports as oversized; the commit title names the post-compaction case
 	}
 }
 
@@ -99,6 +187,8 @@ func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
 
 	if location.Remove(dn) {
 
+		vl.readonlyVolumes.Remove(v.Id, dn)
+		vl.oversizedVolumes.Remove(v.Id, dn)
 		vl.ensureCorrectWritables(v)
 
 		if location.Length() == 0 {
@@ -110,7 +200,7 @@ func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
 
 func (vl *VolumeLayout) ensureCorrectWritables(v *storage.VolumeInfo) {
 	if vl.enoughCopies(v.Id) && vl.isWritable(v) {
-		if _, ok := vl.oversizedVolumes[v.Id]; !ok {
+		if !vl.oversizedVolumes.IsTrue(v.Id) {
 			vl.setVolumeWritable(v.Id)
 		}
 	} else {
@@ -251,6 +341,8 @@ func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid needle.VolumeId)
 
 	if location, ok := vl.vid2location[vid]; ok {
 		if location.Remove(dn) {
+			vl.readonlyVolumes.Remove(vid, dn)
+			vl.oversizedVolumes.Remove(vid, dn)
 			if location.Length() < vl.rp.GetCopyCount() {
 				glog.V(0).Infoln("Volume", vid, "has", location.Length(), "replica, less than required", vl.rp.GetCopyCount())
 				return vl.removeFromWritable(vid)
@@ -315,7 +407,7 @@ func (vl *VolumeLayout) Stats() *VolumeLayoutStats {
 		size, fileCount := vll.Stats(vid, freshThreshold)
 		ret.FileCount += uint64(fileCount)
 		ret.UsedSize += size
-		if vl.readonlyVolumes[vid] {
+		if vl.readonlyVolumes.IsTrue(vid) {
 			ret.TotalSize += size
 		} else {
 			ret.TotalSize += vl.volumeSizeLimit

+ 116 - 0
weed/topology/volume_layout_test.go

@@ -0,0 +1,116 @@
+package topology
+
+import (
+	"testing"
+
+	"github.com/chrislusf/seaweedfs/weed/storage/needle"
+	"github.com/chrislusf/seaweedfs/weed/storage/super_block"
+)
+
+func TestVolumesBinaryState(t *testing.T) { // TestVolumesBinaryState checks IsTrue before and after Add/Remove updates, for both indicators.
+	vids := []needle.VolumeId{
+		needle.VolumeId(1),
+		needle.VolumeId(2),
+		needle.VolumeId(3),
+		needle.VolumeId(4),
+		needle.VolumeId(5),
+	}
+
+	dns := []*DataNode{
+		&DataNode{
+			Ip:   "127.0.0.1",
+			Port: 8081,
+		},
+		&DataNode{
+			Ip:   "127.0.0.1",
+			Port: 8082,
+		},
+		&DataNode{
+			Ip:   "127.0.0.1",
+			Port: 8083,
+		},
+	}
+
+	rp, _ := super_block.NewReplicaPlacementFromString("002") // the parse error is discarded here
+
+	state_exist := NewVolumesBinaryState(readOnlyState, rp, ExistCopies()) // true for a vid once any node has been added for it
+	state_exist.Add(vids[0], dns[0])
+	state_exist.Add(vids[0], dns[1])
+	state_exist.Add(vids[1], dns[2])
+	state_exist.Add(vids[2], dns[1])
+	state_exist.Add(vids[4], dns[1])
+	state_exist.Add(vids[4], dns[2])
+
+	state_no := NewVolumesBinaryState(readOnlyState, rp, NoCopies()) // true only for vids with nothing tracked
+	state_no.Add(vids[0], dns[0])
+	state_no.Add(vids[0], dns[1])
+	state_no.Add(vids[3], dns[1])
+
+	tests := []struct {
+		name                    string
+		state                   *volumesBinaryState
+		expectResult            []bool // expected IsTrue for each entry of vids, before update runs
+		update                  func()
+		expectResultAfterUpdate []bool // expected IsTrue for each entry of vids, after update runs
+	}{
+		{
+			name:         "mark true when exist copies",
+			state:        state_exist,
+			expectResult: []bool{true, true, true, false, true},
+			update: func() {
+				state_exist.Remove(vids[0], dns[2]) // dns[2] was never added for vids[0]; the expectation keeps vids[0] true
+				state_exist.Remove(vids[1], dns[2])
+				state_exist.Remove(vids[3], dns[2]) // vids[3] is untracked: exercises Remove on an unknown vid
+				state_exist.Remove(vids[4], dns[1])
+				state_exist.Remove(vids[4], dns[2])
+			},
+			expectResultAfterUpdate: []bool{true, false, true, false, false},
+		},
+		{
+			name:         "mark true when inexist copies",
+			state:        state_no,
+			expectResult: []bool{false, true, true, false, true},
+			update: func() {
+				state_no.Remove(vids[0], dns[2])
+				state_no.Remove(vids[1], dns[2])
+				state_no.Add(vids[2], dns[1])
+				state_no.Remove(vids[3], dns[1])
+				state_no.Remove(vids[4], dns[2])
+			},
+			expectResultAfterUpdate: []bool{false, true, false, true, true},
+		},
+	}
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			var result []bool
+			for index, _ := range vids {
+				result = append(result, test.state.IsTrue(vids[index]))
+			}
+			if len(result) != len(test.expectResult) {
+				t.Fatalf("len(result) != len(expectResult), got %d, expected %d\n",
+					len(result), len(test.expectResult))
+			}
+			for index, val := range result {
+				if val != test.expectResult[index] {
+					t.Fatalf("result not matched, index %d, got %v, expect %v\n",
+						index, val, test.expectResult[index])
+				}
+			}
+			test.update() // mutates the state object that test.state also points to
+			var updateResult []bool
+			for index, _ := range vids {
+				updateResult = append(updateResult, test.state.IsTrue(vids[index]))
+			}
+			if len(updateResult) != len(test.expectResultAfterUpdate) {
+				t.Fatalf("len(updateResult) != len(expectResultAfterUpdate), got %d, expected %d\n",
+					len(updateResult), len(test.expectResultAfterUpdate))
+			}
+			for index, val := range updateResult {
+				if val != test.expectResultAfterUpdate[index] {
+					t.Fatalf("update result not matched, index %d, got %v, expect %v\n",
+						index, val, test.expectResultAfterUpdate[index])
+				}
+			}
+		})
+	}
+}