partition_consumer_mapping_test.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312
  1. package sub_coordinator
  2. import (
  3. "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
  4. "reflect"
  5. "testing"
  6. )
  7. func Test_doBalanceSticky(t *testing.T) {
  8. type args struct {
  9. partitions []*pub_balancer.PartitionSlotToBroker
  10. consumerInstanceIds []string
  11. prevMapping *PartitionSlotToConsumerInstanceList
  12. }
  13. tests := []struct {
  14. name string
  15. args args
  16. wantPartitionSlots []*PartitionSlotToConsumerInstance
  17. }{
  18. {
  19. name: "1 consumer instance, 1 partition",
  20. args: args{
  21. partitions: []*pub_balancer.PartitionSlotToBroker{
  22. {
  23. RangeStart: 0,
  24. RangeStop: 100,
  25. },
  26. },
  27. consumerInstanceIds: []string{"consumer-instance-1"},
  28. prevMapping: nil,
  29. },
  30. wantPartitionSlots: []*PartitionSlotToConsumerInstance{
  31. {
  32. RangeStart: 0,
  33. RangeStop: 100,
  34. AssignedInstanceId: "consumer-instance-1",
  35. },
  36. },
  37. },
  38. {
  39. name: "2 consumer instances, 1 partition",
  40. args: args{
  41. partitions: []*pub_balancer.PartitionSlotToBroker{
  42. {
  43. RangeStart: 0,
  44. RangeStop: 100,
  45. },
  46. },
  47. consumerInstanceIds: []string{"consumer-instance-1", "consumer-instance-2"},
  48. prevMapping: nil,
  49. },
  50. wantPartitionSlots: []*PartitionSlotToConsumerInstance{
  51. {
  52. RangeStart: 0,
  53. RangeStop: 100,
  54. AssignedInstanceId: "consumer-instance-1",
  55. },
  56. },
  57. },
  58. {
  59. name: "1 consumer instance, 2 partitions",
  60. args: args{
  61. partitions: []*pub_balancer.PartitionSlotToBroker{
  62. {
  63. RangeStart: 0,
  64. RangeStop: 50,
  65. },
  66. {
  67. RangeStart: 50,
  68. RangeStop: 100,
  69. },
  70. },
  71. consumerInstanceIds: []string{"consumer-instance-1"},
  72. prevMapping: nil,
  73. },
  74. wantPartitionSlots: []*PartitionSlotToConsumerInstance{
  75. {
  76. RangeStart: 0,
  77. RangeStop: 50,
  78. AssignedInstanceId: "consumer-instance-1",
  79. },
  80. {
  81. RangeStart: 50,
  82. RangeStop: 100,
  83. AssignedInstanceId: "consumer-instance-1",
  84. },
  85. },
  86. },
  87. {
  88. name: "2 consumer instances, 2 partitions",
  89. args: args{
  90. partitions: []*pub_balancer.PartitionSlotToBroker{
  91. {
  92. RangeStart: 0,
  93. RangeStop: 50,
  94. },
  95. {
  96. RangeStart: 50,
  97. RangeStop: 100,
  98. },
  99. },
  100. consumerInstanceIds: []string{"consumer-instance-1", "consumer-instance-2"},
  101. prevMapping: nil,
  102. },
  103. wantPartitionSlots: []*PartitionSlotToConsumerInstance{
  104. {
  105. RangeStart: 0,
  106. RangeStop: 50,
  107. AssignedInstanceId: "consumer-instance-1",
  108. },
  109. {
  110. RangeStart: 50,
  111. RangeStop: 100,
  112. AssignedInstanceId: "consumer-instance-2",
  113. },
  114. },
  115. },
  116. {
  117. name: "2 consumer instances, 2 partitions, 1 deleted consumer instance",
  118. args: args{
  119. partitions: []*pub_balancer.PartitionSlotToBroker{
  120. {
  121. RangeStart: 0,
  122. RangeStop: 50,
  123. },
  124. {
  125. RangeStart: 50,
  126. RangeStop: 100,
  127. },
  128. },
  129. consumerInstanceIds: []string{"consumer-instance-1", "consumer-instance-2"},
  130. prevMapping: &PartitionSlotToConsumerInstanceList{
  131. PartitionSlots: []*PartitionSlotToConsumerInstance{
  132. {
  133. RangeStart: 0,
  134. RangeStop: 50,
  135. AssignedInstanceId: "consumer-instance-3",
  136. },
  137. {
  138. RangeStart: 50,
  139. RangeStop: 100,
  140. AssignedInstanceId: "consumer-instance-2",
  141. },
  142. },
  143. },
  144. },
  145. wantPartitionSlots: []*PartitionSlotToConsumerInstance{
  146. {
  147. RangeStart: 0,
  148. RangeStop: 50,
  149. AssignedInstanceId: "consumer-instance-1",
  150. },
  151. {
  152. RangeStart: 50,
  153. RangeStop: 100,
  154. AssignedInstanceId: "consumer-instance-2",
  155. },
  156. },
  157. },
  158. {
  159. name: "2 consumer instances, 2 partitions, 1 new consumer instance",
  160. args: args{
  161. partitions: []*pub_balancer.PartitionSlotToBroker{
  162. {
  163. RangeStart: 0,
  164. RangeStop: 50,
  165. },
  166. {
  167. RangeStart: 50,
  168. RangeStop: 100,
  169. },
  170. },
  171. consumerInstanceIds: []string{"consumer-instance-1", "consumer-instance-2", "consumer-instance-3"},
  172. prevMapping: &PartitionSlotToConsumerInstanceList{
  173. PartitionSlots: []*PartitionSlotToConsumerInstance{
  174. {
  175. RangeStart: 0,
  176. RangeStop: 50,
  177. AssignedInstanceId: "consumer-instance-3",
  178. },
  179. {
  180. RangeStart: 50,
  181. RangeStop: 100,
  182. AssignedInstanceId: "consumer-instance-2",
  183. },
  184. },
  185. },
  186. },
  187. wantPartitionSlots: []*PartitionSlotToConsumerInstance{
  188. {
  189. RangeStart: 0,
  190. RangeStop: 50,
  191. AssignedInstanceId: "consumer-instance-3",
  192. },
  193. {
  194. RangeStart: 50,
  195. RangeStop: 100,
  196. AssignedInstanceId: "consumer-instance-2",
  197. },
  198. },
  199. },
  200. {
  201. name: "2 consumer instances, 2 partitions, 1 new partition",
  202. args: args{
  203. partitions: []*pub_balancer.PartitionSlotToBroker{
  204. {
  205. RangeStart: 0,
  206. RangeStop: 50,
  207. },
  208. {
  209. RangeStart: 50,
  210. RangeStop: 100,
  211. },
  212. {
  213. RangeStart: 100,
  214. RangeStop: 150,
  215. },
  216. },
  217. consumerInstanceIds: []string{"consumer-instance-1", "consumer-instance-2"},
  218. prevMapping: &PartitionSlotToConsumerInstanceList{
  219. PartitionSlots: []*PartitionSlotToConsumerInstance{
  220. {
  221. RangeStart: 0,
  222. RangeStop: 50,
  223. AssignedInstanceId: "consumer-instance-1",
  224. },
  225. {
  226. RangeStart: 50,
  227. RangeStop: 100,
  228. AssignedInstanceId: "consumer-instance-2",
  229. },
  230. },
  231. },
  232. },
  233. wantPartitionSlots: []*PartitionSlotToConsumerInstance{
  234. {
  235. RangeStart: 0,
  236. RangeStop: 50,
  237. AssignedInstanceId: "consumer-instance-1",
  238. },
  239. {
  240. RangeStart: 50,
  241. RangeStop: 100,
  242. AssignedInstanceId: "consumer-instance-2",
  243. },
  244. {
  245. RangeStart: 100,
  246. RangeStop: 150,
  247. AssignedInstanceId: "consumer-instance-1",
  248. },
  249. },
  250. },
  251. {
  252. name: "2 consumer instances, 2 partitions, 1 new partition, 1 new consumer instance",
  253. args: args{
  254. partitions: []*pub_balancer.PartitionSlotToBroker{
  255. {
  256. RangeStart: 0,
  257. RangeStop: 50,
  258. },
  259. {
  260. RangeStart: 50,
  261. RangeStop: 100,
  262. },
  263. {
  264. RangeStart: 100,
  265. RangeStop: 150,
  266. },
  267. },
  268. consumerInstanceIds: []string{"consumer-instance-1", "consumer-instance-2", "consumer-instance-3"},
  269. prevMapping: &PartitionSlotToConsumerInstanceList{
  270. PartitionSlots: []*PartitionSlotToConsumerInstance{
  271. {
  272. RangeStart: 0,
  273. RangeStop: 50,
  274. AssignedInstanceId: "consumer-instance-1",
  275. },
  276. {
  277. RangeStart: 50,
  278. RangeStop: 100,
  279. AssignedInstanceId: "consumer-instance-2",
  280. },
  281. },
  282. },
  283. },
  284. wantPartitionSlots: []*PartitionSlotToConsumerInstance{
  285. {
  286. RangeStart: 0,
  287. RangeStop: 50,
  288. AssignedInstanceId: "consumer-instance-1",
  289. },
  290. {
  291. RangeStart: 50,
  292. RangeStop: 100,
  293. AssignedInstanceId: "consumer-instance-2",
  294. },
  295. {
  296. RangeStart: 100,
  297. RangeStop: 150,
  298. AssignedInstanceId: "consumer-instance-3",
  299. },
  300. },
  301. },
  302. }
  303. for _, tt := range tests {
  304. t.Run(tt.name, func(t *testing.T) {
  305. if gotPartitionSlots := doBalanceSticky(tt.args.partitions, tt.args.consumerInstanceIds, tt.args.prevMapping); !reflect.DeepEqual(gotPartitionSlots, tt.wantPartitionSlots) {
  306. t.Errorf("doBalanceSticky() = %v, want %v", gotPartitionSlots, tt.wantPartitionSlots)
  307. }
  308. })
  309. }
  310. }