subscriber_kv.go 2.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364
  1. package main
import (
	"flag"
	"fmt"
	"strings"
	"sync/atomic"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/mq/client/sub_client"
	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
	"github.com/seaweedfs/seaweedfs/weed/util"
	util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)
  15. var (
  16. namespace = flag.String("ns", "test", "namespace")
  17. t = flag.String("topic", "test", "topic")
  18. seedBrokers = flag.String("brokers", "localhost:17777", "seed brokers")
  19. maxPartitionCount = flag.Int("maxPartitionCount", 3, "max partition count")
  20. perPartitionConcurrency = flag.Int("perPartitionConcurrency", 1, "per partition concurrency")
  21. clientId = flag.Uint("client_id", uint(util.RandomInt32()), "client id")
  22. )
  23. func main() {
  24. flag.Parse()
  25. util_http.InitGlobalHttpClient()
  26. subscriberConfig := &sub_client.SubscriberConfiguration{
  27. ConsumerGroup: "test",
  28. ConsumerGroupInstanceId: fmt.Sprintf("client-%d", *clientId),
  29. GrpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()),
  30. MaxPartitionCount: int32(*maxPartitionCount),
  31. SlidingWindowSize: int32(*perPartitionConcurrency),
  32. }
  33. contentConfig := &sub_client.ContentConfiguration{
  34. Topic: topic.NewTopic(*namespace, *t),
  35. Filter: "",
  36. }
  37. brokers := strings.Split(*seedBrokers, ",")
  38. subscriber := sub_client.NewTopicSubscriber(brokers, subscriberConfig, contentConfig, make(chan sub_client.KeyedOffset, 1024))
  39. counter := 0
  40. executors := util.NewLimitedConcurrentExecutor(int(subscriberConfig.SlidingWindowSize))
  41. subscriber.SetOnDataMessageFn(func(m *mq_pb.SubscribeMessageResponse_Data) {
  42. executors.Execute(func() {
  43. counter++
  44. println(string(m.Data.Key), "=>", string(m.Data.Value), counter)
  45. })
  46. })
  47. subscriber.SetCompletionFunc(func() {
  48. glog.V(0).Infof("done received %d messages", counter)
  49. })
  50. if err := subscriber.Subscribe(); err != nil {
  51. fmt.Println(err)
  52. }
  53. }