partition_list_broker.go 1.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950
  1. package pub_balancer
  2. import (
  3. "github.com/seaweedfs/seaweedfs/weed/glog"
  4. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  5. )
  6. type PartitionSlotToBroker struct {
  7. RangeStart int32
  8. RangeStop int32
  9. AssignedBroker string
  10. }
  11. type PartitionSlotToBrokerList struct {
  12. PartitionSlots []*PartitionSlotToBroker
  13. RingSize int32
  14. }
  15. func NewPartitionSlotToBrokerList(ringSize int32) *PartitionSlotToBrokerList {
  16. return &PartitionSlotToBrokerList{
  17. RingSize: ringSize,
  18. }
  19. }
  20. func (ps *PartitionSlotToBrokerList) AddBroker(partition *mq_pb.Partition, broker string) {
  21. for _, partitionSlot := range ps.PartitionSlots {
  22. if partitionSlot.RangeStart == partition.RangeStart && partitionSlot.RangeStop == partition.RangeStop {
  23. if partitionSlot.AssignedBroker == broker {
  24. return
  25. }
  26. if partitionSlot.AssignedBroker != "" {
  27. glog.V(0).Infof("partition %s broker change: %s => %s", partition, partitionSlot.AssignedBroker, broker)
  28. }
  29. partitionSlot.AssignedBroker = broker
  30. return
  31. }
  32. }
  33. ps.PartitionSlots = append(ps.PartitionSlots, &PartitionSlotToBroker{
  34. RangeStart: partition.RangeStart,
  35. RangeStop: partition.RangeStop,
  36. AssignedBroker: broker,
  37. })
  38. }
  39. func (ps *PartitionSlotToBrokerList) RemoveBroker(broker string) {
  40. for _, partitionSlot := range ps.PartitionSlots {
  41. if partitionSlot.AssignedBroker == broker {
  42. partitionSlot.AssignedBroker = ""
  43. }
  44. }
  45. }