broker_grpc_admin.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. package broker
  2. import (
  3. "context"
  4. "github.com/seaweedfs/seaweedfs/weed/cluster"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  7. "github.com/seaweedfs/seaweedfs/weed/pb"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
  9. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  10. "sort"
  11. "sync"
  12. )
  13. func (broker *MessageQueueBroker) FindBrokerLeader(c context.Context, request *mq_pb.FindBrokerLeaderRequest) (*mq_pb.FindBrokerLeaderResponse, error) {
  14. ret := &mq_pb.FindBrokerLeaderResponse{}
  15. err := broker.withMasterClient(false, broker.MasterClient.GetMaster(), func(client master_pb.SeaweedClient) error {
  16. resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
  17. ClientType: cluster.BrokerType,
  18. FilerGroup: request.FilerGroup,
  19. })
  20. if err != nil {
  21. return err
  22. }
  23. if len(resp.ClusterNodes) == 0 {
  24. return nil
  25. }
  26. ret.Broker = resp.ClusterNodes[0].Address
  27. return nil
  28. })
  29. return ret, err
  30. }
  31. func (broker *MessageQueueBroker) AssignSegmentBrokers(c context.Context, request *mq_pb.AssignSegmentBrokersRequest) (*mq_pb.AssignSegmentBrokersResponse, error) {
  32. ret := &mq_pb.AssignSegmentBrokersResponse{}
  33. segment := topic.FromPbSegment(request.Segment)
  34. // check existing segment locations on filer
  35. existingBrokers, err := broker.checkSegmentOnFiler(segment)
  36. if err != nil {
  37. return ret, err
  38. }
  39. if len(existingBrokers) > 0 {
  40. // good if the segment is still on the brokers
  41. isActive, err := broker.checkSegmentsOnBrokers(segment, existingBrokers)
  42. if err != nil {
  43. return ret, err
  44. }
  45. if isActive {
  46. for _, broker := range existingBrokers {
  47. ret.Brokers = append(ret.Brokers, string(broker))
  48. }
  49. return ret, nil
  50. }
  51. }
  52. // randomly pick up to 10 brokers, and find the ones with the lightest load
  53. selectedBrokers, err := broker.selectBrokers()
  54. if err != nil {
  55. return ret, err
  56. }
  57. // save the allocated brokers info for this segment on the filer
  58. if err := broker.saveSegmentBrokersOnFiler(segment, selectedBrokers); err != nil {
  59. return ret, err
  60. }
  61. for _, broker := range selectedBrokers {
  62. ret.Brokers = append(ret.Brokers, string(broker))
  63. }
  64. return ret, nil
  65. }
  66. func (broker *MessageQueueBroker) CheckSegmentStatus(c context.Context, request *mq_pb.CheckSegmentStatusRequest) (*mq_pb.CheckSegmentStatusResponse, error) {
  67. ret := &mq_pb.CheckSegmentStatusResponse{}
  68. // TODO add in memory active segment
  69. return ret, nil
  70. }
  71. func (broker *MessageQueueBroker) CheckBrokerLoad(c context.Context, request *mq_pb.CheckBrokerLoadRequest) (*mq_pb.CheckBrokerLoadResponse, error) {
  72. ret := &mq_pb.CheckBrokerLoadResponse{}
  73. // TODO read broker's load
  74. return ret, nil
  75. }
  76. // createOrUpdateTopicPartitions creates the topic partitions on the broker
  77. // 1. check
  78. func (broker *MessageQueueBroker) createOrUpdateTopicPartitions(topic *topic.Topic, prevAssignments []*mq_pb.BrokerPartitionAssignment) (err error) {
  79. // create or update each partition
  80. if prevAssignments == nil {
  81. broker.createOrUpdateTopicPartition(topic, nil)
  82. } else {
  83. for _, brokerPartitionAssignment := range prevAssignments {
  84. broker.createOrUpdateTopicPartition(topic, brokerPartitionAssignment)
  85. }
  86. }
  87. return nil
  88. }
  89. func (broker *MessageQueueBroker) createOrUpdateTopicPartition(topic *topic.Topic, oldAssignment *mq_pb.BrokerPartitionAssignment) (newAssignment *mq_pb.BrokerPartitionAssignment) {
  90. shouldCreate := broker.confirmBrokerPartitionAssignment(topic, oldAssignment)
  91. if !shouldCreate {
  92. }
  93. return
  94. }
  95. func (broker *MessageQueueBroker) confirmBrokerPartitionAssignment(topic *topic.Topic, oldAssignment *mq_pb.BrokerPartitionAssignment) (shouldCreate bool) {
  96. if oldAssignment == nil {
  97. return true
  98. }
  99. for _, b := range oldAssignment.FollowerBrokers {
  100. pb.WithBrokerGrpcClient(false, b, broker.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
  101. _, err := client.CheckTopicPartitionsStatus(context.Background(), &mq_pb.CheckTopicPartitionsStatusRequest{
  102. Namespace: string(topic.Namespace),
  103. Topic: topic.Name,
  104. BrokerPartitionAssignment: oldAssignment,
  105. ShouldCancelIfNotMatch: true,
  106. })
  107. if err != nil {
  108. shouldCreate = true
  109. }
  110. return nil
  111. })
  112. }
  113. return
  114. }
  115. func (broker *MessageQueueBroker) checkSegmentsOnBrokers(segment *topic.Segment, brokers []pb.ServerAddress) (active bool, err error) {
  116. var wg sync.WaitGroup
  117. for _, candidate := range brokers {
  118. wg.Add(1)
  119. go func(candidate pb.ServerAddress) {
  120. defer wg.Done()
  121. broker.withBrokerClient(false, candidate, func(client mq_pb.SeaweedMessagingClient) error {
  122. resp, checkErr := client.CheckSegmentStatus(context.Background(), &mq_pb.CheckSegmentStatusRequest{
  123. Segment: &mq_pb.Segment{
  124. Namespace: string(segment.Topic.Namespace),
  125. Topic: segment.Topic.Name,
  126. Id: segment.Id,
  127. },
  128. })
  129. if checkErr != nil {
  130. err = checkErr
  131. glog.V(0).Infof("check segment status on %s: %v", candidate, checkErr)
  132. return nil
  133. }
  134. if resp.IsActive == false {
  135. active = false
  136. }
  137. return nil
  138. })
  139. }(candidate)
  140. }
  141. wg.Wait()
  142. return
  143. }
  144. func (broker *MessageQueueBroker) selectBrokers() (brokers []pb.ServerAddress, err error) {
  145. candidates, err := broker.selectCandidatesFromMaster(10)
  146. if err != nil {
  147. return
  148. }
  149. brokers, err = broker.pickLightestCandidates(candidates, 3)
  150. return
  151. }
  152. func (broker *MessageQueueBroker) selectCandidatesFromMaster(limit int32) (candidates []pb.ServerAddress, err error) {
  153. err = broker.withMasterClient(false, broker.MasterClient.GetMaster(), func(client master_pb.SeaweedClient) error {
  154. resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
  155. ClientType: cluster.BrokerType,
  156. FilerGroup: broker.option.FilerGroup,
  157. Limit: limit,
  158. })
  159. if err != nil {
  160. return err
  161. }
  162. if len(resp.ClusterNodes) == 0 {
  163. return nil
  164. }
  165. for _, node := range resp.ClusterNodes {
  166. candidates = append(candidates, pb.ServerAddress(node.Address))
  167. }
  168. return nil
  169. })
  170. return
  171. }
  172. type CandidateStatus struct {
  173. address pb.ServerAddress
  174. messageCount int64
  175. bytesCount int64
  176. load int64
  177. }
  178. func (broker *MessageQueueBroker) pickLightestCandidates(candidates []pb.ServerAddress, limit int) (selected []pb.ServerAddress, err error) {
  179. if len(candidates) <= limit {
  180. return candidates, nil
  181. }
  182. candidateStatuses, err := broker.checkBrokerStatus(candidates)
  183. if err != nil {
  184. return nil, err
  185. }
  186. sort.Slice(candidateStatuses, func(i, j int) bool {
  187. return candidateStatuses[i].load < candidateStatuses[j].load
  188. })
  189. for i, candidate := range candidateStatuses {
  190. if i >= limit {
  191. break
  192. }
  193. selected = append(selected, candidate.address)
  194. }
  195. return
  196. }
  197. func (broker *MessageQueueBroker) checkBrokerStatus(candidates []pb.ServerAddress) (candidateStatuses []*CandidateStatus, err error) {
  198. candidateStatuses = make([]*CandidateStatus, len(candidates))
  199. var wg sync.WaitGroup
  200. for i, candidate := range candidates {
  201. wg.Add(1)
  202. go func(i int, candidate pb.ServerAddress) {
  203. defer wg.Done()
  204. err = broker.withBrokerClient(false, candidate, func(client mq_pb.SeaweedMessagingClient) error {
  205. resp, checkErr := client.CheckBrokerLoad(context.Background(), &mq_pb.CheckBrokerLoadRequest{})
  206. if checkErr != nil {
  207. err = checkErr
  208. return err
  209. }
  210. candidateStatuses[i] = &CandidateStatus{
  211. address: candidate,
  212. messageCount: resp.MessageCount,
  213. bytesCount: resp.BytesCount,
  214. load: resp.MessageCount + resp.BytesCount/(64*1024),
  215. }
  216. return nil
  217. })
  218. }(i, candidate)
  219. }
  220. wg.Wait()
  221. return
  222. }