log_buffer_test.go 1.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465
  1. package log_buffer
  2. import (
  3. "crypto/rand"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  6. "io"
  7. "sync"
  8. "testing"
  9. "time"
  10. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  11. )
  12. func TestNewLogBufferFirstBuffer(t *testing.T) {
  13. flushInterval := time.Second
  14. lb := NewLogBuffer("test", flushInterval, func(logBuffer *LogBuffer, startTime time.Time, stopTime time.Time, buf []byte) {
  15. fmt.Printf("flush from %v to %v %d bytes\n", startTime, stopTime, len(buf))
  16. }, nil, func() {
  17. })
  18. startTime := MessagePosition{Time: time.Now()}
  19. messageSize := 1024
  20. messageCount := 5000
  21. receivedMessageCount := 0
  22. var wg sync.WaitGroup
  23. wg.Add(1)
  24. go func() {
  25. defer wg.Done()
  26. lastProcessedTime, isDone, err := lb.LoopProcessLogData("test", startTime, 0, func() bool {
  27. // stop if no more messages
  28. return receivedMessageCount < messageCount
  29. }, func(logEntry *filer_pb.LogEntry) (isDone bool, err error) {
  30. receivedMessageCount++
  31. if receivedMessageCount >= messageCount {
  32. println("processed all messages")
  33. return true, io.EOF
  34. }
  35. return false, nil
  36. })
  37. fmt.Printf("before flush: sent %d received %d\n", messageCount, receivedMessageCount)
  38. fmt.Printf("lastProcessedTime %v isDone %v err: %v\n", lastProcessedTime, isDone, err)
  39. if err != nil && err != io.EOF {
  40. t.Errorf("unexpected error %v", err)
  41. }
  42. }()
  43. var buf = make([]byte, messageSize)
  44. for i := 0; i < messageCount; i++ {
  45. rand.Read(buf)
  46. lb.AddToBuffer(&mq_pb.DataMessage{
  47. Key: nil,
  48. Value: buf,
  49. TsNs: 0,
  50. })
  51. }
  52. wg.Wait()
  53. if receivedMessageCount != messageCount {
  54. t.Errorf("expect %d messages, but got %d", messageCount, receivedMessageCount)
  55. }
  56. }