partition_list_broker.go 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960
  1. package pub_balancer
  2. import (
  3. "github.com/seaweedfs/seaweedfs/weed/glog"
  4. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  5. )
  6. type PartitionSlotToBroker struct {
  7. RangeStart int32
  8. RangeStop int32
  9. UnixTimeNs int64
  10. AssignedBroker string
  11. FollowerBroker string
  12. }
  13. type PartitionSlotToBrokerList struct {
  14. PartitionSlots []*PartitionSlotToBroker
  15. RingSize int32
  16. }
  17. func NewPartitionSlotToBrokerList(ringSize int32) *PartitionSlotToBrokerList {
  18. return &PartitionSlotToBrokerList{
  19. RingSize: ringSize,
  20. }
  21. }
  22. func (ps *PartitionSlotToBrokerList) AddBroker(partition *mq_pb.Partition, broker string, follower string) {
  23. for _, partitionSlot := range ps.PartitionSlots {
  24. if partitionSlot.RangeStart == partition.RangeStart && partitionSlot.RangeStop == partition.RangeStop {
  25. if partitionSlot.AssignedBroker != "" && partitionSlot.AssignedBroker != broker {
  26. glog.V(0).Infof("partition %s broker change: %s => %s", partition, partitionSlot.AssignedBroker, broker)
  27. partitionSlot.AssignedBroker = broker
  28. }
  29. if partitionSlot.FollowerBroker != "" && partitionSlot.FollowerBroker != follower {
  30. glog.V(0).Infof("partition %s follower change: %s => %s", partition, partitionSlot.FollowerBroker, follower)
  31. partitionSlot.FollowerBroker = follower
  32. }
  33. return
  34. }
  35. }
  36. ps.PartitionSlots = append(ps.PartitionSlots, &PartitionSlotToBroker{
  37. RangeStart: partition.RangeStart,
  38. RangeStop: partition.RangeStop,
  39. UnixTimeNs: partition.UnixTimeNs,
  40. AssignedBroker: broker,
  41. FollowerBroker: follower,
  42. })
  43. }
  44. func (ps *PartitionSlotToBrokerList) RemoveBroker(broker string) {
  45. ps.ReplaceBroker(broker, "")
  46. }
  47. func (ps *PartitionSlotToBrokerList) ReplaceBroker(oldBroker string, newBroker string) {
  48. for _, partitionSlot := range ps.PartitionSlots {
  49. if partitionSlot.AssignedBroker == oldBroker {
  50. partitionSlot.AssignedBroker = newBroker
  51. }
  52. }
  53. }