volume_growth_test.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458
  1. package topology
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/storage/types"
  6. "testing"
  7. "github.com/seaweedfs/seaweedfs/weed/sequence"
  8. "github.com/seaweedfs/seaweedfs/weed/storage"
  9. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  10. "github.com/seaweedfs/seaweedfs/weed/storage/super_block"
  11. )
  // topologyLayout is the JSON fixture that TestFindEmptySlotsForOneVolume
  // feeds to setup. Keys nest as data center -> rack -> server. Each server
  // lists the "volumes" it already holds (id and size) and a "limit" that setup
  // applies as the disk's maxVolumeCount. dc2 contains no racks.
  var topologyLayout = `
  {
  "dc1":{
  "rack1":{
  "server111":{
  "volumes":[
  {"id":1, "size":12312},
  {"id":2, "size":12312},
  {"id":3, "size":12312}
  ],
  "limit":3
  },
  "server112":{
  "volumes":[
  {"id":4, "size":12312},
  {"id":5, "size":12312},
  {"id":6, "size":12312}
  ],
  "limit":10
  }
  },
  "rack2":{
  "server121":{
  "volumes":[
  {"id":4, "size":12312},
  {"id":5, "size":12312},
  {"id":6, "size":12312}
  ],
  "limit":4
  },
  "server122":{
  "volumes":[],
  "limit":4
  },
  "server123":{
  "volumes":[
  {"id":2, "size":12312},
  {"id":3, "size":12312},
  {"id":4, "size":12312}
  ],
  "limit":5
  }
  }
  },
  "dc2":{
  },
  "dc3":{
  "rack2":{
  "server321":{
  "volumes":[
  {"id":1, "size":12312},
  {"id":3, "size":12312},
  {"id":5, "size":12312}
  ],
  "limit":4
  }
  }
  }
  }
  `
  72. func setup(topologyLayout string) *Topology {
  73. var data interface{}
  74. err := json.Unmarshal([]byte(topologyLayout), &data)
  75. if err != nil {
  76. fmt.Println("error:", err)
  77. }
  78. fmt.Println("data:", data)
  79. //need to connect all nodes first before server adding volumes
  80. topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false)
  81. mTopology := data.(map[string]interface{})
  82. for dcKey, dcValue := range mTopology {
  83. dc := NewDataCenter(dcKey)
  84. dcMap := dcValue.(map[string]interface{})
  85. topo.LinkChildNode(dc)
  86. for rackKey, rackValue := range dcMap {
  87. dcRack := NewRack(rackKey)
  88. rackMap := rackValue.(map[string]interface{})
  89. dc.LinkChildNode(dcRack)
  90. for serverKey, serverValue := range rackMap {
  91. server := NewDataNode(serverKey)
  92. serverMap := serverValue.(map[string]interface{})
  93. if ip, ok := serverMap["ip"]; ok {
  94. server.Ip = ip.(string)
  95. }
  96. dcRack.LinkChildNode(server)
  97. for _, v := range serverMap["volumes"].([]interface{}) {
  98. m := v.(map[string]interface{})
  99. vi := storage.VolumeInfo{
  100. Id: needle.VolumeId(int64(m["id"].(float64))),
  101. Size: uint64(m["size"].(float64)),
  102. Version: needle.CurrentVersion,
  103. }
  104. if mVal, ok := m["collection"]; ok {
  105. vi.Collection = mVal.(string)
  106. }
  107. if mVal, ok := m["replication"]; ok {
  108. rp, _ := super_block.NewReplicaPlacementFromString(mVal.(string))
  109. vi.ReplicaPlacement = rp
  110. }
  111. if vi.ReplicaPlacement != nil {
  112. vl := topo.GetVolumeLayout(vi.Collection, vi.ReplicaPlacement, needle.EMPTY_TTL, types.HardDriveType)
  113. vl.RegisterVolume(&vi, server)
  114. vl.setVolumeWritable(vi.Id)
  115. }
  116. server.AddOrUpdateVolume(vi)
  117. }
  118. disk := server.getOrCreateDisk("")
  119. disk.UpAdjustDiskUsageDelta("", &DiskUsageCounts{
  120. maxVolumeCount: int64(serverMap["limit"].(float64)),
  121. })
  122. }
  123. }
  124. }
  125. return topo
  126. }
  127. func TestFindEmptySlotsForOneVolume(t *testing.T) {
  128. topo := setup(topologyLayout)
  129. vg := NewDefaultVolumeGrowth()
  130. rp, _ := super_block.NewReplicaPlacementFromString("002")
  131. volumeGrowOption := &VolumeGrowOption{
  132. Collection: "",
  133. ReplicaPlacement: rp,
  134. DataCenter: "dc1",
  135. Rack: "",
  136. DataNode: "",
  137. }
  138. servers, err := vg.findEmptySlotsForOneVolume(topo, volumeGrowOption)
  139. if err != nil {
  140. fmt.Println("finding empty slots error :", err)
  141. t.Fail()
  142. }
  143. for _, server := range servers {
  144. fmt.Println("assigned node :", server.Id())
  145. }
  146. }
  // topologyLayout2 is the JSON fixture that TestReplication011 feeds to setup.
  // It describes a single data center, dc1, with three racks of six servers
  // each. Every server has a "limit" of 300, and only a few servers in rack1
  // and rack2 start with volumes.
  var topologyLayout2 = `
  {
  "dc1":{
  "rack1":{
  "server111":{
  "volumes":[
  {"id":1, "size":12312},
  {"id":2, "size":12312},
  {"id":3, "size":12312}
  ],
  "limit":300
  },
  "server112":{
  "volumes":[
  {"id":4, "size":12312},
  {"id":5, "size":12312},
  {"id":6, "size":12312}
  ],
  "limit":300
  },
  "server113":{
  "volumes":[],
  "limit":300
  },
  "server114":{
  "volumes":[],
  "limit":300
  },
  "server115":{
  "volumes":[],
  "limit":300
  },
  "server116":{
  "volumes":[],
  "limit":300
  }
  },
  "rack2":{
  "server121":{
  "volumes":[
  {"id":4, "size":12312},
  {"id":5, "size":12312},
  {"id":6, "size":12312}
  ],
  "limit":300
  },
  "server122":{
  "volumes":[],
  "limit":300
  },
  "server123":{
  "volumes":[
  {"id":2, "size":12312},
  {"id":3, "size":12312},
  {"id":4, "size":12312}
  ],
  "limit":300
  },
  "server124":{
  "volumes":[],
  "limit":300
  },
  "server125":{
  "volumes":[],
  "limit":300
  },
  "server126":{
  "volumes":[],
  "limit":300
  }
  },
  "rack3":{
  "server131":{
  "volumes":[],
  "limit":300
  },
  "server132":{
  "volumes":[],
  "limit":300
  },
  "server133":{
  "volumes":[],
  "limit":300
  },
  "server134":{
  "volumes":[],
  "limit":300
  },
  "server135":{
  "volumes":[],
  "limit":300
  },
  "server136":{
  "volumes":[],
  "limit":300
  }
  }
  }
  }
  `
  247. func TestReplication011(t *testing.T) {
  248. topo := setup(topologyLayout2)
  249. vg := NewDefaultVolumeGrowth()
  250. rp, _ := super_block.NewReplicaPlacementFromString("011")
  251. volumeGrowOption := &VolumeGrowOption{
  252. Collection: "MAIL",
  253. ReplicaPlacement: rp,
  254. DataCenter: "dc1",
  255. Rack: "",
  256. DataNode: "",
  257. }
  258. servers, err := vg.findEmptySlotsForOneVolume(topo, volumeGrowOption)
  259. if err != nil {
  260. fmt.Println("finding empty slots error :", err)
  261. t.Fail()
  262. }
  263. for _, server := range servers {
  264. fmt.Println("assigned node :", server.Id())
  265. }
  266. }
  // topologyLayout3 is the JSON fixture that
  // TestFindEmptySlotsForOneVolumeScheduleByWeight feeds to setup. It describes
  // six data centers, each with one rack holding one empty server. The server
  // "limit" values differ per data center: 2000, 2000, 1000, 1000, 500, 500.
  var topologyLayout3 = `
  {
  "dc1":{
  "rack1":{
  "server111":{
  "volumes":[],
  "limit":2000
  }
  }
  },
  "dc2":{
  "rack2":{
  "server222":{
  "volumes":[],
  "limit":2000
  }
  }
  },
  "dc3":{
  "rack3":{
  "server333":{
  "volumes":[],
  "limit":1000
  }
  }
  },
  "dc4":{
  "rack4":{
  "server444":{
  "volumes":[],
  "limit":1000
  }
  }
  },
  "dc5":{
  "rack5":{
  "server555":{
  "volumes":[],
  "limit":500
  }
  }
  },
  "dc6":{
  "rack6":{
  "server666":{
  "volumes":[],
  "limit":500
  }
  }
  }
  }
  `
  319. func TestFindEmptySlotsForOneVolumeScheduleByWeight(t *testing.T) {
  320. topo := setup(topologyLayout3)
  321. vg := NewDefaultVolumeGrowth()
  322. rp, _ := super_block.NewReplicaPlacementFromString("100")
  323. volumeGrowOption := &VolumeGrowOption{
  324. Collection: "Weight",
  325. ReplicaPlacement: rp,
  326. DataCenter: "",
  327. Rack: "",
  328. DataNode: "",
  329. }
  330. distribution := map[NodeId]int{}
  331. // assign 1000 volumes
  332. for i := 0; i < 1000; i++ {
  333. servers, err := vg.findEmptySlotsForOneVolume(topo, volumeGrowOption)
  334. if err != nil {
  335. fmt.Println("finding empty slots error :", err)
  336. t.Fail()
  337. }
  338. for _, server := range servers {
  339. // fmt.Println("assigned node :", server.Id())
  340. if _, ok := distribution[server.id]; !ok {
  341. distribution[server.id] = 0
  342. }
  343. distribution[server.id] += 1
  344. }
  345. }
  346. for k, v := range distribution {
  347. fmt.Printf("%s : %d\n", k, v)
  348. }
  349. }
  // topologyLayout4 is the JSON fixture that TestPickForWrite feeds to setup.
  // It describes three data centers, each with one rack and one server that
  // has an explicit "ip" and a "limit" of 100. Every volume belongs to
  // collection "test" and carries a "replication" string, so setup registers
  // it in the corresponding volume layout.
  var topologyLayout4 = `
  {
  "dc1":{
  "rack1":{
  "serverdc111":{
  "ip": "127.0.0.1",
  "volumes":[
  {"id":1, "size":12312, "collection":"test", "replication":"001"},
  {"id":2, "size":12312, "collection":"test", "replication":"100"},
  {"id":4, "size":12312, "collection":"test", "replication":"100"},
  {"id":6, "size":12312, "collection":"test", "replication":"010"}
  ],
  "limit":100
  }
  }
  },
  "dc2":{
  "rack1":{
  "serverdc211":{
  "ip": "127.0.0.2",
  "volumes":[
  {"id":2, "size":12312, "collection":"test", "replication":"100"},
  {"id":3, "size":12312, "collection":"test", "replication":"010"},
  {"id":5, "size":12312, "collection":"test", "replication":"001"},
  {"id":6, "size":12312, "collection":"test", "replication":"010"}
  ],
  "limit":100
  }
  }
  },
  "dc3":{
  "rack1":{
  "serverdc311":{
  "ip": "127.0.0.3",
  "volumes":[
  {"id":1, "size":12312, "collection":"test", "replication":"001"},
  {"id":3, "size":12312, "collection":"test", "replication":"010"},
  {"id":4, "size":12312, "collection":"test", "replication":"100"},
  {"id":5, "size":12312, "collection":"test", "replication":"001"}
  ],
  "limit":100
  }
  }
  }
  }
  `
  396. func TestPickForWrite(t *testing.T) {
  397. topo := setup(topologyLayout4)
  398. volumeGrowOption := &VolumeGrowOption{
  399. Collection: "test",
  400. DataCenter: "",
  401. Rack: "",
  402. DataNode: "",
  403. }
  404. VolumeGrowStrategy.Threshold = 0.9
  405. for _, rpStr := range []string{"001", "010", "100"} {
  406. rp, _ := super_block.NewReplicaPlacementFromString(rpStr)
  407. vl := topo.GetVolumeLayout("test", rp, needle.EMPTY_TTL, types.HardDriveType)
  408. volumeGrowOption.ReplicaPlacement = rp
  409. for _, dc := range []string{"", "dc1", "dc2", "dc3", "dc0"} {
  410. volumeGrowOption.DataCenter = dc
  411. for _, r := range []string{""} {
  412. volumeGrowOption.Rack = r
  413. for _, dn := range []string{""} {
  414. if dc == "" && dn != "" {
  415. continue
  416. }
  417. volumeGrowOption.DataNode = dn
  418. fileId, count, _, shouldGrow, err := topo.PickForWrite(1, volumeGrowOption, vl)
  419. if dc == "dc0" {
  420. if err == nil || count != 0 || !shouldGrow {
  421. fmt.Println(dc, r, dn, "pick for write should be with error")
  422. t.Fail()
  423. }
  424. } else if err != nil {
  425. fmt.Println(dc, r, dn, "pick for write error :", err)
  426. t.Fail()
  427. } else if count == 0 {
  428. fmt.Println(dc, r, dn, "pick for write count is zero")
  429. t.Fail()
  430. } else if len(fileId) == 0 {
  431. fmt.Println(dc, r, dn, "pick for write file id is empty")
  432. t.Fail()
  433. } else if shouldGrow {
  434. fmt.Println(dc, r, dn, "pick for write error : not should grow")
  435. t.Fail()
  436. }
  437. }
  438. }
  439. }
  440. }
  441. }