publisher_kv.go 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778
  1. package main
  2. import (
  3. "flag"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client"
  6. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  7. "log"
  8. "strings"
  9. "sync"
  10. "time"
  11. util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
  12. )
  13. var (
  14. messageCount = flag.Int("n", 1000, "message count")
  15. concurrency = flag.Int("c", 4, "concurrent publishers")
  16. partitionCount = flag.Int("p", 6, "partition count")
  17. clientName = flag.String("client", "c1", "client name")
  18. namespace = flag.String("ns", "test", "namespace")
  19. t = flag.String("t", "test", "t")
  20. seedBrokers = flag.String("brokers", "localhost:17777", "seed brokers")
  21. )
  22. func doPublish(publisher *pub_client.TopicPublisher, id int) {
  23. startTime := time.Now()
  24. for i := 0; i < *messageCount / *concurrency; i++ {
  25. // Simulate publishing a message
  26. key := []byte(fmt.Sprintf("key-%s-%d-%d", *clientName, id, i))
  27. value := []byte(fmt.Sprintf("value-%s-%d-%d", *clientName, id, i))
  28. if err := publisher.Publish(key, value); err != nil {
  29. fmt.Println(err)
  30. break
  31. }
  32. time.Sleep(time.Second)
  33. // println("Published", string(key), string(value))
  34. }
  35. if err := publisher.FinishPublish(); err != nil {
  36. fmt.Println(err)
  37. }
  38. elapsed := time.Since(startTime)
  39. log.Printf("Publisher %s-%d finished in %s", *clientName, id, elapsed)
  40. }
  41. func main() {
  42. flag.Parse()
  43. util_http.InitGlobalHttpClient()
  44. config := &pub_client.PublisherConfiguration{
  45. Topic: topic.NewTopic(*namespace, *t),
  46. PartitionCount: int32(*partitionCount),
  47. Brokers: strings.Split(*seedBrokers, ","),
  48. PublisherName: *clientName,
  49. }
  50. publisher := pub_client.NewTopicPublisher(config)
  51. startTime := time.Now()
  52. var wg sync.WaitGroup
  53. // Start multiple publishers
  54. for i := 0; i < *concurrency; i++ {
  55. wg.Add(1)
  56. go func(id int) {
  57. defer wg.Done()
  58. doPublish(publisher, id)
  59. }(i)
  60. }
  61. // Wait for all publishers to finish
  62. wg.Wait()
  63. elapsed := time.Since(startTime)
  64. publisher.Shutdown()
  65. log.Printf("Published %d messages in %s (%.2f msg/s)", *messageCount, elapsed, float64(*messageCount)/elapsed.Seconds())
  66. }