publisher_kv.go 2.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576
  1. package main
  2. import (
  3. "flag"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client"
  6. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  7. "log"
  8. "strings"
  9. "sync"
  10. "time"
  11. )
  12. var (
  13. messageCount = flag.Int("n", 1000, "message count")
  14. concurrency = flag.Int("c", 4, "concurrent publishers")
  15. partitionCount = flag.Int("p", 6, "partition count")
  16. clientName = flag.String("client", "c1", "client name")
  17. namespace = flag.String("ns", "test", "namespace")
  18. t = flag.String("t", "test", "t")
  19. seedBrokers = flag.String("brokers", "localhost:17777", "seed brokers")
  20. )
  21. func doPublish(publisher *pub_client.TopicPublisher, id int) {
  22. startTime := time.Now()
  23. for i := 0; i < *messageCount / *concurrency; i++ {
  24. // Simulate publishing a message
  25. key := []byte(fmt.Sprintf("key-%s-%d-%d", *clientName, id, i))
  26. value := []byte(fmt.Sprintf("value-%s-%d-%d", *clientName, id, i))
  27. if err := publisher.Publish(key, value); err != nil {
  28. fmt.Println(err)
  29. break
  30. }
  31. time.Sleep(time.Second)
  32. // println("Published", string(key), string(value))
  33. }
  34. if err := publisher.FinishPublish(); err != nil {
  35. fmt.Println(err)
  36. }
  37. elapsed := time.Since(startTime)
  38. log.Printf("Publisher %s-%d finished in %s", *clientName, id, elapsed)
  39. }
  40. func main() {
  41. flag.Parse()
  42. config := &pub_client.PublisherConfiguration{
  43. Topic: topic.NewTopic(*namespace, *t),
  44. PartitionCount: int32(*partitionCount),
  45. Brokers: strings.Split(*seedBrokers, ","),
  46. PublisherName: *clientName,
  47. }
  48. publisher := pub_client.NewTopicPublisher(config)
  49. startTime := time.Now()
  50. var wg sync.WaitGroup
  51. // Start multiple publishers
  52. for i := 0; i < *concurrency; i++ {
  53. wg.Add(1)
  54. go func(id int) {
  55. defer wg.Done()
  56. doPublish(publisher, id)
  57. }(i)
  58. }
  59. // Wait for all publishers to finish
  60. wg.Wait()
  61. elapsed := time.Since(startTime)
  62. publisher.Shutdown()
  63. log.Printf("Published %d messages in %s (%.2f msg/s)", *messageCount, elapsed, float64(*messageCount)/elapsed.Seconds())
  64. }