publisher.go 811 B

12345678910111213141516171819202122232425262728293031323334353637383940
  1. package client
  2. import (
  3. "github.com/seaweedfs/seaweedfs/weed/mq/messages"
  4. "github.com/seaweedfs/seaweedfs/weed/pb"
  5. "time"
  6. )
  7. type PublishProcessor interface {
  8. AddMessage(m *messages.Message) error
  9. Shutdown() error
  10. }
  // PublisherOption carries the settings used to construct a Publisher.
  type PublisherOption struct {
  	// Masters is the master server address list in string form.
  	// NewPublisher converts it via pb.ServerAddresses(...).ToAddresses().
  	// NOTE(review): presumably a comma-separated list — confirm against
  	// pb.ServerAddresses.
  	Masters string

  	// Topic is the topic name associated with this publisher.
  	// NOTE(review): not read by any code visible in this file.
  	Topic string
  }
  15. type Publisher struct {
  16. option *PublisherOption
  17. masters []pb.ServerAddress
  18. processor *PublishStreamProcessor
  19. }
  20. func NewPublisher(option *PublisherOption) *Publisher {
  21. p := &Publisher{
  22. masters: pb.ServerAddresses(option.Masters).ToAddresses(),
  23. option: option,
  24. processor: NewPublishStreamProcessor(3, 887*time.Millisecond),
  25. }
  26. return p
  27. }
  28. func (p Publisher) Publish(m *messages.Message) error {
  29. return p.processor.AddMessage(m)
  30. }
  31. func (p Publisher) Shutdown() error {
  32. return p.processor.Shutdown()
  33. }