123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109 |
- package segment
- import (
- flatbuffers "github.com/google/flatbuffers/go"
- "github.com/seaweedfs/seaweedfs/weed/pb/message_fbs"
- )
- type MessageBatchBuilder struct {
- b *flatbuffers.Builder
- producerId int32
- producerEpoch int32
- segmentId int32
- flags int32
- messageOffsets []flatbuffers.UOffsetT
- segmentSeqBase int64
- segmentSeqLast int64
- tsMsBase int64
- tsMsLast int64
- }
- func NewMessageBatchBuilder(b *flatbuffers.Builder,
- producerId int32,
- producerEpoch int32,
- segmentId int32,
- flags int32) *MessageBatchBuilder {
- b.Reset()
- return &MessageBatchBuilder{
- b: b,
- producerId: producerId,
- producerEpoch: producerEpoch,
- segmentId: segmentId,
- flags: flags,
- }
- }
- func (builder *MessageBatchBuilder) AddMessage(segmentSeq int64, tsMs int64, properties map[string][]byte, key []byte, value []byte) {
- if builder.segmentSeqBase == 0 {
- builder.segmentSeqBase = segmentSeq
- }
- builder.segmentSeqLast = segmentSeq
- if builder.tsMsBase == 0 {
- builder.tsMsBase = tsMs
- }
- builder.tsMsLast = tsMs
- var names, values, pairs []flatbuffers.UOffsetT
- for k, v := range properties {
- names = append(names, builder.b.CreateString(k))
- values = append(values, builder.b.CreateByteVector(v))
- }
- for i, _ := range names {
- message_fbs.NameValueStart(builder.b)
- message_fbs.NameValueAddName(builder.b, names[i])
- message_fbs.NameValueAddValue(builder.b, values[i])
- pair := message_fbs.NameValueEnd(builder.b)
- pairs = append(pairs, pair)
- }
- message_fbs.MessageStartPropertiesVector(builder.b, len(properties))
- for i := len(pairs) - 1; i >= 0; i-- {
- builder.b.PrependUOffsetT(pairs[i])
- }
- propOffset := builder.b.EndVector(len(properties))
- keyOffset := builder.b.CreateByteVector(key)
- valueOffset := builder.b.CreateByteVector(value)
- message_fbs.MessageStart(builder.b)
- message_fbs.MessageAddSeqDelta(builder.b, int32(segmentSeq-builder.segmentSeqBase))
- message_fbs.MessageAddTsMsDelta(builder.b, int32(tsMs-builder.tsMsBase))
- message_fbs.MessageAddProperties(builder.b, propOffset)
- message_fbs.MessageAddKey(builder.b, keyOffset)
- message_fbs.MessageAddData(builder.b, valueOffset)
- messageOffset := message_fbs.MessageEnd(builder.b)
- builder.messageOffsets = append(builder.messageOffsets, messageOffset)
- }
- func (builder *MessageBatchBuilder) BuildMessageBatch() {
- message_fbs.MessageBatchStartMessagesVector(builder.b, len(builder.messageOffsets))
- for i := len(builder.messageOffsets) - 1; i >= 0; i-- {
- builder.b.PrependUOffsetT(builder.messageOffsets[i])
- }
- messagesOffset := builder.b.EndVector(len(builder.messageOffsets))
- message_fbs.MessageBatchStart(builder.b)
- message_fbs.MessageBatchAddProducerId(builder.b, builder.producerId)
- message_fbs.MessageBatchAddProducerEpoch(builder.b, builder.producerEpoch)
- message_fbs.MessageBatchAddSegmentId(builder.b, builder.segmentId)
- message_fbs.MessageBatchAddFlags(builder.b, builder.flags)
- message_fbs.MessageBatchAddSegmentSeqBase(builder.b, builder.segmentSeqBase)
- message_fbs.MessageBatchAddSegmentSeqMaxDelta(builder.b, int32(builder.segmentSeqLast-builder.segmentSeqBase))
- message_fbs.MessageBatchAddTsMsBase(builder.b, builder.tsMsBase)
- message_fbs.MessageBatchAddTsMsMaxDelta(builder.b, int32(builder.tsMsLast-builder.tsMsBase))
- message_fbs.MessageBatchAddMessages(builder.b, messagesOffset)
- messageBatch := message_fbs.MessageBatchEnd(builder.b)
- builder.b.Finish(messageBatch)
- }
- func (builder *MessageBatchBuilder) GetBytes() []byte {
- return builder.b.FinishedBytes()
- }
|