lookup.go 1.2 KB

123456789101112131415161718192021222324252627282930313233343536
  1. package pub_balancer
  2. import (
  3. "errors"
  4. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  5. "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
  6. )
  // ErrNoBroker is a sentinel error whose message is "no broker".
  // NOTE(review): presumably returned when no broker can be selected; it is not
  // referenced in the visible part of this file, so confirm against callers.
  var ErrNoBroker = errors.New("no broker")
  10. func (balancer *PubBalancer) LookupTopicPartitions(topic *schema_pb.Topic) (assignments []*mq_pb.BrokerPartitionAssignment) {
  11. // find existing topic partition assignments
  12. for brokerStatsItem := range balancer.Brokers.IterBuffered() {
  13. broker, brokerStats := brokerStatsItem.Key, brokerStatsItem.Val
  14. for topicPartitionStatsItem := range brokerStats.TopicPartitionStats.IterBuffered() {
  15. topicPartitionStat := topicPartitionStatsItem.Val
  16. if topicPartitionStat.TopicPartition.Namespace == topic.Namespace &&
  17. topicPartitionStat.TopicPartition.Name == topic.Name {
  18. assignment := &mq_pb.BrokerPartitionAssignment{
  19. Partition: &schema_pb.Partition{
  20. RingSize: MaxPartitionCount,
  21. RangeStart: topicPartitionStat.RangeStart,
  22. RangeStop: topicPartitionStat.RangeStop,
  23. UnixTimeNs: topicPartitionStat.UnixTimeNs,
  24. },
  25. }
  26. // TODO fix follower setting
  27. assignment.LeaderBroker = broker
  28. assignments = append(assignments, assignment)
  29. }
  30. }
  31. }
  32. return
  33. }