lookup.go 1.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748
  1. package balancer
  2. import (
  3. "errors"
  4. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  5. )
  6. var (
  7. ErrNoBroker = errors.New("no broker")
  8. )
  9. func (b *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, publish bool, partitionCount int32) (assignments []*mq_pb.BrokerPartitionAssignment, err error) {
  10. // find existing topic partition assignments
  11. for brokerStatsItem := range b.Brokers.IterBuffered() {
  12. broker, brokerStats := brokerStatsItem.Key, brokerStatsItem.Val
  13. for topicPartitionStatsItem := range brokerStats.Stats.IterBuffered() {
  14. topicPartitionStat := topicPartitionStatsItem.Val
  15. if topicPartitionStat.TopicPartition.Namespace == topic.Namespace &&
  16. topicPartitionStat.TopicPartition.Topic == topic.Name {
  17. assignment := &mq_pb.BrokerPartitionAssignment{
  18. Partition: &mq_pb.Partition{
  19. RingSize: MaxPartitionCount,
  20. RangeStart: topicPartitionStat.RangeStart,
  21. RangeStop: topicPartitionStat.RangeStop,
  22. },
  23. }
  24. // TODO fix follower setting
  25. assignment.LeaderBroker = broker
  26. assignments = append(assignments, assignment)
  27. }
  28. }
  29. }
  30. if len(assignments) > 0 {
  31. return assignments, nil
  32. }
  33. // find the topic partitions on the filer
  34. // if the topic is not found
  35. // if the request is_for_publish
  36. // create the topic
  37. // if the request is_for_subscribe
  38. // return error not found
  39. // t := topic.FromPbTopic(request.Topic)
  40. if b.Brokers.IsEmpty() {
  41. return nil, ErrNoBroker
  42. }
  43. return allocateTopicPartitions(b.Brokers, partitionCount), nil
  44. }