load_test_meta_tail.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. package main
  2. import (
  3. "flag"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/pb"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  8. "github.com/seaweedfs/seaweedfs/weed/util"
  9. "google.golang.org/grpc"
  10. "google.golang.org/grpc/credentials/insecure"
  11. "strconv"
  12. "time"
  13. )
  // Command-line flags shared by the write mode and the tail mode.
  var (
  	// dir is the filer directory entries are created in; the tail mode
  	// also uses it as the subscription path prefix and event filter.
  	dir = flag.String("dir", "/tmp", "directory to create files")
  	// n is how many entries the write mode creates.
  	n = flag.Int("n", 100, "the number of metadata")
  	// tailFiler is the address of the filer both modes connect to.
  	tailFiler = flag.String("filer", "localhost:8888", "the filer address")
  	// isWrite selects the write mode instead of the default tail mode.
  	isWrite = flag.Bool("write", false, "only write")
  	// writeInterval is the pause after each created entry; zero disables it.
  	writeInterval = flag.Duration("writeInterval", 0, "write interval, e.g., 1s")
  )
  21. func main() {
  22. flag.Parse()
  23. if *isWrite {
  24. startGenerateMetadata()
  25. return
  26. }
  27. expected := 0
  28. startSubscribeMetadata(func(event *filer_pb.SubscribeMetadataResponse) error {
  29. if event.Directory != *dir {
  30. return nil
  31. }
  32. name := event.EventNotification.NewEntry.Name
  33. glog.V(0).Infof("=> %s ts:%+v", name, time.Unix(0, event.TsNs))
  34. id := name[4:]
  35. if x, err := strconv.Atoi(id); err == nil {
  36. if x != expected {
  37. return fmt.Errorf("Expected file%d Actual %s\n", expected, name)
  38. }
  39. expected++
  40. } else {
  41. return err
  42. }
  43. time.Sleep(10 * time.Millisecond)
  44. return nil
  45. })
  46. }
  47. func startGenerateMetadata() {
  48. pb.WithFilerClient(false, util.RandomInt32(), pb.ServerAddress(*tailFiler), grpc.WithTransportCredentials(insecure.NewCredentials()), func(client filer_pb.SeaweedFilerClient) error {
  49. for i := 0; i < *n; i++ {
  50. name := fmt.Sprintf("file%d", i)
  51. glog.V(0).Infof("write %s/%s", *dir, name)
  52. if err := filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{
  53. Directory: *dir,
  54. Entry: &filer_pb.Entry{
  55. Name: name,
  56. },
  57. }); err != nil {
  58. fmt.Printf("create entry %s: %v\n", name, err)
  59. return err
  60. }
  61. if *writeInterval > 0 {
  62. time.Sleep(*writeInterval)
  63. }
  64. }
  65. return nil
  66. })
  67. }
  68. func startSubscribeMetadata(eachEntryFunc func(event *filer_pb.SubscribeMetadataResponse) error) {
  69. metadataFollowOption := &pb.MetadataFollowOption{
  70. ClientName: "tail",
  71. ClientId: 0,
  72. ClientEpoch: 0,
  73. SelfSignature: 0,
  74. PathPrefix: *dir,
  75. AdditionalPathPrefixes: nil,
  76. DirectoriesToWatch: nil,
  77. StartTsNs: 0,
  78. StopTsNs: 0,
  79. EventErrorType: pb.TrivialOnError,
  80. }
  81. tailErr := pb.FollowMetadata(pb.ServerAddress(*tailFiler), grpc.WithTransportCredentials(insecure.NewCredentials()), metadataFollowOption, eachEntryFunc)
  82. if tailErr != nil {
  83. fmt.Printf("tail %s: %v\n", *tailFiler, tailErr)
  84. }
  85. }