publisher.go 1.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758
  1. package main
  2. import (
  3. "flag"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client"
  6. "log"
  7. "sync"
  8. "time"
  9. )
  // Command-line flags controlling the size and parallelism of the run.
  var (
  	// messageCount is the value of -n (default 1000).
  	messageCount = flag.Int("n", 1000, "message count")
  	// concurrency is the value of -c (default 4).
  	concurrency = flag.Int("c", 4, "concurrency count")
  )
  14. func doPublish(publisher *pub_client.TopicPublisher, id int) {
  15. startTime := time.Now()
  16. for i := 0; i < *messageCount / *concurrency; i++ {
  17. // Simulate publishing a message
  18. key := []byte(fmt.Sprintf("key-%d-%d", id, i))
  19. value := []byte(fmt.Sprintf("value-%d-%d", id, i))
  20. publisher.Publish(key, value) // Call your publisher function here
  21. // println("Published", string(key), string(value))
  22. }
  23. elapsed := time.Since(startTime)
  24. log.Printf("Publisher %d finished in %s", id, elapsed)
  25. }
  26. func main() {
  27. flag.Parse()
  28. publisher := pub_client.NewTopicPublisher(
  29. "test", "test")
  30. if err := publisher.Connect("localhost:17777"); err != nil {
  31. fmt.Println(err)
  32. return
  33. }
  34. startTime := time.Now()
  35. // Start multiple publishers
  36. var wg sync.WaitGroup
  37. for i := 0; i < *concurrency; i++ {
  38. wg.Add(1)
  39. go func(id int) {
  40. defer wg.Done()
  41. doPublish(publisher, id)
  42. }(i)
  43. }
  44. // Wait for all publishers to finish
  45. wg.Wait()
  46. elapsed := time.Since(startTime)
  47. publisher.Shutdown()
  48. log.Printf("Published %d messages in %s (%.2f msg/s)", *messageCount, elapsed, float64(*messageCount)/elapsed.Seconds())
  49. }