123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459 |
- package topology
- import (
- "encoding/json"
- "fmt"
- "github.com/seaweedfs/seaweedfs/weed/storage/types"
- "testing"
- "github.com/seaweedfs/seaweedfs/weed/sequence"
- "github.com/seaweedfs/seaweedfs/weed/storage"
- "github.com/seaweedfs/seaweedfs/weed/storage/needle"
- "github.com/seaweedfs/seaweedfs/weed/storage/super_block"
- )
- var topologyLayout = `
- {
- "dc1":{
- "rack1":{
- "server111":{
- "volumes":[
- {"id":1, "size":12312},
- {"id":2, "size":12312},
- {"id":3, "size":12312}
- ],
- "limit":3
- },
- "server112":{
- "volumes":[
- {"id":4, "size":12312},
- {"id":5, "size":12312},
- {"id":6, "size":12312}
- ],
- "limit":10
- }
- },
- "rack2":{
- "server121":{
- "volumes":[
- {"id":4, "size":12312},
- {"id":5, "size":12312},
- {"id":6, "size":12312}
- ],
- "limit":4
- },
- "server122":{
- "volumes":[],
- "limit":4
- },
- "server123":{
- "volumes":[
- {"id":2, "size":12312},
- {"id":3, "size":12312},
- {"id":4, "size":12312}
- ],
- "limit":5
- }
- }
- },
- "dc2":{
- },
- "dc3":{
- "rack2":{
- "server321":{
- "volumes":[
- {"id":1, "size":12312},
- {"id":3, "size":12312},
- {"id":5, "size":12312}
- ],
- "limit":4
- }
- }
- }
- }
- `
- func setup(topologyLayout string) *Topology {
- var data interface{}
- err := json.Unmarshal([]byte(topologyLayout), &data)
- if err != nil {
- fmt.Println("error:", err)
- }
- fmt.Println("data:", data)
- //need to connect all nodes first before server adding volumes
- topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false)
- mTopology := data.(map[string]interface{})
- for dcKey, dcValue := range mTopology {
- dc := NewDataCenter(dcKey)
- dcMap := dcValue.(map[string]interface{})
- topo.LinkChildNode(dc)
- for rackKey, rackValue := range dcMap {
- dcRack := NewRack(rackKey)
- rackMap := rackValue.(map[string]interface{})
- dc.LinkChildNode(dcRack)
- for serverKey, serverValue := range rackMap {
- server := NewDataNode(serverKey)
- serverMap := serverValue.(map[string]interface{})
- if ip, ok := serverMap["ip"]; ok {
- server.Ip = ip.(string)
- }
- dcRack.LinkChildNode(server)
- for _, v := range serverMap["volumes"].([]interface{}) {
- m := v.(map[string]interface{})
- vi := storage.VolumeInfo{
- Id: needle.VolumeId(int64(m["id"].(float64))),
- Size: uint64(m["size"].(float64)),
- Version: needle.CurrentVersion,
- }
- if mVal, ok := m["collection"]; ok {
- vi.Collection = mVal.(string)
- }
- if mVal, ok := m["replication"]; ok {
- rp, _ := super_block.NewReplicaPlacementFromString(mVal.(string))
- vi.ReplicaPlacement = rp
- }
- if vi.ReplicaPlacement != nil {
- vl := topo.GetVolumeLayout(vi.Collection, vi.ReplicaPlacement, needle.EMPTY_TTL, types.HardDriveType)
- vl.RegisterVolume(&vi, server)
- vl.setVolumeWritable(vi.Id)
- }
- server.AddOrUpdateVolume(vi)
- }
- disk := server.getOrCreateDisk("")
- deltaDiskUsages := newDiskUsages()
- deltaDiskUsage := deltaDiskUsages.getOrCreateDisk("")
- deltaDiskUsage.maxVolumeCount = int64(serverMap["limit"].(float64))
- disk.UpAdjustDiskUsageDelta(deltaDiskUsages)
- }
- }
- }
- return topo
- }
- func TestFindEmptySlotsForOneVolume(t *testing.T) {
- topo := setup(topologyLayout)
- vg := NewDefaultVolumeGrowth()
- rp, _ := super_block.NewReplicaPlacementFromString("002")
- volumeGrowOption := &VolumeGrowOption{
- Collection: "",
- ReplicaPlacement: rp,
- DataCenter: "dc1",
- Rack: "",
- DataNode: "",
- }
- servers, err := vg.findEmptySlotsForOneVolume(topo, volumeGrowOption)
- if err != nil {
- fmt.Println("finding empty slots error :", err)
- t.Fail()
- }
- for _, server := range servers {
- fmt.Println("assigned node :", server.Id())
- }
- }
- var topologyLayout2 = `
- {
- "dc1":{
- "rack1":{
- "server111":{
- "volumes":[
- {"id":1, "size":12312},
- {"id":2, "size":12312},
- {"id":3, "size":12312}
- ],
- "limit":300
- },
- "server112":{
- "volumes":[
- {"id":4, "size":12312},
- {"id":5, "size":12312},
- {"id":6, "size":12312}
- ],
- "limit":300
- },
- "server113":{
- "volumes":[],
- "limit":300
- },
- "server114":{
- "volumes":[],
- "limit":300
- },
- "server115":{
- "volumes":[],
- "limit":300
- },
- "server116":{
- "volumes":[],
- "limit":300
- }
- },
- "rack2":{
- "server121":{
- "volumes":[
- {"id":4, "size":12312},
- {"id":5, "size":12312},
- {"id":6, "size":12312}
- ],
- "limit":300
- },
- "server122":{
- "volumes":[],
- "limit":300
- },
- "server123":{
- "volumes":[
- {"id":2, "size":12312},
- {"id":3, "size":12312},
- {"id":4, "size":12312}
- ],
- "limit":300
- },
- "server124":{
- "volumes":[],
- "limit":300
- },
- "server125":{
- "volumes":[],
- "limit":300
- },
- "server126":{
- "volumes":[],
- "limit":300
- }
- },
- "rack3":{
- "server131":{
- "volumes":[],
- "limit":300
- },
- "server132":{
- "volumes":[],
- "limit":300
- },
- "server133":{
- "volumes":[],
- "limit":300
- },
- "server134":{
- "volumes":[],
- "limit":300
- },
- "server135":{
- "volumes":[],
- "limit":300
- },
- "server136":{
- "volumes":[],
- "limit":300
- }
- }
- }
- }
- `
- func TestReplication011(t *testing.T) {
- topo := setup(topologyLayout2)
- vg := NewDefaultVolumeGrowth()
- rp, _ := super_block.NewReplicaPlacementFromString("011")
- volumeGrowOption := &VolumeGrowOption{
- Collection: "MAIL",
- ReplicaPlacement: rp,
- DataCenter: "dc1",
- Rack: "",
- DataNode: "",
- }
- servers, err := vg.findEmptySlotsForOneVolume(topo, volumeGrowOption)
- if err != nil {
- fmt.Println("finding empty slots error :", err)
- t.Fail()
- }
- for _, server := range servers {
- fmt.Println("assigned node :", server.Id())
- }
- }
- var topologyLayout3 = `
- {
- "dc1":{
- "rack1":{
- "server111":{
- "volumes":[],
- "limit":2000
- }
- }
- },
- "dc2":{
- "rack2":{
- "server222":{
- "volumes":[],
- "limit":2000
- }
- }
- },
- "dc3":{
- "rack3":{
- "server333":{
- "volumes":[],
- "limit":1000
- }
- }
- },
- "dc4":{
- "rack4":{
- "server444":{
- "volumes":[],
- "limit":1000
- }
- }
- },
- "dc5":{
- "rack5":{
- "server555":{
- "volumes":[],
- "limit":500
- }
- }
- },
- "dc6":{
- "rack6":{
- "server666":{
- "volumes":[],
- "limit":500
- }
- }
- }
- }
- `
- func TestFindEmptySlotsForOneVolumeScheduleByWeight(t *testing.T) {
- topo := setup(topologyLayout3)
- vg := NewDefaultVolumeGrowth()
- rp, _ := super_block.NewReplicaPlacementFromString("100")
- volumeGrowOption := &VolumeGrowOption{
- Collection: "Weight",
- ReplicaPlacement: rp,
- DataCenter: "",
- Rack: "",
- DataNode: "",
- }
- distribution := map[NodeId]int{}
- // assign 1000 volumes
- for i := 0; i < 1000; i++ {
- servers, err := vg.findEmptySlotsForOneVolume(topo, volumeGrowOption)
- if err != nil {
- fmt.Println("finding empty slots error :", err)
- t.Fail()
- }
- for _, server := range servers {
- // fmt.Println("assigned node :", server.Id())
- if _, ok := distribution[server.id]; !ok {
- distribution[server.id] = 0
- }
- distribution[server.id] += 1
- }
- }
- for k, v := range distribution {
- fmt.Printf("%s : %d\n", k, v)
- }
- }
- var topologyLayout4 = `
- {
- "dc1":{
- "rack1":{
- "serverdc111":{
- "ip": "127.0.0.1",
- "volumes":[
- {"id":1, "size":12312, "collection":"test", "replication":"001"},
- {"id":2, "size":12312, "collection":"test", "replication":"100"},
- {"id":4, "size":12312, "collection":"test", "replication":"100"},
- {"id":6, "size":12312, "collection":"test", "replication":"010"}
- ],
- "limit":100
- }
- }
- },
- "dc2":{
- "rack1":{
- "serverdc211":{
- "ip": "127.0.0.2",
- "volumes":[
- {"id":2, "size":12312, "collection":"test", "replication":"100"},
- {"id":3, "size":12312, "collection":"test", "replication":"010"},
- {"id":5, "size":12312, "collection":"test", "replication":"001"},
- {"id":6, "size":12312, "collection":"test", "replication":"010"}
- ],
- "limit":100
- }
- }
- },
- "dc3":{
- "rack1":{
- "serverdc311":{
- "ip": "127.0.0.3",
- "volumes":[
- {"id":1, "size":12312, "collection":"test", "replication":"001"},
- {"id":3, "size":12312, "collection":"test", "replication":"010"},
- {"id":4, "size":12312, "collection":"test", "replication":"100"},
- {"id":5, "size":12312, "collection":"test", "replication":"001"}
- ],
- "limit":100
- }
- }
- }
- }
- `
- func TestPickForWrite(t *testing.T) {
- topo := setup(topologyLayout4)
- volumeGrowOption := &VolumeGrowOption{
- Collection: "test",
- DataCenter: "",
- Rack: "",
- DataNode: "",
- }
- VolumeGrowStrategy.Threshold = 0.9
- for _, rpStr := range []string{"001", "010", "100"} {
- rp, _ := super_block.NewReplicaPlacementFromString(rpStr)
- vl := topo.GetVolumeLayout("test", rp, needle.EMPTY_TTL, types.HardDriveType)
- volumeGrowOption.ReplicaPlacement = rp
- for _, dc := range []string{"", "dc1", "dc2", "dc3", "dc0"} {
- volumeGrowOption.DataCenter = dc
- for _, r := range []string{""} {
- volumeGrowOption.Rack = r
- for _, dn := range []string{""} {
- if dc == "" && dn != "" {
- continue
- }
- volumeGrowOption.DataNode = dn
- fileId, count, _, shouldGrow, err := topo.PickForWrite(1, volumeGrowOption, vl)
- if dc == "dc0" {
- if err == nil || count != 0 || !shouldGrow {
- fmt.Println(dc, r, dn, "pick for write should be with error")
- t.Fail()
- }
- } else if err != nil {
- fmt.Println(dc, r, dn, "pick for write error :", err)
- t.Fail()
- } else if count == 0 {
- fmt.Println(dc, r, dn, "pick for write count is zero")
- t.Fail()
- } else if len(fileId) == 0 {
- fmt.Println(dc, r, dn, "pick for write file id is empty")
- t.Fail()
- } else if shouldGrow {
- fmt.Println(dc, r, dn, "pick for write error : not should grow")
- t.Fail()
- }
- }
- }
- }
- }
- }
|