1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465 |
- package log_buffer
- import (
- "crypto/rand"
- "fmt"
- "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
- "io"
- "sync"
- "testing"
- "time"
- "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
- )
- func TestNewLogBufferFirstBuffer(t *testing.T) {
- flushInterval := time.Second
- lb := NewLogBuffer("test", flushInterval, func(logBuffer *LogBuffer, startTime time.Time, stopTime time.Time, buf []byte) {
- fmt.Printf("flush from %v to %v %d bytes\n", startTime, stopTime, len(buf))
- }, nil, func() {
- })
- startTime := MessagePosition{Time: time.Now()}
- messageSize := 1024
- messageCount := 5000
- receivedMessageCount := 0
- var wg sync.WaitGroup
- wg.Add(1)
- go func() {
- defer wg.Done()
- lastProcessedTime, isDone, err := lb.LoopProcessLogData("test", startTime, 0, func() bool {
- // stop if no more messages
- return receivedMessageCount < messageCount
- }, func(logEntry *filer_pb.LogEntry) (isDone bool, err error) {
- receivedMessageCount++
- if receivedMessageCount >= messageCount {
- println("processed all messages")
- return true, io.EOF
- }
- return false, nil
- })
- fmt.Printf("before flush: sent %d received %d\n", messageCount, receivedMessageCount)
- fmt.Printf("lastProcessedTime %v isDone %v err: %v\n", lastProcessedTime, isDone, err)
- if err != nil && err != io.EOF {
- t.Errorf("unexpected error %v", err)
- }
- }()
- var buf = make([]byte, messageSize)
- for i := 0; i < messageCount; i++ {
- rand.Read(buf)
- lb.AddToBuffer(&mq_pb.DataMessage{
- Key: nil,
- Value: buf,
- TsNs: 0,
- })
- }
- wg.Wait()
- if receivedMessageCount != messageCount {
- t.Errorf("expect %d messages, but got %d", messageCount, receivedMessageCount)
- }
- }
|