broker_grpc_server_publish.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. package broker
  2. import (
  3. "crypto/md5"
  4. "fmt"
  5. "io"
  6. "github.com/golang/protobuf/proto"
  7. "github.com/chrislusf/seaweedfs/weed/filer"
  8. "github.com/chrislusf/seaweedfs/weed/glog"
  9. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  10. "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
  11. )
  12. func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_PublishServer) error {
  13. // process initial request
  14. in, err := stream.Recv()
  15. if err == io.EOF {
  16. return nil
  17. }
  18. if err != nil {
  19. return err
  20. }
  21. // TODO look it up
  22. topicConfig := &messaging_pb.TopicConfiguration{
  23. // IsTransient: true,
  24. }
  25. // send init response
  26. initResponse := &messaging_pb.PublishResponse{
  27. Config: nil,
  28. Redirect: nil,
  29. }
  30. err = stream.Send(initResponse)
  31. if err != nil {
  32. return err
  33. }
  34. if initResponse.Redirect != nil {
  35. return nil
  36. }
  37. // get lock
  38. tp := TopicPartition{
  39. Namespace: in.Init.Namespace,
  40. Topic: in.Init.Topic,
  41. Partition: in.Init.Partition,
  42. }
  43. tpDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, tp.Namespace, tp.Topic)
  44. md5File := fmt.Sprintf("p%02d.md5", tp.Partition)
  45. // println("chan data stored under", tpDir, "as", md5File)
  46. if exists, err := filer_pb.Exists(broker, tpDir, md5File, false); err == nil && exists {
  47. return fmt.Errorf("channel is already closed")
  48. }
  49. tl := broker.topicManager.RequestLock(tp, topicConfig, true)
  50. defer broker.topicManager.ReleaseLock(tp, true)
  51. md5hash := md5.New()
  52. // process each message
  53. for {
  54. // println("recv")
  55. in, err := stream.Recv()
  56. // glog.V(0).Infof("recieved %v err: %v", in, err)
  57. if err == io.EOF {
  58. return nil
  59. }
  60. if err != nil {
  61. return err
  62. }
  63. if in.Data == nil {
  64. continue
  65. }
  66. // fmt.Printf("received: %d : %s\n", len(in.Data.Value), string(in.Data.Value))
  67. data, err := proto.Marshal(in.Data)
  68. if err != nil {
  69. glog.Errorf("marshall error: %v\n", err)
  70. continue
  71. }
  72. tl.logBuffer.AddToBuffer(in.Data.Key, data, in.Data.EventTimeNs)
  73. if in.Data.IsClose {
  74. // println("server received closing")
  75. break
  76. }
  77. md5hash.Write(in.Data.Value)
  78. }
  79. if err := broker.appendToFile(tpDir+"/"+md5File, topicConfig, md5hash.Sum(nil)); err != nil {
  80. glog.V(0).Infof("err writing %s: %v", md5File, err)
  81. }
  82. // fmt.Printf("received md5 %X\n", md5hash.Sum(nil))
  83. // send the close ack
  84. // println("server send ack closing")
  85. if err := stream.Send(&messaging_pb.PublishResponse{IsClosed: true}); err != nil {
  86. glog.V(0).Infof("err sending close response: %v", err)
  87. }
  88. return nil
  89. }