lookup.go 1.1 KB

1234567891011121314151617181920212223242526272829303132333435
  1. package pub_balancer
  2. import (
  3. "errors"
  4. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  5. )
  // ErrNoBroker is a sentinel error carrying the message "no broker".
  // Callers should compare against it with errors.Is.
  var ErrNoBroker = errors.New("no broker")
  9. func (balancer *PubBalancer) LookupTopicPartitions(topic *mq_pb.Topic) (assignments []*mq_pb.BrokerPartitionAssignment) {
  10. // find existing topic partition assignments
  11. for brokerStatsItem := range balancer.Brokers.IterBuffered() {
  12. broker, brokerStats := brokerStatsItem.Key, brokerStatsItem.Val
  13. for topicPartitionStatsItem := range brokerStats.TopicPartitionStats.IterBuffered() {
  14. topicPartitionStat := topicPartitionStatsItem.Val
  15. if topicPartitionStat.TopicPartition.Namespace == topic.Namespace &&
  16. topicPartitionStat.TopicPartition.Name == topic.Name {
  17. assignment := &mq_pb.BrokerPartitionAssignment{
  18. Partition: &mq_pb.Partition{
  19. RingSize: MaxPartitionCount,
  20. RangeStart: topicPartitionStat.RangeStart,
  21. RangeStop: topicPartitionStat.RangeStop,
  22. UnixTimeNs: topicPartitionStat.UnixTimeNs,
  23. },
  24. }
  25. // TODO fix follower setting
  26. assignment.LeaderBroker = broker
  27. assignments = append(assignments, assignment)
  28. }
  29. }
  30. }
  31. return
  32. }