lookup.go 1.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243
  1. package balancer
  2. import (
  3. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  4. )
  5. func (b *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, publish bool) (assignments []*mq_pb.BrokerPartitionAssignment, err error) {
  6. // find existing topic partition assignments
  7. for brokerStatsItem := range b.Brokers.IterBuffered() {
  8. broker, brokerStats := brokerStatsItem.Key, brokerStatsItem.Val
  9. for topicPartitionStatsItem := range brokerStats.Stats.IterBuffered() {
  10. topicPartitionStat := topicPartitionStatsItem.Val
  11. if topicPartitionStat.TopicPartition.Namespace == topic.Namespace &&
  12. topicPartitionStat.TopicPartition.Topic == topic.Name {
  13. assignment := &mq_pb.BrokerPartitionAssignment{
  14. Partition: &mq_pb.Partition{
  15. RingSize: MaxPartitionCount,
  16. RangeStart: topicPartitionStat.RangeStart,
  17. RangeStop: topicPartitionStat.RangeStop,
  18. },
  19. }
  20. if topicPartitionStat.IsLeader {
  21. assignment.LeaderBroker = broker
  22. } else {
  23. assignment.FollowerBrokers = append(assignment.FollowerBrokers, broker)
  24. }
  25. assignments = append(assignments, assignment)
  26. }
  27. }
  28. }
  29. if len(assignments) > 0 {
  30. return assignments, nil
  31. }
  32. // find the topic partitions on the filer
  33. // if the topic is not found
  34. // if the request is_for_publish
  35. // create the topic
  36. // if the request is_for_subscribe
  37. // return error not found
  38. // t := topic.FromPbTopic(request.Topic)
  39. return allocateTopicPartitions(b.Brokers, 6), nil
  40. }