package broker

import (
    "context"
    "fmt"
    "io"
    "sync/atomic"
    "time"

    "github.com/seaweedfs/seaweedfs/weed/glog"
    "github.com/seaweedfs/seaweedfs/weed/mq/topic"
    "github.com/seaweedfs/seaweedfs/weed/pb"
    "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
)
// For a new or re-configured topic, or when one of the brokers goes offline,
// the pub clients ask one broker which brokers host all the topic partitions.
// The broker will lock the topic on write.
// 1. if the topic is not found, create the topic, and allocate the topic partitions to the brokers
// 2. if the topic is found, return the brokers for the topic partitions
// For a topic to read from, the sub clients ask one broker which brokers host all the topic partitions.
// The broker will lock the topic on read.
// 1. if the topic is not found, return an error
// 2. if the topic is found, return the brokers for the topic partitions
//
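// A minimal client-side lookup sketch. This assumes a LookupTopicBrokers RPC on the
// SeaweedMessaging service; the request/response field names are best-effort guesses
// and may differ from the generated mq_pb types:
//
//	err := pb.WithBrokerGrpcClient(false, "broker1:17777", grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
//		resp, err := client.LookupTopicBrokers(context.Background(), &mq_pb.LookupTopicBrokersRequest{
//			Topic:        &mq_pb.Topic{Namespace: "default", Name: "events"},
//			IsForPublish: true, // sub clients would set this to false and get an error for a missing topic
//		})
//		if err != nil {
//			return err
//		}
//		for _, assignment := range resp.BrokerPartitionAssignments {
//			_ = assignment // connect to assignment.LeaderBroker for its partition range
//		}
//		return nil
//	})
//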
// If the topic needs to be re-balanced, the admin client will lock the topic, then
// 1. collect throughput information for all the brokers
// 2. re-assign the topic partitions to the brokers
// 3. notify the brokers to add/remove the partitions they host
// 3.1 when locking the topic, the partitions and brokers should be remembered in the lock
// 4. the brokers will stop processing incoming messages that do not belong to a partition they host
// 4.1 the pub clients will need to re-partition the messages and publish to the right brokers for each partition
// 4.2 the sub clients will need to switch to the new brokers to read from
//
// The following is from each individual component's perspective:
// For a pub client
// For the current topic/partition, ask one broker which brokers host the topic partitions
// 1. connect to those brokers and keep sending, until a broker returns an error or the partition leader moves
// For a sub client
// For the current topic/partition, ask one broker which brokers host the topic partitions
// 1. connect to those brokers and keep reading, until a broker returns an error or the partition leader moves
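// A minimal sketch of that client loop; lookupTopicBrokers and publishUntilError are
// hypothetical helpers, and any send/read error triggers a fresh lookup so the client
// follows a moved leader:
//
//	for {
//		brokers, err := lookupTopicBrokers(topicName) // hypothetical wrapper around the lookup RPC
//		if err != nil {
//			return err
//		}
//		if err := publishUntilError(brokers); err != nil { // hypothetical send loop
//			glog.V(0).Infof("leader moved or broker failed: %v, looking up again", err)
//			continue
//		}
//		return nil
//	}
//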
// For a broker
// Upon a pub client lookup:
// 1. lock the topic
// 2. if a topic partition assignment already exists, check that all assigned brokers are healthy
// 3. if not, create a topic partition assignment
// 4. return the brokers for the topic partitions
// 5. unlock the topic
// Upon a sub client lookup:
// 1. lock the topic
// 2. if a topic partition assignment already exists, check that all assigned brokers are healthy
// 3. if not, return an error
// 4. return the brokers for the topic partitions
// 5. unlock the topic
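// A broker-side sketch of the pub-path lookup; lockTopic, existingAssignments,
// allBrokersHealthy and assignPartitions are hypothetical helpers standing in for the
// real lookup implementation:
//
//	func lookupForPublish(t topic.Topic) (assignments []*mq_pb.BrokerPartitionAssignment, err error) {
//		unlock := lockTopic(t) // serialize concurrent lookups on the same topic
//		defer unlock()
//		assignments, found := existingAssignments(t)
//		if !found || !allBrokersHealthy(assignments) {
//			assignments, err = assignPartitions(t) // create or repair the assignment
//		}
//		return assignments, err
//	}
//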
// For an admin tool
// 0. collect stats from all the brokers, and find the topics worth moving
// 1. lock the topic
// 2. collect throughput information for all the brokers
// 3. re-assign the topic partitions to the brokers
// 4. notify the brokers to add/remove the partitions they host
// 5. the brokers will stop processing incoming messages that do not belong to a partition they host
// 6. unlock the topic
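// A minimal admin-side sketch for step 4, pushing a new assignment to the leader broker
// through the AssignTopicPartitions RPC implemented below; the topic name and the
// newAssignments slice are illustrative placeholders:
//
//	err := pb.WithBrokerGrpcClient(false, leaderBrokerAddress, grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
//		_, err := client.AssignTopicPartitions(context.Background(), &mq_pb.AssignTopicPartitionsRequest{
//			Topic:                      &mq_pb.Topic{Namespace: "default", Name: "events"},
//			BrokerPartitionAssignments: newAssignments, // computed from the collected throughput stats
//			IsLeader:                   true,           // the leader fans the request out to its followers
//		})
//		return err
//	})
//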
/*
The messages are buffered in memory, and saved to filer under
	/topics/<topic>/<date>/<hour>/<segment>/*.msg
	/topics/<topic>/<date>/<hour>/segment
	/topics/<topic>/info/segment_<id>.meta
*/
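// A sketch of how a per-segment directory under the filer could be composed, following
// the layout in the comment above; segmentDir is a hypothetical helper, not part of
// this package:
//
//	func segmentDir(topicName string, ts time.Time, segmentId int32) string {
//		return fmt.Sprintf("/topics/%s/%s/%02d/%d",
//			topicName, ts.Format("2006-01-02"), ts.Hour(), segmentId)
//	}
//
//	// segmentDir("events", t, 3) -> "/topics/events/2024-01-02/15/3"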
func (broker *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishServer) error {
    // 1. write to the volume server
    // 2. find the topic metadata owning filer
    // 3. write to the filer

    var localTopicPartition *topic.LocalPartition
    req, err := stream.Recv()
    if err != nil {
        return err
    }
    response := &mq_pb.PublishResponse{}
    // TODO check whether the current broker should be the leader for the topic partition
    ackInterval := 1
    initMessage := req.GetInit()
    if initMessage != nil {
        t, p := topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition)
        localTopicPartition = broker.localTopicManager.GetTopicPartition(t, p)
        if localTopicPartition == nil {
            localTopicPartition = topic.NewLocalPartition(t, p, true, nil)
            broker.localTopicManager.AddTopicPartition(t, localTopicPartition)
        }
        ackInterval = int(initMessage.AckInterval)
        if err := stream.Send(response); err != nil {
            return err
        }
    } else {
        // initMessage is nil here, so it must not be dereferenced in the error message
        response.Error = "the first publish request must be an init message with topic and partition"
        glog.Errorf("missing init message in the first publish request")
        return stream.Send(response)
    }
    // respChan decouples receiving data messages from sending acks back to the client
    ackCounter := 0
    var ackSequence int64
    var isStopping int32
    respChan := make(chan *mq_pb.PublishResponse, 128)
    defer func() {
        atomic.StoreInt32(&isStopping, 1)
        close(respChan)
    }()
    // a separate goroutine sends the queued acks, and also flushes the latest
    // ack sequence at least once per second
    go func() {
        ticker := time.NewTicker(1 * time.Second)
        defer ticker.Stop()
        for {
            select {
            case resp := <-respChan:
                if resp != nil {
                    if err := stream.Send(resp); err != nil {
                        glog.Errorf("Error sending response %v: %v", resp, err)
                    }
                } else {
                    // respChan is closed: the publish loop has returned
                    return
                }
            case <-ticker.C:
                if atomic.LoadInt32(&isStopping) == 0 {
                    response := &mq_pb.PublishResponse{
                        AckSequence: atomic.LoadInt64(&ackSequence),
                    }
                    respChan <- response
                } else {
                    return
                }
            }
        }
    }()
    // process each published message
    for {
        // receive a message
        req, err := stream.Recv()
        if err != nil {
            if err == io.EOF {
                // the client closed the stream cleanly
                break
            }
            return err
        }

        // Process the received message
        if dataMessage := req.GetData(); dataMessage != nil {
            localTopicPartition.Publish(dataMessage)
        }

        ackCounter++
        atomic.AddInt64(&ackSequence, 1)
        if ackCounter >= ackInterval {
            ackCounter = 0
            // send back the ack
            response := &mq_pb.PublishResponse{
                AckSequence: atomic.LoadInt64(&ackSequence),
            }
            respChan <- response
        }
    }

    glog.Infof("publish stream closed")
    return nil
}
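
// A minimal client-side sketch of the publish stream above: send the init message first,
// then data messages, while a separate reader drains the acks. The oneof wrappers follow
// the GetInit/GetData accessors used above; the Partition field values and the readAcks
// helper are illustrative assumptions:
//
//	err := pb.WithBrokerGrpcClient(false, brokerAddress, grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
//		stream, err := client.Publish(context.Background())
//		if err != nil {
//			return err
//		}
//		// handshake: name the topic/partition and how often acks should come back
//		if err := stream.Send(&mq_pb.PublishRequest{Message: &mq_pb.PublishRequest_Init{Init: &mq_pb.PublishRequest_InitMessage{
//			Topic:       &mq_pb.Topic{Namespace: "default", Name: "events"},
//			Partition:   &mq_pb.Partition{RingSize: 1024, RangeStart: 0, RangeStop: 1024},
//			AckInterval: 128,
//		}}}); err != nil {
//			return err
//		}
//		go readAcks(stream) // hypothetical: loop over stream.Recv() and track AckSequence
//		return stream.Send(&mq_pb.PublishRequest{Message: &mq_pb.PublishRequest_Data{Data: &mq_pb.DataMessage{
//			Key:   []byte("key"),
//			Value: []byte("value"),
//		}}})
//	})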

// AssignTopicPartitions runs on the assigned broker to execute the topic partition assignment
func (broker *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *mq_pb.AssignTopicPartitionsRequest) (*mq_pb.AssignTopicPartitionsResponse, error) {
    ret := &mq_pb.AssignTopicPartitionsResponse{}
    self := pb.ServerAddress(fmt.Sprintf("%s:%d", broker.option.Ip, broker.option.Port))
    for _, brokerPartition := range request.BrokerPartitionAssignments {
        localPartition := topic.FromPbBrokerPartitionAssignment(self, brokerPartition)
        broker.localTopicManager.AddTopicPartition(
            topic.FromPbTopic(request.Topic),
            localPartition)
        // the leader relays the same assignment to its follower brokers
        if request.IsLeader {
            for _, follower := range localPartition.FollowerBrokers {
                err := pb.WithBrokerGrpcClient(false, follower.String(), broker.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
                    _, err := client.AssignTopicPartitions(context.Background(), request)
                    return err
                })
                if err != nil {
                    return ret, err
                }
            }
        }
    }
    return ret, nil
}