// subscriber_kv.go

  1. package main
  import (
  	"flag"
  	"fmt"
  	"strings"
  	"sync/atomic"
  	"time"

  	"github.com/seaweedfs/seaweedfs/weed/glog"
  	"github.com/seaweedfs/seaweedfs/weed/mq/client/sub_client"
  	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
  	"github.com/seaweedfs/seaweedfs/weed/util"
  	util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
  	"google.golang.org/grpc"
  	"google.golang.org/grpc/credentials/insecure"
  )
  15. var (
  16. namespace = flag.String("ns", "test", "namespace")
  17. t = flag.String("topic", "test", "topic")
  18. seedBrokers = flag.String("brokers", "localhost:17777", "seed brokers")
  19. clientId = flag.Uint("client_id", uint(util.RandomInt32()), "client id")
  20. )
  21. func main() {
  22. flag.Parse()
  23. util_http.InitGlobalHttpClient()
  24. subscriberConfig := &sub_client.SubscriberConfiguration{
  25. ClientId: fmt.Sprintf("client-%d", *clientId),
  26. ConsumerGroup: "test",
  27. ConsumerGroupInstanceId: fmt.Sprintf("client-%d", *clientId),
  28. GrpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()),
  29. }
  30. contentConfig := &sub_client.ContentConfiguration{
  31. Topic: topic.NewTopic(*namespace, *t),
  32. Filter: "",
  33. StartTime: time.Unix(1, 1),
  34. }
  35. processorConfig := sub_client.ProcessorConfiguration{
  36. ConcurrentPartitionLimit: 3,
  37. }
  38. brokers := strings.Split(*seedBrokers, ",")
  39. subscriber := sub_client.NewTopicSubscriber(brokers, subscriberConfig, contentConfig, processorConfig)
  40. counter := 0
  41. subscriber.SetEachMessageFunc(func(key, value []byte) (bool, error) {
  42. counter++
  43. println(string(key), "=>", string(value), counter)
  44. return true, nil
  45. })
  46. subscriber.SetCompletionFunc(func() {
  47. glog.V(0).Infof("done received %d messages", counter)
  48. })
  49. if err := subscriber.Subscribe(); err != nil {
  50. fmt.Println(err)
  51. }
  52. }