topology_test.go 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210
  1. package topology
  2. import (
  3. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  4. "github.com/chrislusf/seaweedfs/weed/sequence"
  5. "github.com/chrislusf/seaweedfs/weed/storage"
  6. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  7. "github.com/chrislusf/seaweedfs/weed/storage/super_block"
  8. "github.com/chrislusf/seaweedfs/weed/storage/types"
  9. "testing"
  10. )
  11. func TestRemoveDataCenter(t *testing.T) {
  12. topo := setup(topologyLayout)
  13. topo.UnlinkChildNode(NodeId("dc2"))
  14. if topo.diskUsages.usages[types.HardDriveType].activeVolumeCount != 15 {
  15. t.Fail()
  16. }
  17. topo.UnlinkChildNode(NodeId("dc3"))
  18. if topo.diskUsages.usages[types.HardDriveType].activeVolumeCount != 12 {
  19. t.Fail()
  20. }
  21. }
  22. func TestHandlingVolumeServerHeartbeat(t *testing.T) {
  23. topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false)
  24. dc := topo.GetOrCreateDataCenter("dc1")
  25. rack := dc.GetOrCreateRack("rack1")
  26. maxVolumeCounts := make(map[string]uint32)
  27. maxVolumeCounts[""] = 25
  28. maxVolumeCounts["ssd"] = 12
  29. dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, "127.0.0.1", maxVolumeCounts)
  30. {
  31. volumeCount := 7
  32. var volumeMessages []*master_pb.VolumeInformationMessage
  33. for k := 1; k <= volumeCount; k++ {
  34. volumeMessage := &master_pb.VolumeInformationMessage{
  35. Id: uint32(k),
  36. Size: uint64(25432),
  37. Collection: "",
  38. FileCount: uint64(2343),
  39. DeleteCount: uint64(345),
  40. DeletedByteCount: 34524,
  41. ReadOnly: false,
  42. ReplicaPlacement: uint32(0),
  43. Version: uint32(needle.CurrentVersion),
  44. Ttl: 0,
  45. }
  46. volumeMessages = append(volumeMessages, volumeMessage)
  47. }
  48. for k := 1; k <= volumeCount; k++ {
  49. volumeMessage := &master_pb.VolumeInformationMessage{
  50. Id: uint32(volumeCount + k),
  51. Size: uint64(25432),
  52. Collection: "",
  53. FileCount: uint64(2343),
  54. DeleteCount: uint64(345),
  55. DeletedByteCount: 34524,
  56. ReadOnly: false,
  57. ReplicaPlacement: uint32(0),
  58. Version: uint32(needle.CurrentVersion),
  59. Ttl: 0,
  60. DiskType: "ssd",
  61. }
  62. volumeMessages = append(volumeMessages, volumeMessage)
  63. }
  64. topo.SyncDataNodeRegistration(volumeMessages, dn)
  65. usageCounts := topo.diskUsages.usages[types.HardDriveType]
  66. assert(t, "activeVolumeCount1", int(usageCounts.activeVolumeCount), volumeCount)
  67. assert(t, "volumeCount", int(usageCounts.volumeCount), volumeCount)
  68. assert(t, "ssdVolumeCount", int(topo.diskUsages.usages[types.SsdType].volumeCount), volumeCount)
  69. }
  70. {
  71. volumeCount := 7 - 1
  72. var volumeMessages []*master_pb.VolumeInformationMessage
  73. for k := 1; k <= volumeCount; k++ {
  74. volumeMessage := &master_pb.VolumeInformationMessage{
  75. Id: uint32(k),
  76. Size: uint64(254320),
  77. Collection: "",
  78. FileCount: uint64(2343),
  79. DeleteCount: uint64(345),
  80. DeletedByteCount: 345240,
  81. ReadOnly: false,
  82. ReplicaPlacement: uint32(0),
  83. Version: uint32(needle.CurrentVersion),
  84. Ttl: 0,
  85. }
  86. volumeMessages = append(volumeMessages, volumeMessage)
  87. }
  88. topo.SyncDataNodeRegistration(volumeMessages, dn)
  89. //rp, _ := storage.NewReplicaPlacementFromString("000")
  90. //layout := topo.GetVolumeLayout("", rp, needle.EMPTY_TTL)
  91. //assert(t, "writables", len(layout.writables), volumeCount)
  92. usageCounts := topo.diskUsages.usages[types.HardDriveType]
  93. assert(t, "activeVolumeCount1", int(usageCounts.activeVolumeCount), volumeCount)
  94. assert(t, "volumeCount", int(usageCounts.volumeCount), volumeCount)
  95. }
  96. {
  97. volumeCount := 6
  98. newVolumeShortMessage := &master_pb.VolumeShortInformationMessage{
  99. Id: uint32(3),
  100. Collection: "",
  101. ReplicaPlacement: uint32(0),
  102. Version: uint32(needle.CurrentVersion),
  103. Ttl: 0,
  104. }
  105. topo.IncrementalSyncDataNodeRegistration(
  106. []*master_pb.VolumeShortInformationMessage{newVolumeShortMessage},
  107. nil,
  108. dn)
  109. rp, _ := super_block.NewReplicaPlacementFromString("000")
  110. layout := topo.GetVolumeLayout("", rp, needle.EMPTY_TTL, types.HardDriveType)
  111. assert(t, "writables after repeated add", len(layout.writables), volumeCount)
  112. usageCounts := topo.diskUsages.usages[types.HardDriveType]
  113. assert(t, "activeVolumeCount1", int(usageCounts.activeVolumeCount), volumeCount)
  114. assert(t, "volumeCount", int(usageCounts.volumeCount), volumeCount)
  115. topo.IncrementalSyncDataNodeRegistration(
  116. nil,
  117. []*master_pb.VolumeShortInformationMessage{newVolumeShortMessage},
  118. dn)
  119. assert(t, "writables after deletion", len(layout.writables), volumeCount-1)
  120. assert(t, "activeVolumeCount1", int(usageCounts.activeVolumeCount), volumeCount-1)
  121. assert(t, "volumeCount", int(usageCounts.volumeCount), volumeCount-1)
  122. topo.IncrementalSyncDataNodeRegistration(
  123. []*master_pb.VolumeShortInformationMessage{newVolumeShortMessage},
  124. nil,
  125. dn)
  126. for vid := range layout.vid2location {
  127. println("after add volume id", vid)
  128. }
  129. for _, vid := range layout.writables {
  130. println("after add writable volume id", vid)
  131. }
  132. assert(t, "writables after add back", len(layout.writables), volumeCount)
  133. }
  134. topo.UnRegisterDataNode(dn)
  135. usageCounts := topo.diskUsages.usages[types.HardDriveType]
  136. assert(t, "activeVolumeCount2", int(usageCounts.activeVolumeCount), 0)
  137. }
  138. func assert(t *testing.T, message string, actual, expected int) {
  139. if actual != expected {
  140. t.Fatalf("unexpected %s: %d, expected: %d", message, actual, expected)
  141. }
  142. }
  143. func TestAddRemoveVolume(t *testing.T) {
  144. topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5, false)
  145. dc := topo.GetOrCreateDataCenter("dc1")
  146. rack := dc.GetOrCreateRack("rack1")
  147. maxVolumeCounts := make(map[string]uint32)
  148. maxVolumeCounts[""] = 25
  149. maxVolumeCounts["ssd"] = 12
  150. dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, "127.0.0.1", maxVolumeCounts)
  151. v := storage.VolumeInfo{
  152. Id: needle.VolumeId(1),
  153. Size: 100,
  154. Collection: "xcollection",
  155. DiskType: "ssd",
  156. FileCount: 123,
  157. DeleteCount: 23,
  158. DeletedByteCount: 45,
  159. ReadOnly: false,
  160. Version: needle.CurrentVersion,
  161. ReplicaPlacement: &super_block.ReplicaPlacement{},
  162. Ttl: needle.EMPTY_TTL,
  163. }
  164. dn.UpdateVolumes([]storage.VolumeInfo{v})
  165. topo.RegisterVolumeLayout(v, dn)
  166. topo.RegisterVolumeLayout(v, dn)
  167. if _, hasCollection := topo.FindCollection(v.Collection); !hasCollection {
  168. t.Errorf("collection %v should exist", v.Collection)
  169. }
  170. topo.UnRegisterVolumeLayout(v, dn)
  171. if _, hasCollection := topo.FindCollection(v.Collection); hasCollection {
  172. t.Errorf("collection %v should not exist", v.Collection)
  173. }
  174. }