agent_grpc_subscribe.go 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. package agent
  2. import (
  3. "fmt"
  4. "github.com/seaweedfs/seaweedfs/weed/glog"
  5. "github.com/seaweedfs/seaweedfs/weed/mq/client/sub_client"
  6. "github.com/seaweedfs/seaweedfs/weed/pb/mq_agent_pb"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
  9. "google.golang.org/protobuf/proto"
  10. "time"
  11. )
  12. func (a *MessageQueueAgent) SubscribeRecordRequest(stream mq_agent_pb.SeaweedMessagingAgent_SubscribeRecordServer) error {
  13. // the first message is the subscribe request
  14. // it should only contain the session id
  15. m, err := stream.Recv()
  16. if err != nil {
  17. return err
  18. }
  19. a.subscribersLock.RLock()
  20. subscriberEntry, found := a.subscribers[SessionId(m.SessionId)]
  21. a.subscribersLock.RUnlock()
  22. if !found {
  23. return fmt.Errorf("subscribe session id %d not found", m.SessionId)
  24. }
  25. defer func() {
  26. subscriberEntry.lastActiveTsNs = time.Now().UnixNano()
  27. }()
  28. subscriberEntry.lastActiveTsNs = 0
  29. var lastErr error
  30. subscriberEntry.entry.SetOnDataMessageFn(func(m *mq_pb.SubscribeMessageResponse_Data) {
  31. record := &schema_pb.RecordValue{}
  32. err := proto.Unmarshal(m.Data.Value, record)
  33. if err != nil {
  34. if lastErr == nil {
  35. lastErr = err
  36. }
  37. return
  38. }
  39. if sendErr := stream.Send(&mq_agent_pb.SubscribeRecordResponse{
  40. Key: m.Data.Key,
  41. Value: record,
  42. TsNs: m.Data.TsNs,
  43. }); sendErr != nil {
  44. if lastErr == nil {
  45. lastErr = sendErr
  46. }
  47. }
  48. })
  49. go func() {
  50. subErr := subscriberEntry.entry.Subscribe()
  51. if subErr != nil {
  52. glog.V(0).Infof("subscriber %d subscribe: %v", m.SessionId, subErr)
  53. if lastErr == nil {
  54. lastErr = subErr
  55. }
  56. }
  57. }()
  58. for {
  59. m, err := stream.Recv()
  60. if err != nil {
  61. return err
  62. }
  63. if m != nil {
  64. subscriberEntry.entry.PartitionOffsetChan <- sub_client.KeyedOffset{
  65. Key: m.AckKey,
  66. Offset: m.AckSequence,
  67. }
  68. }
  69. }
  70. }