// subscriber.go (1.2 KB)

package main

import (
	"flag"
	"fmt"
	"os"
	"strings"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/mq/client/sub_client"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)
  11. var (
  12. namespace = flag.String("ns", "test", "namespace")
  13. topic = flag.String("topic", "test", "topic")
  14. seedBrokers = flag.String("brokers", "localhost:17777", "seed brokers")
  15. )
  16. func main() {
  17. flag.Parse()
  18. subscriberConfig := &sub_client.SubscriberConfiguration{
  19. ClientId: "testSubscriber",
  20. GroupId: "test",
  21. GroupInstanceId: "test",
  22. GrpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()),
  23. }
  24. contentConfig := &sub_client.ContentConfiguration{
  25. Namespace: *namespace,
  26. Topic: *topic,
  27. Filter: "",
  28. StartTime: time.Now(),
  29. }
  30. brokers := strings.Split(*seedBrokers, ",")
  31. subscriber := sub_client.NewTopicSubscriber(brokers, subscriberConfig, contentConfig)
  32. subscriber.SetEachMessageFunc(func(key, value []byte) bool {
  33. println(string(key), "=>", string(value))
  34. return true
  35. })
  36. subscriber.SetCompletionFunc(func() {
  37. println("done subscribing")
  38. })
  39. if err := subscriber.Subscribe(); err != nil {
  40. fmt.Println(err)
  41. }
  42. }