publisher.go 1.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
  1. package main
  2. import (
  3. "flag"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client"
  6. "log"
  7. "strings"
  8. "sync"
  9. "time"
  10. )
  11. var (
  12. messageCount = flag.Int("n", 1000, "message count")
  13. concurrency = flag.Int("c", 4, "concurrency count")
  14. namespace = flag.String("ns", "test", "namespace")
  15. topic = flag.String("topic", "test", "topic")
  16. seedBrokers = flag.String("brokers", "localhost:17777", "seed brokers")
  17. )
  18. func doPublish(publisher *pub_client.TopicPublisher, id int) {
  19. startTime := time.Now()
  20. for i := 0; i < *messageCount / *concurrency; i++ {
  21. // Simulate publishing a message
  22. key := []byte(fmt.Sprintf("key-%d-%d", id, i))
  23. value := []byte(fmt.Sprintf("value-%d-%d", id, i))
  24. publisher.Publish(key, value) // Call your publisher function here
  25. // println("Published", string(key), string(value))
  26. }
  27. elapsed := time.Since(startTime)
  28. log.Printf("Publisher %d finished in %s", id, elapsed)
  29. }
  30. func main() {
  31. flag.Parse()
  32. config := &pub_client.PublisherConfiguration{
  33. CreateTopic: true,
  34. }
  35. publisher := pub_client.NewTopicPublisher(*namespace, *topic, config)
  36. brokers := strings.Split(*seedBrokers, ",")
  37. if err := publisher.Connect(brokers); err != nil {
  38. fmt.Println(err)
  39. return
  40. }
  41. startTime := time.Now()
  42. // Start multiple publishers
  43. var wg sync.WaitGroup
  44. for i := 0; i < *concurrency; i++ {
  45. wg.Add(1)
  46. go func(id int) {
  47. defer wg.Done()
  48. doPublish(publisher, id)
  49. }(i)
  50. }
  51. // Wait for all publishers to finish
  52. wg.Wait()
  53. elapsed := time.Since(startTime)
  54. publisher.Shutdown()
  55. log.Printf("Published %d messages in %s (%.2f msg/s)", *messageCount, elapsed, float64(*messageCount)/elapsed.Seconds())
  56. }