local_partition.go 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239
  1. package topic
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/pb"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  8. "github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
  9. "google.golang.org/grpc"
  10. "google.golang.org/grpc/codes"
  11. "google.golang.org/grpc/status"
  12. "sync"
  13. "sync/atomic"
  14. "time"
  15. )
  16. type LocalPartition struct {
  17. ListenersWaits int64
  18. AckTsNs int64
  19. // notifying clients
  20. ListenersLock sync.Mutex
  21. ListenersCond *sync.Cond
  22. Partition
  23. LogBuffer *log_buffer.LogBuffer
  24. Publishers *LocalPartitionPublishers
  25. Subscribers *LocalPartitionSubscribers
  26. publishFolloweMeStream mq_pb.SeaweedMessaging_PublishFollowMeClient
  27. followerGrpcConnection *grpc.ClientConn
  28. Follower string
  29. }
  30. var TIME_FORMAT = "2006-01-02-15-04-05"
  31. func NewLocalPartition(partition Partition, logFlushFn log_buffer.LogFlushFuncType, readFromDiskFn log_buffer.LogReadFromDiskFuncType) *LocalPartition {
  32. lp := &LocalPartition{
  33. Partition: partition,
  34. Publishers: NewLocalPartitionPublishers(),
  35. Subscribers: NewLocalPartitionSubscribers(),
  36. }
  37. lp.ListenersCond = sync.NewCond(&lp.ListenersLock)
  38. lp.LogBuffer = log_buffer.NewLogBuffer(fmt.Sprintf("%d/%04d-%04d", partition.UnixTimeNs, partition.RangeStart, partition.RangeStop),
  39. 2*time.Minute, logFlushFn, readFromDiskFn, func() {
  40. if atomic.LoadInt64(&lp.ListenersWaits) > 0 {
  41. lp.ListenersCond.Broadcast()
  42. }
  43. })
  44. return lp
  45. }
  46. func (p *LocalPartition) Publish(message *mq_pb.DataMessage) error {
  47. p.LogBuffer.AddToBuffer(message)
  48. // maybe send to the follower
  49. if p.publishFolloweMeStream != nil {
  50. // println("recv", string(message.Key), message.TsNs)
  51. if followErr := p.publishFolloweMeStream.Send(&mq_pb.PublishFollowMeRequest{
  52. Message: &mq_pb.PublishFollowMeRequest_Data{
  53. Data: message,
  54. },
  55. }); followErr != nil {
  56. return fmt.Errorf("send to follower %s: %v", p.Follower, followErr)
  57. }
  58. } else {
  59. atomic.StoreInt64(&p.AckTsNs, message.TsNs)
  60. }
  61. return nil
  62. }
  63. func (p *LocalPartition) Subscribe(clientName string, startPosition log_buffer.MessagePosition,
  64. onNoMessageFn func() bool, eachMessageFn log_buffer.EachLogEntryFuncType) error {
  65. var processedPosition log_buffer.MessagePosition
  66. var readPersistedLogErr error
  67. var readInMemoryLogErr error
  68. var isDone bool
  69. for {
  70. processedPosition, isDone, readPersistedLogErr = p.LogBuffer.ReadFromDiskFn(startPosition, 0, eachMessageFn)
  71. if readPersistedLogErr != nil {
  72. glog.V(0).Infof("%s read %v persisted log: %v", clientName, p.Partition, readPersistedLogErr)
  73. return readPersistedLogErr
  74. }
  75. if isDone {
  76. return nil
  77. }
  78. startPosition = processedPosition
  79. processedPosition, isDone, readInMemoryLogErr = p.LogBuffer.LoopProcessLogData(clientName, startPosition, 0, onNoMessageFn, eachMessageFn)
  80. if isDone {
  81. return nil
  82. }
  83. startPosition = processedPosition
  84. if readInMemoryLogErr == log_buffer.ResumeFromDiskError {
  85. continue
  86. }
  87. if readInMemoryLogErr != nil {
  88. glog.V(0).Infof("%s read %v in memory log: %v", clientName, p.Partition, readInMemoryLogErr)
  89. return readInMemoryLogErr
  90. }
  91. }
  92. }
  93. func (p *LocalPartition) GetEarliestMessageTimeInMemory() time.Time {
  94. return p.LogBuffer.GetEarliestTime()
  95. }
  96. func (p *LocalPartition) HasData() bool {
  97. return !p.LogBuffer.GetEarliestTime().IsZero()
  98. }
  99. func (p *LocalPartition) GetEarliestInMemoryMessagePosition() log_buffer.MessagePosition {
  100. return p.LogBuffer.GetEarliestPosition()
  101. }
  102. func (p *LocalPartition) closePublishers() {
  103. p.Publishers.SignalShutdown()
  104. }
  105. func (p *LocalPartition) closeSubscribers() {
  106. p.Subscribers.SignalShutdown()
  107. }
  108. func (p *LocalPartition) WaitUntilNoPublishers() {
  109. for {
  110. if p.Publishers.Size() == 0 {
  111. return
  112. }
  113. time.Sleep(113 * time.Millisecond)
  114. }
  115. }
  116. func (p *LocalPartition) MaybeConnectToFollowers(initMessage *mq_pb.PublishMessageRequest_InitMessage, grpcDialOption grpc.DialOption) (err error) {
  117. if p.publishFolloweMeStream != nil {
  118. return nil
  119. }
  120. if initMessage.FollowerBroker == "" {
  121. return nil
  122. }
  123. p.Follower = initMessage.FollowerBroker
  124. ctx := context.Background()
  125. p.followerGrpcConnection, err = pb.GrpcDial(ctx, p.Follower, true, grpcDialOption)
  126. if err != nil {
  127. return fmt.Errorf("fail to dial %s: %v", p.Follower, err)
  128. }
  129. followerClient := mq_pb.NewSeaweedMessagingClient(p.followerGrpcConnection)
  130. p.publishFolloweMeStream, err = followerClient.PublishFollowMe(ctx)
  131. if err != nil {
  132. return fmt.Errorf("fail to create publish client: %v", err)
  133. }
  134. if err = p.publishFolloweMeStream.Send(&mq_pb.PublishFollowMeRequest{
  135. Message: &mq_pb.PublishFollowMeRequest_Init{
  136. Init: &mq_pb.PublishFollowMeRequest_InitMessage{
  137. Topic: initMessage.Topic,
  138. Partition: initMessage.Partition,
  139. },
  140. },
  141. }); err != nil {
  142. return err
  143. }
  144. // start receiving ack from follower
  145. go func() {
  146. defer func() {
  147. // println("stop receiving ack from follower")
  148. }()
  149. for {
  150. ack, err := p.publishFolloweMeStream.Recv()
  151. if err != nil {
  152. e, _ := status.FromError(err)
  153. if e.Code() == codes.Canceled {
  154. glog.V(0).Infof("local partition %v follower %v stopped", p.Partition, p.Follower)
  155. return
  156. }
  157. glog.Errorf("Receiving local partition %v follower %s ack: %v", p.Partition, p.Follower, err)
  158. return
  159. }
  160. atomic.StoreInt64(&p.AckTsNs, ack.AckTsNs)
  161. // println("recv ack", ack.AckTsNs)
  162. }
  163. }()
  164. return nil
  165. }
  166. func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) {
  167. if p.Publishers.Size() == 0 && p.Subscribers.Size() == 0 {
  168. p.LogBuffer.ShutdownLogBuffer()
  169. for !p.LogBuffer.IsAllFlushed() {
  170. time.Sleep(113 * time.Millisecond)
  171. }
  172. if p.publishFolloweMeStream != nil {
  173. // send close to the follower
  174. if followErr := p.publishFolloweMeStream.Send(&mq_pb.PublishFollowMeRequest{
  175. Message: &mq_pb.PublishFollowMeRequest_Close{
  176. Close: &mq_pb.PublishFollowMeRequest_CloseMessage{},
  177. },
  178. }); followErr != nil {
  179. glog.Errorf("Error closing follower stream: %v", followErr)
  180. }
  181. glog.V(4).Infof("closing grpcConnection to follower")
  182. p.followerGrpcConnection.Close()
  183. p.publishFolloweMeStream = nil
  184. p.Follower = ""
  185. }
  186. hasShutdown = true
  187. }
  188. glog.V(0).Infof("local partition %v Publisher:%d Subscriber:%d follower:%s shutdown %v", p.Partition, p.Publishers.Size(), p.Subscribers.Size(), p.Follower, hasShutdown)
  189. return
  190. }
  191. func (p *LocalPartition) Shutdown() {
  192. p.closePublishers()
  193. p.closeSubscribers()
  194. p.LogBuffer.ShutdownLogBuffer()
  195. glog.V(0).Infof("local partition %v shutting down", p.Partition)
  196. }
  197. func (p *LocalPartition) NotifyLogFlushed(flushTsNs int64) {
  198. if p.publishFolloweMeStream != nil {
  199. if followErr := p.publishFolloweMeStream.Send(&mq_pb.PublishFollowMeRequest{
  200. Message: &mq_pb.PublishFollowMeRequest_Flush{
  201. Flush: &mq_pb.PublishFollowMeRequest_FlushMessage{
  202. TsNs: flushTsNs,
  203. },
  204. },
  205. }); followErr != nil {
  206. glog.Errorf("send follower %s flush message: %v", p.Follower, followErr)
  207. }
  208. // println("notifying", p.Follower, "flushed at", flushTsNs)
  209. }
  210. }