local_partition.go 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243
  1. package topic
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/pb"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
  9. "github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
  10. "google.golang.org/grpc"
  11. "google.golang.org/grpc/codes"
  12. "google.golang.org/grpc/status"
  13. "sync"
  14. "sync/atomic"
  15. "time"
  16. )
  17. type LocalPartition struct {
  18. ListenersWaits int64
  19. AckTsNs int64
  20. // notifying clients
  21. ListenersLock sync.Mutex
  22. ListenersCond *sync.Cond
  23. Partition
  24. LogBuffer *log_buffer.LogBuffer
  25. Publishers *LocalPartitionPublishers
  26. Subscribers *LocalPartitionSubscribers
  27. RecordType *schema_pb.RecordType
  28. publishFollowMeStream mq_pb.SeaweedMessaging_PublishFollowMeClient
  29. followerGrpcConnection *grpc.ClientConn
  30. Follower string
  31. }
  32. var TIME_FORMAT = "2006-01-02-15-04-05"
  33. func NewLocalPartition(partition Partition, logFlushFn log_buffer.LogFlushFuncType, readFromDiskFn log_buffer.LogReadFromDiskFuncType, recordType *schema_pb.RecordType) *LocalPartition {
  34. lp := &LocalPartition{
  35. Partition: partition,
  36. Publishers: NewLocalPartitionPublishers(),
  37. Subscribers: NewLocalPartitionSubscribers(),
  38. RecordType: recordType,
  39. }
  40. lp.ListenersCond = sync.NewCond(&lp.ListenersLock)
  41. lp.LogBuffer = log_buffer.NewLogBuffer(fmt.Sprintf("%d/%04d-%04d", partition.UnixTimeNs, partition.RangeStart, partition.RangeStop),
  42. 2*time.Minute, logFlushFn, readFromDiskFn, func() {
  43. if atomic.LoadInt64(&lp.ListenersWaits) > 0 {
  44. lp.ListenersCond.Broadcast()
  45. }
  46. })
  47. return lp
  48. }
  49. func (p *LocalPartition) Publish(message *mq_pb.DataMessage) error {
  50. p.LogBuffer.AddToBuffer(message)
  51. // maybe send to the follower
  52. if p.publishFollowMeStream != nil {
  53. // println("recv", string(message.Key), message.TsNs)
  54. if followErr := p.publishFollowMeStream.Send(&mq_pb.PublishFollowMeRequest{
  55. Message: &mq_pb.PublishFollowMeRequest_Data{
  56. Data: message,
  57. },
  58. }); followErr != nil {
  59. return fmt.Errorf("send to follower %s: %v", p.Follower, followErr)
  60. }
  61. } else {
  62. atomic.StoreInt64(&p.AckTsNs, message.TsNs)
  63. }
  64. return nil
  65. }
  66. func (p *LocalPartition) Subscribe(clientName string, startPosition log_buffer.MessagePosition,
  67. onNoMessageFn func() bool, eachMessageFn log_buffer.EachLogEntryFuncType) error {
  68. var processedPosition log_buffer.MessagePosition
  69. var readPersistedLogErr error
  70. var readInMemoryLogErr error
  71. var isDone bool
  72. for {
  73. processedPosition, isDone, readPersistedLogErr = p.LogBuffer.ReadFromDiskFn(startPosition, 0, eachMessageFn)
  74. if readPersistedLogErr != nil {
  75. glog.V(0).Infof("%s read %v persisted log: %v", clientName, p.Partition, readPersistedLogErr)
  76. return readPersistedLogErr
  77. }
  78. if isDone {
  79. return nil
  80. }
  81. startPosition = processedPosition
  82. processedPosition, isDone, readInMemoryLogErr = p.LogBuffer.LoopProcessLogData(clientName, startPosition, 0, onNoMessageFn, eachMessageFn)
  83. if isDone {
  84. return nil
  85. }
  86. startPosition = processedPosition
  87. if readInMemoryLogErr == log_buffer.ResumeFromDiskError {
  88. continue
  89. }
  90. if readInMemoryLogErr != nil {
  91. glog.V(0).Infof("%s read %v in memory log: %v", clientName, p.Partition, readInMemoryLogErr)
  92. return readInMemoryLogErr
  93. }
  94. }
  95. }
  96. func (p *LocalPartition) GetEarliestMessageTimeInMemory() time.Time {
  97. return p.LogBuffer.GetEarliestTime()
  98. }
  99. func (p *LocalPartition) HasData() bool {
  100. return !p.LogBuffer.GetEarliestTime().IsZero()
  101. }
  102. func (p *LocalPartition) GetEarliestInMemoryMessagePosition() log_buffer.MessagePosition {
  103. return p.LogBuffer.GetEarliestPosition()
  104. }
  105. func (p *LocalPartition) closePublishers() {
  106. p.Publishers.SignalShutdown()
  107. }
  108. func (p *LocalPartition) closeSubscribers() {
  109. p.Subscribers.SignalShutdown()
  110. }
  111. func (p *LocalPartition) WaitUntilNoPublishers() {
  112. for {
  113. if p.Publishers.Size() == 0 {
  114. return
  115. }
  116. time.Sleep(113 * time.Millisecond)
  117. }
  118. }
  119. func (p *LocalPartition) MaybeConnectToFollowers(initMessage *mq_pb.PublishMessageRequest_InitMessage, grpcDialOption grpc.DialOption) (err error) {
  120. if p.publishFollowMeStream != nil {
  121. return nil
  122. }
  123. if initMessage.FollowerBroker == "" {
  124. return nil
  125. }
  126. p.Follower = initMessage.FollowerBroker
  127. ctx := context.Background()
  128. p.followerGrpcConnection, err = pb.GrpcDial(ctx, p.Follower, true, grpcDialOption)
  129. if err != nil {
  130. return fmt.Errorf("fail to dial %s: %v", p.Follower, err)
  131. }
  132. followerClient := mq_pb.NewSeaweedMessagingClient(p.followerGrpcConnection)
  133. p.publishFollowMeStream, err = followerClient.PublishFollowMe(ctx)
  134. if err != nil {
  135. return fmt.Errorf("fail to create publish client: %v", err)
  136. }
  137. if err = p.publishFollowMeStream.Send(&mq_pb.PublishFollowMeRequest{
  138. Message: &mq_pb.PublishFollowMeRequest_Init{
  139. Init: &mq_pb.PublishFollowMeRequest_InitMessage{
  140. Topic: initMessage.Topic,
  141. Partition: initMessage.Partition,
  142. },
  143. },
  144. }); err != nil {
  145. return err
  146. }
  147. // start receiving ack from follower
  148. go func() {
  149. defer func() {
  150. // println("stop receiving ack from follower")
  151. }()
  152. for {
  153. ack, err := p.publishFollowMeStream.Recv()
  154. if err != nil {
  155. e, _ := status.FromError(err)
  156. if e.Code() == codes.Canceled {
  157. glog.V(0).Infof("local partition %v follower %v stopped", p.Partition, p.Follower)
  158. return
  159. }
  160. glog.Errorf("Receiving local partition %v follower %s ack: %v", p.Partition, p.Follower, err)
  161. return
  162. }
  163. atomic.StoreInt64(&p.AckTsNs, ack.AckTsNs)
  164. // println("recv ack", ack.AckTsNs)
  165. }
  166. }()
  167. return nil
  168. }
  169. func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) {
  170. if p.Publishers.Size() == 0 && p.Subscribers.Size() == 0 {
  171. p.LogBuffer.ShutdownLogBuffer()
  172. for !p.LogBuffer.IsAllFlushed() {
  173. time.Sleep(113 * time.Millisecond)
  174. }
  175. if p.publishFollowMeStream != nil {
  176. // send close to the follower
  177. if followErr := p.publishFollowMeStream.Send(&mq_pb.PublishFollowMeRequest{
  178. Message: &mq_pb.PublishFollowMeRequest_Close{
  179. Close: &mq_pb.PublishFollowMeRequest_CloseMessage{},
  180. },
  181. }); followErr != nil {
  182. glog.Errorf("Error closing follower stream: %v", followErr)
  183. }
  184. glog.V(4).Infof("closing grpcConnection to follower")
  185. p.followerGrpcConnection.Close()
  186. p.publishFollowMeStream = nil
  187. p.Follower = ""
  188. }
  189. hasShutdown = true
  190. }
  191. glog.V(0).Infof("local partition %v Publisher:%d Subscriber:%d follower:%s shutdown %v", p.Partition, p.Publishers.Size(), p.Subscribers.Size(), p.Follower, hasShutdown)
  192. return
  193. }
  194. func (p *LocalPartition) Shutdown() {
  195. p.closePublishers()
  196. p.closeSubscribers()
  197. p.LogBuffer.ShutdownLogBuffer()
  198. glog.V(0).Infof("local partition %v shutting down", p.Partition)
  199. }
  200. func (p *LocalPartition) NotifyLogFlushed(flushTsNs int64) {
  201. if p.publishFollowMeStream != nil {
  202. if followErr := p.publishFollowMeStream.Send(&mq_pb.PublishFollowMeRequest{
  203. Message: &mq_pb.PublishFollowMeRequest_Flush{
  204. Flush: &mq_pb.PublishFollowMeRequest_FlushMessage{
  205. TsNs: flushTsNs,
  206. },
  207. },
  208. }); followErr != nil {
  209. glog.Errorf("send follower %s flush message: %v", p.Follower, followErr)
  210. }
  211. // println("notifying", p.Follower, "flushed at", flushTsNs)
  212. }
  213. }