balance.go 2.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. package pub_balancer
  2. import (
  3. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  4. "google.golang.org/grpc"
  5. )
  6. /*
  7. * Assuming a topic has [x,y] number of partitions when publishing, and there are b number of brokers.
  8. * and p is the number of partitions per topic.
  9. * if the broker number b <= x, then p = x.
  10. * if the broker number x < b < y, then x <= p <= b.
  11. * if the broker number b >= y, then x <= p <= y.
  12. Balance topic partitions to brokers
  13. ===================================
  14. When the goal is to make sure that low traffic partitions can be merged, (and p >= x, and after last rebalance interval):
  15. 1. Calculate the average load(throughput) of partitions per topic.
  16. 2. If any two neighboring partitions have a load that is less than the average load, merge them.
  17. 3. If min(b, y) < p, then merge two neighboring partitions that have the least combined load.
  18. When the goal is to make sure that high traffic partitions can be split, (and p < y and p < b, and after last rebalance interval):
  19. 1. Calculate the average load(throughput) of partitions per topic.
  20. 2. If any partition has a load that is more than the average load, split it into two partitions.
  21. When the goal is to make sure that each broker has the same number of partitions:
  22. 1. Calculate the average number of partitions per broker.
  23. 2. For the brokers that have more than the average number of partitions, move the partitions to the brokers that have less than the average number of partitions.
  24. */
  // BalanceAction is the common type for every balancing step produced by the
  // balancer. It declares no methods, so any value satisfies it; consumers
  // recover the concrete action (for example *BalanceActionMove) with a type
  // switch.
  type BalanceAction interface{}
  27. type BalanceActionMerge struct {
  28. Before []topic.TopicPartition
  29. After topic.TopicPartition
  30. }
  31. type BalanceActionSplit struct {
  32. Before topic.TopicPartition
  33. After []topic.TopicPartition
  34. }
  35. type BalanceActionMove struct {
  36. TopicPartition topic.TopicPartition
  37. SourceBroker string
  38. TargetBroker string
  39. }
  40. type BalanceActionCreate struct {
  41. TopicPartition topic.TopicPartition
  42. TargetBroker string
  43. }
  44. // BalancePublishers check the stats of all brokers,
  45. // and balance the publishers to the brokers.
  46. func (balancer *PubBalancer) BalancePublishers() []BalanceAction {
  47. action := BalanceTopicPartitionOnBrokers(balancer.Brokers)
  48. return []BalanceAction{action}
  49. }
  50. func (balancer *PubBalancer) ExecuteBalanceAction(actions []BalanceAction, grpcDialOption grpc.DialOption) (err error) {
  51. for _, action := range actions {
  52. switch action.(type) {
  53. case *BalanceActionMove:
  54. err = balancer.ExecuteBalanceActionMove(action.(*BalanceActionMove), grpcDialOption)
  55. }
  56. if err != nil {
  57. return err
  58. }
  59. }
  60. return nil
  61. }