log_buffer_test.go 1.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960
  1. package log_buffer
  2. import (
  3. "crypto/rand"
  4. "fmt"
  5. "io"
  6. "sync"
  7. "testing"
  8. "time"
  9. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  10. )
  11. func TestNewLogBufferFirstBuffer(t *testing.T) {
  12. flushInterval := time.Second
  13. lb := NewLogBuffer("test", flushInterval, func(startTime, stopTime time.Time, buf []byte) {
  14. fmt.Printf("flush from %v to %v %d bytes\n", startTime, stopTime, len(buf))
  15. }, func() {
  16. })
  17. startTime := time.Now()
  18. messageSize := 1024
  19. messageCount := 5000
  20. receivedMessageCount := 0
  21. var wg sync.WaitGroup
  22. wg.Add(1)
  23. go func() {
  24. defer wg.Done()
  25. lastProcessedTime, isDone, err := lb.LoopProcessLogData("test", startTime, 0, func() bool {
  26. // stop if no more messages
  27. return receivedMessageCount < messageCount
  28. }, func(logEntry *filer_pb.LogEntry) error {
  29. receivedMessageCount++
  30. if receivedMessageCount >= messageCount {
  31. println("processed all messages")
  32. return io.EOF
  33. }
  34. return nil
  35. })
  36. fmt.Printf("before flush: sent %d received %d\n", messageCount, receivedMessageCount)
  37. fmt.Printf("lastProcessedTime %v isDone %v err: %v\n", lastProcessedTime, isDone, err)
  38. if err != nil && err != io.EOF {
  39. t.Errorf("unexpected error %v", err)
  40. }
  41. }()
  42. var buf = make([]byte, messageSize)
  43. for i := 0; i < messageCount; i++ {
  44. rand.Read(buf)
  45. lb.AddToBuffer(nil, buf, 0)
  46. }
  47. wg.Wait()
  48. if receivedMessageCount != messageCount {
  49. t.Errorf("expect %d messages, but got %d", messageCount, receivedMessageCount)
  50. }
  51. }