broker_grpc_configure.go 3.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879
  1. package broker
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
  7. "github.com/seaweedfs/seaweedfs/weed/mq/schema"
  8. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  9. "github.com/seaweedfs/seaweedfs/weed/pb"
  10. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  11. "google.golang.org/grpc/codes"
  12. "google.golang.org/grpc/status"
  13. )
  14. // ConfigureTopic Runs on any broker, but proxied to the balancer if not the balancer
  15. // It generates an assignments based on existing allocations,
  16. // and then assign the partitions to the brokers.
  17. func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb.ConfigureTopicRequest) (resp *mq_pb.ConfigureTopicResponse, err error) {
  18. if !b.isLockOwner() {
  19. proxyErr := b.withBrokerClient(false, pb.ServerAddress(b.lockAsBalancer.LockOwner()), func(client mq_pb.SeaweedMessagingClient) error {
  20. resp, err = client.ConfigureTopic(ctx, request)
  21. return nil
  22. })
  23. if proxyErr != nil {
  24. return nil, proxyErr
  25. }
  26. return resp, err
  27. }
  28. // validate the schema
  29. if request.RecordType != nil {
  30. if _, err = schema.NewSchema(request.RecordType); err != nil {
  31. return nil, status.Errorf(codes.InvalidArgument, "invalid record type %+v: %v", request.RecordType, err)
  32. }
  33. }
  34. t := topic.FromPbTopic(request.Topic)
  35. var readErr, assignErr error
  36. resp, readErr = b.fca.ReadTopicConfFromFiler(t)
  37. if readErr != nil {
  38. glog.V(0).Infof("read topic %s conf: %v", request.Topic, readErr)
  39. }
  40. if resp != nil {
  41. assignErr = b.ensureTopicActiveAssignments(t, resp)
  42. // no need to assign directly.
  43. // The added or updated assignees will read from filer directly.
  44. // The gone assignees will die by themselves.
  45. }
  46. if readErr == nil && assignErr == nil && len(resp.BrokerPartitionAssignments) == int(request.PartitionCount) {
  47. glog.V(0).Infof("existing topic partitions %d: %+v", len(resp.BrokerPartitionAssignments), resp.BrokerPartitionAssignments)
  48. return
  49. }
  50. if resp != nil && len(resp.BrokerPartitionAssignments) > 0 {
  51. if cancelErr := b.assignTopicPartitionsToBrokers(ctx, request.Topic, resp.BrokerPartitionAssignments, false); cancelErr != nil {
  52. glog.V(1).Infof("cancel old topic %s partitions assignments %v : %v", request.Topic, resp.BrokerPartitionAssignments, cancelErr)
  53. }
  54. }
  55. resp = &mq_pb.ConfigureTopicResponse{}
  56. if b.PubBalancer.Brokers.IsEmpty() {
  57. return nil, status.Errorf(codes.Unavailable, pub_balancer.ErrNoBroker.Error())
  58. }
  59. resp.BrokerPartitionAssignments = pub_balancer.AllocateTopicPartitions(b.PubBalancer.Brokers, request.PartitionCount)
  60. resp.RecordType = request.RecordType
  61. // save the topic configuration on filer
  62. if err := b.fca.SaveTopicConfToFiler(request.Topic, resp); err != nil {
  63. return nil, fmt.Errorf("configure topic: %v", err)
  64. }
  65. b.PubBalancer.OnPartitionChange(request.Topic, resp.BrokerPartitionAssignments)
  66. glog.V(0).Infof("ConfigureTopic: topic %s partition assignments: %v", request.Topic, resp.BrokerPartitionAssignments)
  67. return resp, err
  68. }