topology_test.go

package topology

import (
	"testing"

	"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
	"github.com/chrislusf/seaweedfs/weed/sequence"
	"github.com/chrislusf/seaweedfs/weed/storage"
	"github.com/chrislusf/seaweedfs/weed/storage/needle"
	"github.com/chrislusf/seaweedfs/weed/storage/super_block"
)
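
// TestRemoveDataCenter checks that unlinking a data center subtracts its
// volumes from the topology's active volume count.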
func TestRemoveDataCenter(t *testing.T) {
	topo := setup(topologyLayout)
	topo.UnlinkChildNode(NodeId("dc2"))
	if topo.GetActiveVolumeCount() != 15 {
		t.Fail()
	}
	topo.UnlinkChildNode(NodeId("dc3"))
	if topo.GetActiveVolumeCount() != 12 {
		t.Fail()
	}
}
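
// TestHandlingVolumeServerHeartbeat walks a single data node through a full
// registration, a shrinking full registration, and incremental add/delete
// registrations, checking the topology's volume counters at each step.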
func TestHandlingVolumeServerHeartbeat(t *testing.T) {
	topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5)
	dc := topo.GetOrCreateDataCenter("dc1")
	rack := dc.GetOrCreateRack("rack1")
	dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, "127.0.0.1", 25)

	{
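		// First full heartbeat: the node reports seven volumes.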
		volumeCount := 7
		var volumeMessages []*master_pb.VolumeInformationMessage
		for k := 1; k <= volumeCount; k++ {
			volumeMessage := &master_pb.VolumeInformationMessage{
				Id:               uint32(k),
				Size:             uint64(25432),
				Collection:       "",
				FileCount:        uint64(2343),
				DeleteCount:      uint64(345),
				DeletedByteCount: 34524,
				ReadOnly:         false,
				ReplicaPlacement: uint32(0),
				Version:          uint32(needle.CurrentVersion),
				Ttl:              0,
			}
			volumeMessages = append(volumeMessages, volumeMessage)
		}
		topo.SyncDataNodeRegistration(volumeMessages, dn)
		assert(t, "activeVolumeCount1", int(topo.activeVolumeCount), volumeCount)
		assert(t, "volumeCount", int(topo.volumeCount), volumeCount)
	}

	{
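		// Second full heartbeat: the node reports one volume fewer, so the
		// missing volume must be dropped from the topology.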
		volumeCount := 7 - 1
		var volumeMessages []*master_pb.VolumeInformationMessage
		for k := 1; k <= volumeCount; k++ {
			volumeMessage := &master_pb.VolumeInformationMessage{
				Id:               uint32(k),
				Size:             uint64(254320),
				Collection:       "",
				FileCount:        uint64(2343),
				DeleteCount:      uint64(345),
				DeletedByteCount: 345240,
				ReadOnly:         false,
				ReplicaPlacement: uint32(0),
				Version:          uint32(needle.CurrentVersion),
				Ttl:              0,
			}
			volumeMessages = append(volumeMessages, volumeMessage)
		}
		topo.SyncDataNodeRegistration(volumeMessages, dn)
		//rp, _ := storage.NewReplicaPlacementFromString("000")
		//layout := topo.GetVolumeLayout("", rp, needle.EMPTY_TTL)
		//assert(t, "writables", len(layout.writables), volumeCount)
		assert(t, "activeVolumeCount1", int(topo.activeVolumeCount), volumeCount)
		assert(t, "volumeCount", int(topo.volumeCount), volumeCount)
	}

	{
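		// Incremental heartbeats: re-adding an existing volume is a no-op,
		// deleting it shrinks the counts, and adding it back restores them.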
		volumeCount := 6
		newVolumeShortMessage := &master_pb.VolumeShortInformationMessage{
			Id:               uint32(3),
			Collection:       "",
			ReplicaPlacement: uint32(0),
			Version:          uint32(needle.CurrentVersion),
			Ttl:              0,
		}
		// Volume 3 is already registered, so re-adding it changes nothing.
		topo.IncrementalSyncDataNodeRegistration(
			[]*master_pb.VolumeShortInformationMessage{newVolumeShortMessage},
			nil,
			dn)
		rp, _ := super_block.NewReplicaPlacementFromString("000")
		layout := topo.GetVolumeLayout("", rp, needle.EMPTY_TTL)
		assert(t, "writables after repeated add", len(layout.writables), volumeCount)
		assert(t, "activeVolumeCount1", int(topo.activeVolumeCount), volumeCount)
		assert(t, "volumeCount", int(topo.volumeCount), volumeCount)
		// Deleting volume 3 drops every count by one.
		topo.IncrementalSyncDataNodeRegistration(
			nil,
			[]*master_pb.VolumeShortInformationMessage{newVolumeShortMessage},
			dn)
		assert(t, "writables after deletion", len(layout.writables), volumeCount-1)
		assert(t, "activeVolumeCount1", int(topo.activeVolumeCount), volumeCount-1)
		assert(t, "volumeCount", int(topo.volumeCount), volumeCount-1)
		// Adding volume 3 back restores the writable count.
		topo.IncrementalSyncDataNodeRegistration(
			[]*master_pb.VolumeShortInformationMessage{newVolumeShortMessage},
			nil,
			dn)
		for vid := range layout.vid2location {
			println("after add volume id", vid)
		}
		for _, vid := range layout.writables {
			println("after add writable volume id", vid)
		}
		assert(t, "writables after add back", len(layout.writables), volumeCount)
	}

	// Removing the data node entirely leaves no active volumes behind.
	topo.UnRegisterDataNode(dn)
	assert(t, "activeVolumeCount2", int(topo.activeVolumeCount), 0)
}
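
// assert fails the test immediately when actual differs from expected,
// labeling the failure with the given message.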
func assert(t *testing.T, message string, actual, expected int) {
	if actual != expected {
		t.Fatalf("unexpected %s: %d, expected: %d", message, actual, expected)
	}
}
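
// TestAddRemoveVolume checks that registering a volume layout twice is
// idempotent, that registration creates the volume's collection, and that
// unregistering the volume removes the collection again.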
func TestAddRemoveVolume(t *testing.T) {
	topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5)
	dc := topo.GetOrCreateDataCenter("dc1")
	rack := dc.GetOrCreateRack("rack1")
	dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, "127.0.0.1", 25)

	v := storage.VolumeInfo{
		Id:               needle.VolumeId(1),
		Size:             100,
		Collection:       "xcollection",
		FileCount:        123,
		DeleteCount:      23,
		DeletedByteCount: 45,
		ReadOnly:         false,
		Version:          needle.CurrentVersion,
		ReplicaPlacement: &super_block.ReplicaPlacement{},
		Ttl:              needle.EMPTY_TTL,
	}

	dn.UpdateVolumes([]storage.VolumeInfo{v})
	topo.RegisterVolumeLayout(v, dn)
	topo.RegisterVolumeLayout(v, dn)

	if _, hasCollection := topo.FindCollection(v.Collection); !hasCollection {
		t.Errorf("collection %v should exist", v.Collection)
	}

	topo.UnRegisterVolumeLayout(v, dn)
	if _, hasCollection := topo.FindCollection(v.Collection); hasCollection {
		t.Errorf("collection %v should not exist", v.Collection)
	}
}