broker_segment_serde.go 2.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. package broker
  2. import (
  3. "bytes"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/filer"
  6. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  7. "github.com/seaweedfs/seaweedfs/weed/pb"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  9. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  10. jsonpb "google.golang.org/protobuf/encoding/protojson"
  11. "time"
  12. )
  13. func (broker *MessageQueueBroker) checkSegmentOnFiler(segment *topic.Segment) (brokers []pb.ServerAddress, err error) {
  14. info, found, err := broker.readSegmentInfoOnFiler(segment)
  15. if err != nil {
  16. return
  17. }
  18. if !found {
  19. return
  20. }
  21. for _, b := range info.Brokers {
  22. brokers = append(brokers, pb.ServerAddress(b))
  23. }
  24. return
  25. }
  26. func (broker *MessageQueueBroker) saveSegmentBrokersOnFiler(segment *topic.Segment, brokers []pb.ServerAddress) (err error) {
  27. var nodes []string
  28. for _, b := range brokers {
  29. nodes = append(nodes, string(b))
  30. }
  31. broker.saveSegmentInfoToFiler(segment, &mq_pb.SegmentInfo{
  32. Segment: segment.ToPbSegment(),
  33. StartTsNs: time.Now().UnixNano(),
  34. Brokers: nodes,
  35. StopTsNs: 0,
  36. PreviousSegments: nil,
  37. NextSegments: nil,
  38. })
  39. return
  40. }
  41. func (broker *MessageQueueBroker) readSegmentInfoOnFiler(segment *topic.Segment) (info *mq_pb.SegmentInfo, found bool, err error) {
  42. dir, name := segment.DirAndName()
  43. found, err = filer_pb.Exists(broker, dir, name, false)
  44. if !found || err != nil {
  45. return
  46. }
  47. err = pb.WithFilerClient(false, 0, broker.GetFiler(), broker.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  48. // read filer conf first
  49. data, err := filer.ReadInsideFiler(client, dir, name)
  50. if err != nil {
  51. return fmt.Errorf("ReadEntry: %v", err)
  52. }
  53. // parse into filer conf object
  54. info = &mq_pb.SegmentInfo{}
  55. if err = jsonpb.Unmarshal(data, info); err != nil {
  56. return err
  57. }
  58. found = true
  59. return nil
  60. })
  61. return
  62. }
  63. func (broker *MessageQueueBroker) saveSegmentInfoToFiler(segment *topic.Segment, info *mq_pb.SegmentInfo) (err error) {
  64. dir, name := segment.DirAndName()
  65. var buf bytes.Buffer
  66. filer.ProtoToText(&buf, info)
  67. err = pb.WithFilerClient(false, 0, broker.GetFiler(), broker.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  68. // read filer conf first
  69. err := filer.SaveInsideFiler(client, dir, name, buf.Bytes())
  70. if err != nil {
  71. return fmt.Errorf("save segment info: %v", err)
  72. }
  73. return nil
  74. })
  75. return
  76. }