subscribe.go 1.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748
  1. package sub_client
  2. import (
  3. "fmt"
  4. "github.com/seaweedfs/seaweedfs/weed/util"
  5. "io"
  6. "log"
  7. "time"
  8. )
  9. // Subscribe subscribes to a topic's specified partitions.
  10. // If a partition is moved to another broker, the subscriber will automatically reconnect to the new broker.
  11. func (sub *TopicSubscriber) Subscribe() error {
  12. index := -1
  13. util.RetryUntil("subscribe", func() error {
  14. index++
  15. index = index % len(sub.bootstrapBrokers)
  16. // ask balancer for brokers of the topic
  17. if err := sub.doLookup(sub.bootstrapBrokers[index]); err != nil {
  18. return fmt.Errorf("lookup topic %s/%s: %v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, err)
  19. }
  20. if len(sub.brokerPartitionAssignments) == 0 {
  21. if sub.waitForMoreMessage {
  22. time.Sleep(1 * time.Second)
  23. return fmt.Errorf("no broker partition assignments")
  24. } else {
  25. return nil
  26. }
  27. }
  28. // treat the first broker as the topic leader
  29. // connect to the leader broker
  30. // subscribe to the topic
  31. if err := sub.doProcess(); err != nil {
  32. return fmt.Errorf("subscribe topic %s/%s: %v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, err)
  33. }
  34. return nil
  35. }, func(err error) bool {
  36. if err == io.EOF {
  37. log.Printf("subscriber %s/%s: %v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, err)
  38. sub.waitForMoreMessage = false
  39. return false
  40. }
  41. return true
  42. })
  43. return nil
  44. }