broker_grpc_sub.go 1.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344
  1. package broker
  2. import (
  3. "fmt"
  4. "github.com/seaweedfs/seaweedfs/weed/glog"
  5. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  6. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  8. "time"
  9. )
  10. func (broker *MessageQueueBroker) Subscribe(req *mq_pb.SubscribeRequest, stream mq_pb.SeaweedMessaging_SubscribeServer) error {
  11. localTopicPartition := broker.localTopicManager.GetTopicPartition(topic.FromPbTopic(req.Cursor.Topic),
  12. topic.FromPbPartition(req.Cursor.Partition))
  13. if localTopicPartition == nil {
  14. stream.Send(&mq_pb.SubscribeResponse{
  15. Message: &mq_pb.SubscribeResponse_Ctrl{
  16. Ctrl: &mq_pb.SubscribeResponse_CtrlMessage{
  17. Error: "not initialized",
  18. },
  19. },
  20. })
  21. return nil
  22. }
  23. clientName := fmt.Sprintf("%s/%s-%s", req.Consumer.ConsumerGroup, req.Consumer.ConsumerId, req.Consumer.ClientId)
  24. localTopicPartition.Subscribe(clientName, time.Now(), func(logEntry *filer_pb.LogEntry) error {
  25. value := logEntry.GetData()
  26. if err := stream.Send(&mq_pb.SubscribeResponse{Message: &mq_pb.SubscribeResponse_Data{
  27. Data: &mq_pb.DataMessage{
  28. Key: []byte(fmt.Sprintf("key-%d", logEntry.PartitionKeyHash)),
  29. Value: value,
  30. },
  31. }}); err != nil {
  32. glog.Errorf("Error sending setup response: %v", err)
  33. return err
  34. }
  35. return nil
  36. })
  37. return nil
  38. }