load_test_meta_tail.go 2.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. package main
  2. import (
  3. "flag"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/pb"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  8. "google.golang.org/grpc"
  9. "google.golang.org/grpc/credentials/insecure"
  10. "strconv"
  11. "time"
  12. )
  13. var (
  14. dir = flag.String("dir", "/tmp", "directory to create files")
  15. n = flag.Int("n", 100, "the number of metadata")
  16. tailFiler = flag.String("filer", "localhost:8888", "the filer address")
  17. isWrite = flag.Bool("write", false, "only write")
  18. writeInterval = flag.Duration("writeInterval", 0, "write interval, e.g., 1s")
  19. )
  20. func main() {
  21. flag.Parse()
  22. if *isWrite {
  23. startGenerateMetadata()
  24. return
  25. }
  26. expected := 0
  27. startSubscribeMetadata(func(event *filer_pb.SubscribeMetadataResponse) error {
  28. if event.Directory != *dir {
  29. return nil
  30. }
  31. name := event.EventNotification.NewEntry.Name
  32. glog.V(0).Infof("=> %s ts:%+v", name, time.Unix(0, event.TsNs))
  33. id := name[4:]
  34. if x, err := strconv.Atoi(id); err == nil {
  35. if x != expected {
  36. return fmt.Errorf("Expected file%d Actual %s\n", expected, name)
  37. }
  38. expected++
  39. } else {
  40. return err
  41. }
  42. time.Sleep(10 * time.Millisecond)
  43. return nil
  44. })
  45. }
  46. func startGenerateMetadata() {
  47. pb.WithFilerClient(false, pb.ServerAddress(*tailFiler), grpc.WithTransportCredentials(insecure.NewCredentials()), func(client filer_pb.SeaweedFilerClient) error {
  48. for i := 0; i < *n; i++ {
  49. name := fmt.Sprintf("file%d", i)
  50. glog.V(0).Infof("write %s/%s", *dir, name)
  51. if err := filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{
  52. Directory: *dir,
  53. Entry: &filer_pb.Entry{
  54. Name: name,
  55. },
  56. }); err != nil {
  57. fmt.Printf("create entry %s: %v\n", name, err)
  58. return err
  59. }
  60. if *writeInterval > 0 {
  61. time.Sleep(*writeInterval)
  62. }
  63. }
  64. return nil
  65. })
  66. }
  67. func startSubscribeMetadata(eachEntryFunc func(event *filer_pb.SubscribeMetadataResponse) error) {
  68. tailErr := pb.FollowMetadata(pb.ServerAddress(*tailFiler), grpc.WithTransportCredentials(insecure.NewCredentials()), "tail", 0, 0, *dir, nil, 0, 0, 0, eachEntryFunc, pb.TrivialOnError)
  69. if tailErr != nil {
  70. fmt.Printf("tail %s: %v\n", *tailFiler, tailErr)
  71. }
  72. }