stress_filer_upload.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. package main
  2. import (
  3. "bytes"
  4. "flag"
  5. "fmt"
  6. "io"
  7. "log"
  8. "math/rand"
  9. "mime/multipart"
  10. "net/http"
  11. "os"
  12. "path/filepath"
  13. "strings"
  14. "sync"
  15. "time"
  16. )
  17. var (
  18. dir = flag.String("dir", ".", "upload files under this directory")
  19. concurrency = flag.Int("c", 1, "concurrent number of uploads")
  20. times = flag.Int("n", 1, "repeated number of times")
  21. destination = flag.String("to", "http://localhost:8888/", "destination directory on filer")
  22. statsChan = make(chan stat, 8)
  23. )
  24. type stat struct {
  25. size int64
  26. }
  27. func main() {
  28. flag.Parse()
  29. var fileNames []string
  30. files, err := os.ReadDir(*dir)
  31. if err != nil {
  32. log.Fatalf("fail to read dir %v: %v", *dir, err)
  33. }
  34. for _, file := range files {
  35. if file.IsDir() {
  36. continue
  37. }
  38. fileNames = append(fileNames, filepath.Join(*dir, file.Name()))
  39. }
  40. var wg sync.WaitGroup
  41. for x := 0; x < *concurrency; x++ {
  42. wg.Add(1)
  43. client := &http.Client{}
  44. go func() {
  45. defer wg.Done()
  46. rand.Shuffle(len(fileNames), func(i, j int) {
  47. fileNames[i], fileNames[j] = fileNames[j], fileNames[i]
  48. })
  49. for t := 0; t < *times; t++ {
  50. for _, filename := range fileNames {
  51. if size, err := uploadFileToFiler(client, filename, *destination); err == nil {
  52. statsChan <- stat{
  53. size: size,
  54. }
  55. }
  56. }
  57. }
  58. }()
  59. }
  60. go func() {
  61. ticker := time.NewTicker(500 * time.Millisecond)
  62. defer ticker.Stop()
  63. var lastTime time.Time
  64. var counter, size int64
  65. for {
  66. select {
  67. case stat := <-statsChan:
  68. size += stat.size
  69. counter++
  70. case x := <-ticker.C:
  71. if !lastTime.IsZero() {
  72. elapsed := x.Sub(lastTime).Seconds()
  73. fmt.Fprintf(os.Stdout, "%.2f files/s, %.2f MB/s\n",
  74. float64(counter)/elapsed,
  75. float64(size/1024/1024)/elapsed)
  76. }
  77. lastTime = x
  78. size = 0
  79. counter = 0
  80. }
  81. }
  82. }()
  83. wg.Wait()
  84. }
  85. func uploadFileToFiler(client *http.Client, filename, destination string) (size int64, err error) {
  86. file, err := os.Open(filename)
  87. if err != nil {
  88. panic(err)
  89. }
  90. defer file.Close()
  91. fi, err := file.Stat()
  92. if !strings.HasSuffix(destination, "/") {
  93. destination = destination + "/"
  94. }
  95. body := &bytes.Buffer{}
  96. writer := multipart.NewWriter(body)
  97. part, err := writer.CreateFormFile("file", file.Name())
  98. if err != nil {
  99. return 0, fmt.Errorf("fail to create form %v: %v", file.Name(), err)
  100. }
  101. _, err = io.Copy(part, file)
  102. if err != nil {
  103. return 0, fmt.Errorf("fail to write part %v: %v", file.Name(), err)
  104. }
  105. err = writer.Close()
  106. if err != nil {
  107. return 0, fmt.Errorf("fail to write part %v: %v", file.Name(), err)
  108. }
  109. uri := destination + file.Name()
  110. request, err := http.NewRequest(http.MethodPost, uri, body)
  111. request.Header.Set("Content-Type", writer.FormDataContentType())
  112. resp, err := client.Do(request)
  113. if err != nil {
  114. return 0, fmt.Errorf("http POST %s: %v", uri, err)
  115. } else {
  116. body := &bytes.Buffer{}
  117. _, err := body.ReadFrom(resp.Body)
  118. if err != nil {
  119. return 0, fmt.Errorf("read http POST %s response: %v", uri, err)
  120. }
  121. io.Copy(io.Discard, resp.Body)
  122. resp.Body.Close()
  123. }
  124. return fi.Size(), nil
  125. }