stress_filer_upload.go

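// stress_filer_upload is a load-generation tool that repeatedly uploads the
// files found under a local directory to a SeaweedFS filer, posting each file
// as a multipart form, and prints files/s and MB/s throughput every 500ms.
//
// Example invocation (a sketch; adjust the flags defined below to your setup):
//
//	go run stress_filer_upload.go -dir ./testdata -c 4 -n 10 -to http://localhost:8888/stress/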
package main

import (
	"bytes"
	"flag"
	"fmt"
	"io"
	"log"
	"math/rand"
	"mime/multipart"
	"net/http"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"time"

	util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
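
// Command-line flags and the channel through which workers report each successful upload.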
var (
	dir         = flag.String("dir", ".", "upload files under this directory")
	concurrency = flag.Int("c", 1, "number of concurrent uploaders")
	times       = flag.Int("n", 1, "number of times to repeat uploading the file set")
	destination = flag.String("to", "http://localhost:8888/", "destination directory on filer")

	statsChan = make(chan stat, 8)
)
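
// stat carries the size of one successfully uploaded file, for throughput accounting.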
type stat struct {
	size int64
}

func main() {
	flag.Parse()

	util_http.InitGlobalHttpClient()

	var fileNames []string

	files, err := os.ReadDir(*dir)
	if err != nil {
		log.Fatalf("fail to read dir %v: %v", *dir, err)
	}

	for _, file := range files {
		if file.IsDir() {
			continue
		}
		fileNames = append(fileNames, filepath.Join(*dir, file.Name()))
	}
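
	// Start *concurrency workers; each shuffles its own copy of the file list
	// and uploads every file *times times, reporting successes on statsChan.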
	var wg sync.WaitGroup
	for x := 0; x < *concurrency; x++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Shuffle a private copy so concurrent workers do not race on the shared slice.
			names := append([]string(nil), fileNames...)
			rand.Shuffle(len(names), func(i, j int) {
				names[i], names[j] = names[j], names[i]
			})
			for t := 0; t < *times; t++ {
				for _, filename := range names {
					if size, err := uploadFileToFiler(filename, *destination); err == nil {
						statsChan <- stat{
							size: size,
						}
					} else {
						log.Printf("upload %v: %v", filename, err)
					}
				}
			}
		}()
	}
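
	// Report aggregate throughput (files/s and MB/s) every 500ms.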
	go func() {
		ticker := time.NewTicker(500 * time.Millisecond)
		defer ticker.Stop()

		var lastTime time.Time
		var counter, size int64
		for {
			select {
			case s := <-statsChan:
				size += s.size
				counter++
			case x := <-ticker.C:
				if !lastTime.IsZero() {
					elapsed := x.Sub(lastTime).Seconds()
					fmt.Fprintf(os.Stdout, "%.2f files/s, %.2f MB/s\n",
						float64(counter)/elapsed,
						float64(size)/1024/1024/elapsed)
				}
				lastTime = x
				size = 0
				counter = 0
			}
		}
	}()

	wg.Wait()
}
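
// uploadFileToFiler POSTs a single file to the filer as a multipart form
// and returns the file's size on success.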
func uploadFileToFiler(filename, destination string) (size int64, err error) {
	file, err := os.Open(filename)
	if err != nil {
		return 0, fmt.Errorf("fail to open %v: %v", filename, err)
	}
	defer file.Close()

	fi, err := file.Stat()
	if err != nil {
		return 0, fmt.Errorf("fail to stat %v: %v", filename, err)
	}

	if !strings.HasSuffix(destination, "/") {
		destination = destination + "/"
	}

	// Build the multipart form body in memory; each file is assumed to fit in RAM.
	body := &bytes.Buffer{}
	writer := multipart.NewWriter(body)
	part, err := writer.CreateFormFile("file", filepath.Base(filename))
	if err != nil {
		return 0, fmt.Errorf("fail to create form %v: %v", filename, err)
	}
	_, err = io.Copy(part, file)
	if err != nil {
		return 0, fmt.Errorf("fail to write part %v: %v", filename, err)
	}
	err = writer.Close()
	if err != nil {
		return 0, fmt.Errorf("fail to close multipart writer %v: %v", filename, err)
	}

	// Upload under the destination directory using the file's base name.
	uri := destination + filepath.Base(filename)

	request, err := http.NewRequest(http.MethodPost, uri, body)
	if err != nil {
		return 0, fmt.Errorf("http POST %s: %v", uri, err)
	}
	request.Header.Set("Content-Type", writer.FormDataContentType())

	resp, err := util_http.GetGlobalHttpClient().Do(request)
	if err != nil {
		return 0, fmt.Errorf("http POST %s: %v", uri, err)
	}
	defer resp.Body.Close()

	if resp.StatusCode >= http.StatusBadRequest {
		return 0, fmt.Errorf("http POST %s: %v", uri, resp.Status)
	}

	// Drain the response body so the underlying connection can be reused.
	if _, err := io.Copy(io.Discard, resp.Body); err != nil {
		return 0, fmt.Errorf("read http POST %s response: %v", uri, err)
	}

	return fi.Size(), nil
}