partition_list_broker.go 1.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556
  1. package pub_balancer
  2. import (
  3. "github.com/seaweedfs/seaweedfs/weed/glog"
  4. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  5. )
  6. type PartitionSlotToBroker struct {
  7. RangeStart int32
  8. RangeStop int32
  9. UnixTimeNs int64
  10. AssignedBroker string
  11. }
  12. type PartitionSlotToBrokerList struct {
  13. PartitionSlots []*PartitionSlotToBroker
  14. RingSize int32
  15. }
  16. func NewPartitionSlotToBrokerList(ringSize int32) *PartitionSlotToBrokerList {
  17. return &PartitionSlotToBrokerList{
  18. RingSize: ringSize,
  19. }
  20. }
  21. func (ps *PartitionSlotToBrokerList) AddBroker(partition *mq_pb.Partition, broker string) {
  22. for _, partitionSlot := range ps.PartitionSlots {
  23. if partitionSlot.RangeStart == partition.RangeStart && partitionSlot.RangeStop == partition.RangeStop {
  24. if partitionSlot.AssignedBroker == broker {
  25. return
  26. }
  27. if partitionSlot.AssignedBroker != "" {
  28. glog.V(0).Infof("partition %s broker change: %s => %s", partition, partitionSlot.AssignedBroker, broker)
  29. }
  30. partitionSlot.AssignedBroker = broker
  31. return
  32. }
  33. }
  34. ps.PartitionSlots = append(ps.PartitionSlots, &PartitionSlotToBroker{
  35. RangeStart: partition.RangeStart,
  36. RangeStop: partition.RangeStop,
  37. UnixTimeNs: partition.UnixTimeNs,
  38. AssignedBroker: broker,
  39. })
  40. }
  41. func (ps *PartitionSlotToBrokerList) RemoveBroker(broker string) {
  42. ps.ReplaceBroker(broker, "")
  43. }
  44. func (ps *PartitionSlotToBrokerList) ReplaceBroker(oldBroker string, newBroker string) {
  45. for _, partitionSlot := range ps.PartitionSlots {
  46. if partitionSlot.AssignedBroker == oldBroker {
  47. partitionSlot.AssignedBroker = newBroker
  48. }
  49. }
  50. }