subscriber_kv.go 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263
  1. package main
  2. import (
  3. "flag"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/mq/client/sub_client"
  7. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  8. "github.com/seaweedfs/seaweedfs/weed/util"
  9. "google.golang.org/grpc"
  10. "google.golang.org/grpc/credentials/insecure"
  11. "strings"
  12. "time"
  13. util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
  14. )
  15. var (
  16. namespace = flag.String("ns", "test", "namespace")
  17. t = flag.String("topic", "test", "topic")
  18. seedBrokers = flag.String("brokers", "localhost:17777", "seed brokers")
  19. maxPartitionCount = flag.Int("maxPartitionCount", 3, "max partition count")
  20. perPartitionConcurrency = flag.Int("perPartitionConcurrency", 1, "per partition concurrency")
  21. clientId = flag.Uint("client_id", uint(util.RandomInt32()), "client id")
  22. )
  23. func main() {
  24. flag.Parse()
  25. util_http.InitGlobalHttpClient()
  26. subscriberConfig := &sub_client.SubscriberConfiguration{
  27. ConsumerGroup: "test",
  28. ConsumerGroupInstanceId: fmt.Sprintf("client-%d", *clientId),
  29. GrpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()),
  30. MaxPartitionCount: int32(*maxPartitionCount),
  31. PerPartitionConcurrency: int32(*perPartitionConcurrency),
  32. }
  33. contentConfig := &sub_client.ContentConfiguration{
  34. Topic: topic.NewTopic(*namespace, *t),
  35. Filter: "",
  36. StartTime: time.Unix(1, 1),
  37. }
  38. brokers := strings.Split(*seedBrokers, ",")
  39. subscriber := sub_client.NewTopicSubscriber(brokers, subscriberConfig, contentConfig)
  40. counter := 0
  41. subscriber.SetEachMessageFunc(func(key, value []byte) error {
  42. counter++
  43. println(string(key), "=>", string(value), counter)
  44. return nil
  45. })
  46. subscriber.SetCompletionFunc(func() {
  47. glog.V(0).Infof("done received %d messages", counter)
  48. })
  49. if err := subscriber.Subscribe(); err != nil {
  50. fmt.Println(err)
  51. }
  52. }