chan_sub.go 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. package msgclient
  2. import (
  3. "context"
  4. "crypto/md5"
  5. "hash"
  6. "io"
  7. "log"
  8. "time"
  9. "github.com/chrislusf/seaweedfs/weed/messaging/broker"
  10. "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
  11. )
  12. type SubChannel struct {
  13. ch chan []byte
  14. stream messaging_pb.SeaweedMessaging_SubscribeClient
  15. md5hash hash.Hash
  16. cancel context.CancelFunc
  17. }
  18. func (mc *MessagingClient) NewSubChannel(subscriberId, chanName string) (*SubChannel, error) {
  19. tp := broker.TopicPartition{
  20. Namespace: "chan",
  21. Topic: chanName,
  22. Partition: 0,
  23. }
  24. grpcConnection, err := mc.findBroker(tp)
  25. if err != nil {
  26. return nil, err
  27. }
  28. ctx, cancel := context.WithCancel(context.Background())
  29. sc, err := setupSubscriberClient(ctx, grpcConnection, tp, subscriberId, time.Unix(0, 0))
  30. if err != nil {
  31. return nil, err
  32. }
  33. t := &SubChannel{
  34. ch: make(chan []byte),
  35. stream: sc,
  36. md5hash: md5.New(),
  37. cancel: cancel,
  38. }
  39. go func() {
  40. for {
  41. resp, subErr := t.stream.Recv()
  42. if subErr == io.EOF {
  43. return
  44. }
  45. if subErr != nil {
  46. log.Printf("fail to receive from netchan %s: %v", chanName, subErr)
  47. return
  48. }
  49. if resp.Data == nil {
  50. // this could be heartbeat from broker
  51. continue
  52. }
  53. if resp.Data.IsClose {
  54. t.stream.Send(&messaging_pb.SubscriberMessage{
  55. IsClose: true,
  56. })
  57. close(t.ch)
  58. cancel()
  59. return
  60. }
  61. t.ch <- resp.Data.Value
  62. t.md5hash.Write(resp.Data.Value)
  63. }
  64. }()
  65. return t, nil
  66. }
  67. func (sc *SubChannel) Channel() chan []byte {
  68. return sc.ch
  69. }
  70. func (sc *SubChannel) Md5() []byte {
  71. return sc.md5hash.Sum(nil)
  72. }
  73. func (sc *SubChannel) Cancel() {
  74. sc.cancel()
  75. }