config.go 1.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263
  1. package msgclient
  2. import (
  3. "context"
  4. "log"
  5. "github.com/chrislusf/seaweedfs/weed/messaging/broker"
  6. "github.com/chrislusf/seaweedfs/weed/pb"
  7. "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
  8. )
  9. func (mc *MessagingClient) configureTopic(tp broker.TopicPartition) error {
  10. return mc.withAnyBroker(func(client messaging_pb.SeaweedMessagingClient) error {
  11. _, err := client.ConfigureTopic(context.Background(),
  12. &messaging_pb.ConfigureTopicRequest{
  13. Namespace: tp.Namespace,
  14. Topic: tp.Topic,
  15. Configuration: &messaging_pb.TopicConfiguration{
  16. PartitionCount: 0,
  17. Collection: "",
  18. Replication: "",
  19. IsTransient: false,
  20. Partitoning: 0,
  21. },
  22. })
  23. return err
  24. })
  25. }
  26. func (mc *MessagingClient) DeleteTopic(namespace, topic string) error {
  27. return mc.withAnyBroker(func(client messaging_pb.SeaweedMessagingClient) error {
  28. _, err := client.DeleteTopic(context.Background(),
  29. &messaging_pb.DeleteTopicRequest{
  30. Namespace: namespace,
  31. Topic: topic,
  32. })
  33. return err
  34. })
  35. }
  36. func (mc *MessagingClient) withAnyBroker(fn func(client messaging_pb.SeaweedMessagingClient) error) error {
  37. var lastErr error
  38. for _, broker := range mc.bootstrapBrokers {
  39. grpcConnection, err := pb.GrpcDial(context.Background(), broker, mc.grpcDialOption)
  40. if err != nil {
  41. log.Printf("dial broker %s: %v", broker, err)
  42. continue
  43. }
  44. defer grpcConnection.Close()
  45. err = fn(messaging_pb.NewSeaweedMessagingClient(grpcConnection))
  46. if err == nil {
  47. return nil
  48. }
  49. lastErr = err
  50. }
  51. return lastErr
  52. }