cluster_test.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. package cluster
  2. import (
  3. "github.com/seaweedfs/seaweedfs/weed/pb"
  4. "github.com/stretchr/testify/assert"
  5. "strconv"
  6. "sync"
  7. "testing"
  8. )
  9. func TestClusterAddRemoveNodes(t *testing.T) {
  10. c := NewCluster()
  11. c.AddClusterNode("", "filer", "", "", pb.ServerAddress("111:1"), "23.45")
  12. c.AddClusterNode("", "filer", "", "", pb.ServerAddress("111:2"), "23.45")
  13. assert.Equal(t, []pb.ServerAddress{
  14. pb.ServerAddress("111:1"),
  15. pb.ServerAddress("111:2"),
  16. }, c.getGroupMembers("", "filer", true).leaders.GetLeaders())
  17. c.AddClusterNode("", "filer", "", "", pb.ServerAddress("111:3"), "23.45")
  18. c.AddClusterNode("", "filer", "", "", pb.ServerAddress("111:4"), "23.45")
  19. assert.Equal(t, []pb.ServerAddress{
  20. pb.ServerAddress("111:1"),
  21. pb.ServerAddress("111:2"),
  22. pb.ServerAddress("111:3"),
  23. }, c.getGroupMembers("", "filer", true).leaders.GetLeaders())
  24. c.AddClusterNode("", "filer", "", "", pb.ServerAddress("111:5"), "23.45")
  25. c.AddClusterNode("", "filer", "", "", pb.ServerAddress("111:6"), "23.45")
  26. c.RemoveClusterNode("", "filer", pb.ServerAddress("111:4"))
  27. assert.Equal(t, []pb.ServerAddress{
  28. pb.ServerAddress("111:1"),
  29. pb.ServerAddress("111:2"),
  30. pb.ServerAddress("111:3"),
  31. }, c.getGroupMembers("", "filer", true).leaders.GetLeaders())
  32. // remove oldest
  33. c.RemoveClusterNode("", "filer", pb.ServerAddress("111:1"))
  34. assert.Equal(t, []pb.ServerAddress{
  35. pb.ServerAddress("111:6"),
  36. pb.ServerAddress("111:2"),
  37. pb.ServerAddress("111:3"),
  38. }, c.getGroupMembers("", "filer", true).leaders.GetLeaders())
  39. // remove oldest
  40. c.RemoveClusterNode("", "filer", pb.ServerAddress("111:1"))
  41. }
  42. func TestConcurrentAddRemoveNodes(t *testing.T) {
  43. c := NewCluster()
  44. var wg sync.WaitGroup
  45. for i := 0; i < 50; i++ {
  46. wg.Add(1)
  47. go func(i int) {
  48. defer wg.Done()
  49. address := strconv.Itoa(i)
  50. c.AddClusterNode("", "filer", "", "", pb.ServerAddress(address), "23.45")
  51. }(i)
  52. }
  53. wg.Wait()
  54. for i := 0; i < 50; i++ {
  55. wg.Add(1)
  56. go func(i int) {
  57. defer wg.Done()
  58. address := strconv.Itoa(i)
  59. node := c.RemoveClusterNode("", "filer", pb.ServerAddress(address))
  60. if len(node) == 0 {
  61. t.Errorf("TestConcurrentAddRemoveNodes: node[%s] not found", address)
  62. return
  63. } else if node[0].ClusterNodeUpdate.Address != address {
  64. t.Errorf("TestConcurrentAddRemoveNodes: expect:%s, actual:%s", address, node[0].ClusterNodeUpdate.Address)
  65. return
  66. }
  67. }(i)
  68. }
  69. wg.Wait()
  70. }