lookup.go 1.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051
  1. package balancer
  2. import (
  3. "errors"
  4. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  5. )
  6. var (
  7. ErrNoBroker = errors.New("no broker")
  8. )
  9. func (b *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, publish bool, partitionCount int32) (assignments []*mq_pb.BrokerPartitionAssignment, err error) {
  10. // find existing topic partition assignments
  11. for brokerStatsItem := range b.Brokers.IterBuffered() {
  12. broker, brokerStats := brokerStatsItem.Key, brokerStatsItem.Val
  13. for topicPartitionStatsItem := range brokerStats.Stats.IterBuffered() {
  14. topicPartitionStat := topicPartitionStatsItem.Val
  15. if topicPartitionStat.TopicPartition.Namespace == topic.Namespace &&
  16. topicPartitionStat.TopicPartition.Topic == topic.Name {
  17. assignment := &mq_pb.BrokerPartitionAssignment{
  18. Partition: &mq_pb.Partition{
  19. RingSize: MaxPartitionCount,
  20. RangeStart: topicPartitionStat.RangeStart,
  21. RangeStop: topicPartitionStat.RangeStop,
  22. },
  23. }
  24. if topicPartitionStat.IsLeader {
  25. assignment.LeaderBroker = broker
  26. } else {
  27. assignment.FollowerBrokers = append(assignment.FollowerBrokers, broker)
  28. }
  29. assignments = append(assignments, assignment)
  30. }
  31. }
  32. }
  33. if len(assignments) > 0 {
  34. return assignments, nil
  35. }
  36. // find the topic partitions on the filer
  37. // if the topic is not found
  38. // if the request is_for_publish
  39. // create the topic
  40. // if the request is_for_subscribe
  41. // return error not found
  42. // t := topic.FromPbTopic(request.Topic)
  43. if b.Brokers.IsEmpty() {
  44. return nil, ErrNoBroker
  45. }
  46. return allocateTopicPartitions(b.Brokers, partitionCount), nil
  47. }