local_partition.go 1.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465
  1. package topic
  2. import (
  3. "fmt"
  4. "github.com/seaweedfs/seaweedfs/weed/pb"
  5. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  6. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  7. "github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
  8. "time"
  9. )
  10. type LocalPartition struct {
  11. Partition
  12. isLeader bool
  13. FollowerBrokers []pb.ServerAddress
  14. logBuffer *log_buffer.LogBuffer
  15. ConsumerCount int32
  16. }
  17. func NewLocalPartition(topic Topic, partition Partition, isLeader bool, followerBrokers []pb.ServerAddress) *LocalPartition {
  18. return &LocalPartition{
  19. Partition: partition,
  20. isLeader: isLeader,
  21. FollowerBrokers: followerBrokers,
  22. logBuffer: log_buffer.NewLogBuffer(
  23. fmt.Sprintf("%s/%s/%4d-%4d", topic.Namespace, topic.Name, partition.RangeStart, partition.RangeStop),
  24. 2*time.Minute,
  25. func(startTime, stopTime time.Time, buf []byte) {
  26. },
  27. func() {
  28. },
  29. ),
  30. }
  31. }
  32. type OnEachMessageFn func(logEntry *filer_pb.LogEntry) error
  33. func (p LocalPartition) Publish(message *mq_pb.DataMessage) {
  34. p.logBuffer.AddToBuffer(message.Key, message.Value, time.Now().UnixNano())
  35. }
  36. func (p LocalPartition) Subscribe(clientName string, startReadTime time.Time, eachMessageFn OnEachMessageFn) {
  37. p.logBuffer.LoopProcessLogData(clientName, startReadTime, 0, func() bool {
  38. return true
  39. }, eachMessageFn)
  40. }
  41. func FromPbBrokerPartitionAssignment(self pb.ServerAddress, assignment *mq_pb.BrokerPartitionAssignment) *LocalPartition {
  42. isLeaer := assignment.LeaderBroker == string(self)
  43. localPartition := &LocalPartition{
  44. Partition: FromPbPartition(assignment.Partition),
  45. isLeader: isLeaer,
  46. }
  47. if !isLeaer {
  48. return localPartition
  49. }
  50. followers := make([]pb.ServerAddress, len(assignment.FollowerBrokers))
  51. for i, follower := range assignment.FollowerBrokers {
  52. followers[i] = pb.ServerAddress(follower)
  53. }
  54. localPartition.FollowerBrokers = followers
  55. return localPartition
  56. }