topology_test.go

package topology

import (
	"testing"

	"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
	"github.com/chrislusf/seaweedfs/weed/sequence"
	"github.com/chrislusf/seaweedfs/weed/storage"
	"github.com/chrislusf/seaweedfs/weed/storage/needle"
)
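
// TestRemoveDataCenter checks that unlinking a data center from the topology
// subtracts its volumes from the active volume count. setup and topologyLayout
// are test fixtures provided elsewhere in this package's tests.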
func TestRemoveDataCenter(t *testing.T) {
	topo := setup(topologyLayout)
	topo.UnlinkChildNode(NodeId("dc2"))
	if topo.GetActiveVolumeCount() != 15 {
		t.Fail()
	}
	topo.UnlinkChildNode(NodeId("dc3"))
	if topo.GetActiveVolumeCount() != 12 {
		t.Fail()
	}
}
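
// TestHandlingVolumeServerHeartbeat simulates a volume server reporting its
// volumes to the master: a full sync, a shrunken full sync, and incremental
// add/delete updates, checking the topology's volume counts at each step.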
func TestHandlingVolumeServerHeartbeat(t *testing.T) {
	topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5)
	dc := topo.GetOrCreateDataCenter("dc1")
	rack := dc.GetOrCreateRack("rack1")
	dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, "127.0.0.1", 25)
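
	// First full heartbeat: the data node reports seven volumes.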
	{
		volumeCount := 7
		var volumeMessages []*master_pb.VolumeInformationMessage
		for k := 1; k <= volumeCount; k++ {
			volumeMessage := &master_pb.VolumeInformationMessage{
				Id:               uint32(k),
				Size:             uint64(25432),
				Collection:       "",
				FileCount:        uint64(2343),
				DeleteCount:      uint64(345),
				DeletedByteCount: 34524,
				ReadOnly:         false,
				ReplicaPlacement: uint32(0),
				Version:          uint32(needle.CurrentVersion),
				Ttl:              0,
			}
			volumeMessages = append(volumeMessages, volumeMessage)
		}
		topo.SyncDataNodeRegistration(volumeMessages, dn)

		assert(t, "activeVolumeCount1", int(topo.activeVolumeCount), volumeCount)
		assert(t, "volumeCount", int(topo.volumeCount), volumeCount)
	}
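
	// Second full heartbeat: only six volumes are reported, so the missing
	// volume should be dropped from the topology.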
	{
		volumeCount := 7 - 1
		var volumeMessages []*master_pb.VolumeInformationMessage
		for k := 1; k <= volumeCount; k++ {
			volumeMessage := &master_pb.VolumeInformationMessage{
				Id:               uint32(k),
				Size:             uint64(254320),
				Collection:       "",
				FileCount:        uint64(2343),
				DeleteCount:      uint64(345),
				DeletedByteCount: 345240,
				ReadOnly:         false,
				ReplicaPlacement: uint32(0),
				Version:          uint32(needle.CurrentVersion),
				Ttl:              0,
			}
			volumeMessages = append(volumeMessages, volumeMessage)
		}
		topo.SyncDataNodeRegistration(volumeMessages, dn)

		//rp, _ := storage.NewReplicaPlacementFromString("000")
		//layout := topo.GetVolumeLayout("", rp, needle.EMPTY_TTL)
		//assert(t, "writables", len(layout.writables), volumeCount)

		assert(t, "activeVolumeCount1", int(topo.activeVolumeCount), volumeCount)
		assert(t, "volumeCount", int(topo.volumeCount), volumeCount)
	}
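
	// Incremental heartbeats: re-adding an existing volume must not inflate
	// the counts, deleting it must shrink them, and adding it back must
	// restore the writable list.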
	{
		volumeCount := 6
		newVolumeShortMessage := &master_pb.VolumeShortInformationMessage{
			Id:               uint32(3),
			Collection:       "",
			ReplicaPlacement: uint32(0),
			Version:          uint32(needle.CurrentVersion),
			Ttl:              0,
		}
		topo.IncrementalSyncDataNodeRegistration(
			[]*master_pb.VolumeShortInformationMessage{newVolumeShortMessage},
			nil,
			dn)
		rp, _ := storage.NewReplicaPlacementFromString("000")
		layout := topo.GetVolumeLayout("", rp, needle.EMPTY_TTL)
		assert(t, "writables after repeated add", len(layout.writables), volumeCount)
		assert(t, "activeVolumeCount1", int(topo.activeVolumeCount), volumeCount)
		assert(t, "volumeCount", int(topo.volumeCount), volumeCount)
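
		// Report volume 3 as deleted; it should leave the writable list.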
		topo.IncrementalSyncDataNodeRegistration(
			nil,
			[]*master_pb.VolumeShortInformationMessage{newVolumeShortMessage},
			dn)
		assert(t, "writables after deletion", len(layout.writables), volumeCount-1)
		assert(t, "activeVolumeCount1", int(topo.activeVolumeCount), volumeCount-1)
		assert(t, "volumeCount", int(topo.volumeCount), volumeCount-1)
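
		// Add volume 3 back and dump the layout state for debugging.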
		topo.IncrementalSyncDataNodeRegistration(
			[]*master_pb.VolumeShortInformationMessage{newVolumeShortMessage},
			nil,
			dn)
		for vid := range layout.vid2location {
			println("after add volume id", vid)
		}
		for _, vid := range layout.writables {
			println("after add writable volume id", vid)
		}
		assert(t, "writables after add back", len(layout.writables), volumeCount)
	}
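
	// After the node unregisters, none of its volumes should remain active.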
	topo.UnRegisterDataNode(dn)

	assert(t, "activeVolumeCount2", int(topo.activeVolumeCount), 0)
}
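
// assert is a small test helper that fails the test when actual != expected.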
func assert(t *testing.T, message string, actual, expected int) {
	if actual != expected {
		t.Fatalf("unexpected %s: %d, expected: %d", message, actual, expected)
	}
}
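
// TestAddRemoveVolume verifies that registering a volume creates its
// collection and that unregistering the volume removes the collection again.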
func TestAddRemoveVolume(t *testing.T) {
	topo := NewTopology("weedfs", sequence.NewMemorySequencer(), 32*1024, 5)
	dc := topo.GetOrCreateDataCenter("dc1")
	rack := dc.GetOrCreateRack("rack1")
	dn := rack.GetOrCreateDataNode("127.0.0.1", 34534, "127.0.0.1", 25)

	v := storage.VolumeInfo{
		Id:               needle.VolumeId(1),
		Size:             100,
		Collection:       "xcollection",
		FileCount:        123,
		DeleteCount:      23,
		DeletedByteCount: 45,
		ReadOnly:         false,
		Version:          needle.CurrentVersion,
		ReplicaPlacement: &storage.ReplicaPlacement{},
		Ttl:              needle.EMPTY_TTL,
	}
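
	// Register the same volume twice to exercise idempotent registration.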
	dn.UpdateVolumes([]storage.VolumeInfo{v})
	topo.RegisterVolumeLayout(v, dn)
	topo.RegisterVolumeLayout(v, dn)

	if _, hasCollection := topo.FindCollection(v.Collection); !hasCollection {
		t.Errorf("collection %v should exist", v.Collection)
	}

	topo.UnRegisterVolumeLayout(v, dn)

	if _, hasCollection := topo.FindCollection(v.Collection); hasCollection {
		t.Errorf("collection %v should not exist", v.Collection)
	}
}