market_test.go 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. package sub_coordinator
  2. import (
  3. "fmt"
  4. "testing"
  5. "time"
  6. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  7. "github.com/stretchr/testify/assert"
  8. )
  9. var partitions = []topic.Partition{
  10. {
  11. RangeStart: 0,
  12. RangeStop: 1,
  13. RingSize: 3,
  14. UnixTimeNs: 0,
  15. },
  16. {
  17. RangeStart: 1,
  18. RangeStop: 2,
  19. RingSize: 3,
  20. UnixTimeNs: 0,
  21. },
  22. {
  23. RangeStart: 2,
  24. RangeStop: 3,
  25. RingSize: 3,
  26. UnixTimeNs: 0,
  27. },
  28. }
  29. func TestAddConsumerInstance(t *testing.T) {
  30. market := NewMarket(partitions, 10*time.Second)
  31. consumer := &ConsumerGroupInstance{
  32. InstanceId: "first",
  33. MaxPartitionCount: 2,
  34. }
  35. err := market.AddConsumerInstance(consumer)
  36. assert.Nil(t, err)
  37. time.Sleep(1 * time.Second) // Allow time for background rebalancing
  38. market.ShutdownMarket()
  39. for adjustment := range market.AdjustmentChan {
  40. fmt.Printf("%+v\n", adjustment)
  41. }
  42. }
  43. func TestMultipleConsumerInstances(t *testing.T) {
  44. market := NewMarket(partitions, 10*time.Second)
  45. market.AddConsumerInstance(&ConsumerGroupInstance{
  46. InstanceId: "first",
  47. MaxPartitionCount: 2,
  48. })
  49. market.AddConsumerInstance(&ConsumerGroupInstance{
  50. InstanceId: "second",
  51. MaxPartitionCount: 2,
  52. })
  53. market.AddConsumerInstance(&ConsumerGroupInstance{
  54. InstanceId: "third",
  55. MaxPartitionCount: 2,
  56. })
  57. time.Sleep(1 * time.Second) // Allow time for background rebalancing
  58. market.ShutdownMarket()
  59. for adjustment := range market.AdjustmentChan {
  60. fmt.Printf("%+v\n", adjustment)
  61. }
  62. }
  63. func TestConfirmAdjustment(t *testing.T) {
  64. market := NewMarket(partitions, 1*time.Second)
  65. market.AddConsumerInstance(&ConsumerGroupInstance{
  66. InstanceId: "first",
  67. MaxPartitionCount: 2,
  68. })
  69. market.AddConsumerInstance(&ConsumerGroupInstance{
  70. InstanceId: "second",
  71. MaxPartitionCount: 2,
  72. })
  73. market.AddConsumerInstance(&ConsumerGroupInstance{
  74. InstanceId: "third",
  75. MaxPartitionCount: 2,
  76. })
  77. go func() {
  78. time.Sleep(5 * time.Second) // Allow time for background rebalancing
  79. market.ShutdownMarket()
  80. }()
  81. go func() {
  82. time.Sleep(2 * time.Second)
  83. market.RemoveConsumerInstance("third")
  84. }()
  85. for adjustment := range market.AdjustmentChan {
  86. fmt.Printf("%+v\n", adjustment)
  87. market.ConfirmAdjustment(adjustment)
  88. }
  89. }