123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960 |
- package pub_balancer
- import (
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
- )
// PartitionSlotToBroker records which broker, and which follower broker,
// is assigned to one partition range.
type PartitionSlotToBroker struct {
	// RangeStart and RangeStop identify the partition range; together they
	// are the key AddBroker uses to find an existing slot.
	RangeStart, RangeStop int32
	// UnixTimeNs is copied from the partition when the slot is first added.
	UnixTimeNs int64
	// AssignedBroker is the broker for this range and FollowerBroker its
	// follower. RemoveBroker leaves AssignedBroker as the empty string.
	AssignedBroker, FollowerBroker string
}
- type PartitionSlotToBrokerList struct {
- PartitionSlots []*PartitionSlotToBroker
- RingSize int32
- }
- func NewPartitionSlotToBrokerList(ringSize int32) *PartitionSlotToBrokerList {
- return &PartitionSlotToBrokerList{
- RingSize: ringSize,
- }
- }
- func (ps *PartitionSlotToBrokerList) AddBroker(partition *mq_pb.Partition, broker string, follower string) {
- for _, partitionSlot := range ps.PartitionSlots {
- if partitionSlot.RangeStart == partition.RangeStart && partitionSlot.RangeStop == partition.RangeStop {
- if partitionSlot.AssignedBroker != "" && partitionSlot.AssignedBroker != broker {
- glog.V(0).Infof("partition %s broker change: %s => %s", partition, partitionSlot.AssignedBroker, broker)
- partitionSlot.AssignedBroker = broker
- }
- if partitionSlot.FollowerBroker != "" && partitionSlot.FollowerBroker != follower {
- glog.V(0).Infof("partition %s follower change: %s => %s", partition, partitionSlot.FollowerBroker, follower)
- partitionSlot.FollowerBroker = follower
- }
- return
- }
- }
- ps.PartitionSlots = append(ps.PartitionSlots, &PartitionSlotToBroker{
- RangeStart: partition.RangeStart,
- RangeStop: partition.RangeStop,
- UnixTimeNs: partition.UnixTimeNs,
- AssignedBroker: broker,
- FollowerBroker: follower,
- })
- }
- func (ps *PartitionSlotToBrokerList) RemoveBroker(broker string) {
- ps.ReplaceBroker(broker, "")
- }
- func (ps *PartitionSlotToBrokerList) ReplaceBroker(oldBroker string, newBroker string) {
- for _, partitionSlot := range ps.PartitionSlots {
- if partitionSlot.AssignedBroker == oldBroker {
- partitionSlot.AssignedBroker = newBroker
- }
- }
- }
|