volume_growth_test.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459
  1. package topology
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/storage/types"
  6. "testing"
  7. "github.com/seaweedfs/seaweedfs/weed/sequence"
  8. "github.com/seaweedfs/seaweedfs/weed/storage"
  9. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  10. "github.com/seaweedfs/seaweedfs/weed/storage/super_block"
  11. )
  12. var topologyLayout = `
  13. {
  14. "dc1":{
  15. "rack1":{
  16. "server111":{
  17. "volumes":[
  18. {"id":1, "size":12312},
  19. {"id":2, "size":12312},
  20. {"id":3, "size":12312}
  21. ],
  22. "limit":3
  23. },
  24. "server112":{
  25. "volumes":[
  26. {"id":4, "size":12312},
  27. {"id":5, "size":12312},
  28. {"id":6, "size":12312}
  29. ],
  30. "limit":10
  31. }
  32. },
  33. "rack2":{
  34. "server121":{
  35. "volumes":[
  36. {"id":4, "size":12312},
  37. {"id":5, "size":12312},
  38. {"id":6, "size":12312}
  39. ],
  40. "limit":4
  41. },
  42. "server122":{
  43. "volumes":[],
  44. "limit":4
  45. },
  46. "server123":{
  47. "volumes":[
  48. {"id":2, "size":12312},
  49. {"id":3, "size":12312},
  50. {"id":4, "size":12312}
  51. ],
  52. "limit":5
  53. }
  54. }
  55. },
  56. "dc2":{
  57. },
  58. "dc3":{
  59. "rack2":{
  60. "server321":{
  61. "volumes":[
  62. {"id":1, "size":12312},
  63. {"id":3, "size":12312},
  64. {"id":5, "size":12312}
  65. ],
  66. "limit":4
  67. }
  68. }
  69. }
  70. }
  71. `
  72. func setup(topologyLayout string) *Topology {
  73. var data interface{}
  74. err := json.Unmarshal([]byte(topologyLayout), &data)
  75. if err != nil {
  76. fmt.Println("error:", err)
  77. }
  78. fmt.Println("data:", data)
  79. //need to connect all nodes first before server adding volumes
  80. topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false)
  81. mTopology := data.(map[string]interface{})
  82. for dcKey, dcValue := range mTopology {
  83. dc := NewDataCenter(dcKey)
  84. dcMap := dcValue.(map[string]interface{})
  85. topo.LinkChildNode(dc)
  86. for rackKey, rackValue := range dcMap {
  87. dcRack := NewRack(rackKey)
  88. rackMap := rackValue.(map[string]interface{})
  89. dc.LinkChildNode(dcRack)
  90. for serverKey, serverValue := range rackMap {
  91. server := NewDataNode(serverKey)
  92. serverMap := serverValue.(map[string]interface{})
  93. if ip, ok := serverMap["ip"]; ok {
  94. server.Ip = ip.(string)
  95. }
  96. dcRack.LinkChildNode(server)
  97. for _, v := range serverMap["volumes"].([]interface{}) {
  98. m := v.(map[string]interface{})
  99. vi := storage.VolumeInfo{
  100. Id: needle.VolumeId(int64(m["id"].(float64))),
  101. Size: uint64(m["size"].(float64)),
  102. Version: needle.CurrentVersion,
  103. }
  104. if mVal, ok := m["collection"]; ok {
  105. vi.Collection = mVal.(string)
  106. }
  107. if mVal, ok := m["replication"]; ok {
  108. rp, _ := super_block.NewReplicaPlacementFromString(mVal.(string))
  109. vi.ReplicaPlacement = rp
  110. }
  111. if vi.ReplicaPlacement != nil {
  112. vl := topo.GetVolumeLayout(vi.Collection, vi.ReplicaPlacement, needle.EMPTY_TTL, types.HardDriveType)
  113. vl.RegisterVolume(&vi, server)
  114. vl.setVolumeWritable(vi.Id)
  115. }
  116. server.AddOrUpdateVolume(vi)
  117. }
  118. disk := server.getOrCreateDisk("")
  119. deltaDiskUsages := newDiskUsages()
  120. deltaDiskUsage := deltaDiskUsages.getOrCreateDisk("")
  121. deltaDiskUsage.maxVolumeCount = int64(serverMap["limit"].(float64))
  122. disk.UpAdjustDiskUsageDelta(deltaDiskUsages)
  123. }
  124. }
  125. }
  126. return topo
  127. }
  128. func TestFindEmptySlotsForOneVolume(t *testing.T) {
  129. topo := setup(topologyLayout)
  130. vg := NewDefaultVolumeGrowth()
  131. rp, _ := super_block.NewReplicaPlacementFromString("002")
  132. volumeGrowOption := &VolumeGrowOption{
  133. Collection: "",
  134. ReplicaPlacement: rp,
  135. DataCenter: "dc1",
  136. Rack: "",
  137. DataNode: "",
  138. }
  139. servers, err := vg.findEmptySlotsForOneVolume(topo, volumeGrowOption)
  140. if err != nil {
  141. fmt.Println("finding empty slots error :", err)
  142. t.Fail()
  143. }
  144. for _, server := range servers {
  145. fmt.Println("assigned node :", server.Id())
  146. }
  147. }
  148. var topologyLayout2 = `
  149. {
  150. "dc1":{
  151. "rack1":{
  152. "server111":{
  153. "volumes":[
  154. {"id":1, "size":12312},
  155. {"id":2, "size":12312},
  156. {"id":3, "size":12312}
  157. ],
  158. "limit":300
  159. },
  160. "server112":{
  161. "volumes":[
  162. {"id":4, "size":12312},
  163. {"id":5, "size":12312},
  164. {"id":6, "size":12312}
  165. ],
  166. "limit":300
  167. },
  168. "server113":{
  169. "volumes":[],
  170. "limit":300
  171. },
  172. "server114":{
  173. "volumes":[],
  174. "limit":300
  175. },
  176. "server115":{
  177. "volumes":[],
  178. "limit":300
  179. },
  180. "server116":{
  181. "volumes":[],
  182. "limit":300
  183. }
  184. },
  185. "rack2":{
  186. "server121":{
  187. "volumes":[
  188. {"id":4, "size":12312},
  189. {"id":5, "size":12312},
  190. {"id":6, "size":12312}
  191. ],
  192. "limit":300
  193. },
  194. "server122":{
  195. "volumes":[],
  196. "limit":300
  197. },
  198. "server123":{
  199. "volumes":[
  200. {"id":2, "size":12312},
  201. {"id":3, "size":12312},
  202. {"id":4, "size":12312}
  203. ],
  204. "limit":300
  205. },
  206. "server124":{
  207. "volumes":[],
  208. "limit":300
  209. },
  210. "server125":{
  211. "volumes":[],
  212. "limit":300
  213. },
  214. "server126":{
  215. "volumes":[],
  216. "limit":300
  217. }
  218. },
  219. "rack3":{
  220. "server131":{
  221. "volumes":[],
  222. "limit":300
  223. },
  224. "server132":{
  225. "volumes":[],
  226. "limit":300
  227. },
  228. "server133":{
  229. "volumes":[],
  230. "limit":300
  231. },
  232. "server134":{
  233. "volumes":[],
  234. "limit":300
  235. },
  236. "server135":{
  237. "volumes":[],
  238. "limit":300
  239. },
  240. "server136":{
  241. "volumes":[],
  242. "limit":300
  243. }
  244. }
  245. }
  246. }
  247. `
  248. func TestReplication011(t *testing.T) {
  249. topo := setup(topologyLayout2)
  250. vg := NewDefaultVolumeGrowth()
  251. rp, _ := super_block.NewReplicaPlacementFromString("011")
  252. volumeGrowOption := &VolumeGrowOption{
  253. Collection: "MAIL",
  254. ReplicaPlacement: rp,
  255. DataCenter: "dc1",
  256. Rack: "",
  257. DataNode: "",
  258. }
  259. servers, err := vg.findEmptySlotsForOneVolume(topo, volumeGrowOption)
  260. if err != nil {
  261. fmt.Println("finding empty slots error :", err)
  262. t.Fail()
  263. }
  264. for _, server := range servers {
  265. fmt.Println("assigned node :", server.Id())
  266. }
  267. }
  268. var topologyLayout3 = `
  269. {
  270. "dc1":{
  271. "rack1":{
  272. "server111":{
  273. "volumes":[],
  274. "limit":2000
  275. }
  276. }
  277. },
  278. "dc2":{
  279. "rack2":{
  280. "server222":{
  281. "volumes":[],
  282. "limit":2000
  283. }
  284. }
  285. },
  286. "dc3":{
  287. "rack3":{
  288. "server333":{
  289. "volumes":[],
  290. "limit":1000
  291. }
  292. }
  293. },
  294. "dc4":{
  295. "rack4":{
  296. "server444":{
  297. "volumes":[],
  298. "limit":1000
  299. }
  300. }
  301. },
  302. "dc5":{
  303. "rack5":{
  304. "server555":{
  305. "volumes":[],
  306. "limit":500
  307. }
  308. }
  309. },
  310. "dc6":{
  311. "rack6":{
  312. "server666":{
  313. "volumes":[],
  314. "limit":500
  315. }
  316. }
  317. }
  318. }
  319. `
  320. func TestFindEmptySlotsForOneVolumeScheduleByWeight(t *testing.T) {
  321. topo := setup(topologyLayout3)
  322. vg := NewDefaultVolumeGrowth()
  323. rp, _ := super_block.NewReplicaPlacementFromString("100")
  324. volumeGrowOption := &VolumeGrowOption{
  325. Collection: "Weight",
  326. ReplicaPlacement: rp,
  327. DataCenter: "",
  328. Rack: "",
  329. DataNode: "",
  330. }
  331. distribution := map[NodeId]int{}
  332. // assign 1000 volumes
  333. for i := 0; i < 1000; i++ {
  334. servers, err := vg.findEmptySlotsForOneVolume(topo, volumeGrowOption)
  335. if err != nil {
  336. fmt.Println("finding empty slots error :", err)
  337. t.Fail()
  338. }
  339. for _, server := range servers {
  340. // fmt.Println("assigned node :", server.Id())
  341. if _, ok := distribution[server.id]; !ok {
  342. distribution[server.id] = 0
  343. }
  344. distribution[server.id] += 1
  345. }
  346. }
  347. for k, v := range distribution {
  348. fmt.Printf("%s : %d\n", k, v)
  349. }
  350. }
  351. var topologyLayout4 = `
  352. {
  353. "dc1":{
  354. "rack1":{
  355. "serverdc111":{
  356. "ip": "127.0.0.1",
  357. "volumes":[
  358. {"id":1, "size":12312, "collection":"test", "replication":"001"},
  359. {"id":2, "size":12312, "collection":"test", "replication":"100"},
  360. {"id":4, "size":12312, "collection":"test", "replication":"100"},
  361. {"id":6, "size":12312, "collection":"test", "replication":"010"}
  362. ],
  363. "limit":100
  364. }
  365. }
  366. },
  367. "dc2":{
  368. "rack1":{
  369. "serverdc211":{
  370. "ip": "127.0.0.2",
  371. "volumes":[
  372. {"id":2, "size":12312, "collection":"test", "replication":"100"},
  373. {"id":3, "size":12312, "collection":"test", "replication":"010"},
  374. {"id":5, "size":12312, "collection":"test", "replication":"001"},
  375. {"id":6, "size":12312, "collection":"test", "replication":"010"}
  376. ],
  377. "limit":100
  378. }
  379. }
  380. },
  381. "dc3":{
  382. "rack1":{
  383. "serverdc311":{
  384. "ip": "127.0.0.3",
  385. "volumes":[
  386. {"id":1, "size":12312, "collection":"test", "replication":"001"},
  387. {"id":3, "size":12312, "collection":"test", "replication":"010"},
  388. {"id":4, "size":12312, "collection":"test", "replication":"100"},
  389. {"id":5, "size":12312, "collection":"test", "replication":"001"}
  390. ],
  391. "limit":100
  392. }
  393. }
  394. }
  395. }
  396. `
  397. func TestPickForWrite(t *testing.T) {
  398. topo := setup(topologyLayout4)
  399. volumeGrowOption := &VolumeGrowOption{
  400. Collection: "test",
  401. DataCenter: "",
  402. Rack: "",
  403. DataNode: "",
  404. }
  405. VolumeGrowStrategy.Threshold = 0.9
  406. for _, rpStr := range []string{"001", "010", "100"} {
  407. rp, _ := super_block.NewReplicaPlacementFromString(rpStr)
  408. vl := topo.GetVolumeLayout("test", rp, needle.EMPTY_TTL, types.HardDriveType)
  409. volumeGrowOption.ReplicaPlacement = rp
  410. for _, dc := range []string{"", "dc1", "dc2", "dc3", "dc0"} {
  411. volumeGrowOption.DataCenter = dc
  412. for _, r := range []string{""} {
  413. volumeGrowOption.Rack = r
  414. for _, dn := range []string{""} {
  415. if dc == "" && dn != "" {
  416. continue
  417. }
  418. volumeGrowOption.DataNode = dn
  419. fileId, count, _, shouldGrow, err := topo.PickForWrite(1, volumeGrowOption, vl)
  420. if dc == "dc0" {
  421. if err == nil || count != 0 || !shouldGrow {
  422. fmt.Println(dc, r, dn, "pick for write should be with error")
  423. t.Fail()
  424. }
  425. } else if err != nil {
  426. fmt.Println(dc, r, dn, "pick for write error :", err)
  427. t.Fail()
  428. } else if count == 0 {
  429. fmt.Println(dc, r, dn, "pick for write count is zero")
  430. t.Fail()
  431. } else if len(fileId) == 0 {
  432. fmt.Println(dc, r, dn, "pick for write file id is empty")
  433. t.Fail()
  434. } else if shouldGrow {
  435. fmt.Println(dc, r, dn, "pick for write error : not should grow")
  436. t.Fail()
  437. }
  438. }
  439. }
  440. }
  441. }
  442. }