lookup.go 1.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253
  1. package pub_balancer
  2. import (
  3. "errors"
  4. "github.com/seaweedfs/seaweedfs/weed/glog"
  5. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  6. )
  // ErrNoBroker is the sentinel error returned by
  // LookupOrAllocateTopicPartitions when new partitions would have to be
  // allocated but the balancer has no brokers registered.
  var ErrNoBroker = errors.New("no broker")
  10. func (balancer *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, publish bool, partitionCount int32) (assignments []*mq_pb.BrokerPartitionAssignment, err error) {
  11. if partitionCount == 0 {
  12. partitionCount = 6
  13. }
  14. // find existing topic partition assignments
  15. for brokerStatsItem := range balancer.Brokers.IterBuffered() {
  16. broker, brokerStats := brokerStatsItem.Key, brokerStatsItem.Val
  17. for topicPartitionStatsItem := range brokerStats.TopicPartitionStats.IterBuffered() {
  18. topicPartitionStat := topicPartitionStatsItem.Val
  19. if topicPartitionStat.TopicPartition.Namespace == topic.Namespace &&
  20. topicPartitionStat.TopicPartition.Name == topic.Name {
  21. assignment := &mq_pb.BrokerPartitionAssignment{
  22. Partition: &mq_pb.Partition{
  23. RingSize: MaxPartitionCount,
  24. RangeStart: topicPartitionStat.RangeStart,
  25. RangeStop: topicPartitionStat.RangeStop,
  26. },
  27. }
  28. // TODO fix follower setting
  29. assignment.LeaderBroker = broker
  30. assignments = append(assignments, assignment)
  31. }
  32. }
  33. }
  34. if len(assignments) > 0 && len(assignments) == int(partitionCount) || !publish {
  35. glog.V(0).Infof("existing topic partitions %d: %v", len(assignments), assignments)
  36. return assignments, nil
  37. }
  38. // find the topic partitions on the filer
  39. // if the topic is not found
  40. // if the request is_for_publish
  41. // create the topic
  42. // if the request is_for_subscribe
  43. // return error not found
  44. // t := topic.FromPbTopic(request.Topic)
  45. if balancer.Brokers.IsEmpty() {
  46. return nil, ErrNoBroker
  47. }
  48. return allocateTopicPartitions(balancer.Brokers, partitionCount), nil
  49. }