// stress_filer_upload.go
  1. package main
  2. import (
  3. "bytes"
  4. "flag"
  5. "fmt"
  6. "io"
  7. "io/ioutil"
  8. "log"
  9. "math/rand"
  10. "mime/multipart"
  11. "net/http"
  12. "os"
  13. "path/filepath"
  14. "strings"
  15. "sync"
  16. "time"
  17. )
  18. var (
  19. dir = flag.String("dir", ".", "upload files under this directory")
  20. concurrency = flag.Int("c", 1, "concurrent number of uploads")
  21. times = flag.Int("n", 1, "repeated number of times")
  22. destination = flag.String("to", "http://localhost:8888/", "destination directory on filer")
  23. statsChan = make(chan stat, 8)
  24. )
  // stat records the outcome of one successful upload; it is the element type
  // of statsChan.
  type stat struct {
  	// size is the uploaded file's size in bytes.
  	size int64
  }
  28. func main() {
  29. flag.Parse()
  30. var fileNames []string
  31. files, err := ioutil.ReadDir(*dir)
  32. if err != nil {
  33. log.Fatalf("fail to read dir %v: %v", *dir, err)
  34. }
  35. for _, file := range files {
  36. if file.IsDir() {
  37. continue
  38. }
  39. fileNames = append(fileNames, filepath.Join(*dir, file.Name()))
  40. }
  41. var wg sync.WaitGroup
  42. for x := 0; x < *concurrency; x++ {
  43. wg.Add(1)
  44. client := &http.Client{}
  45. go func() {
  46. defer wg.Done()
  47. rand.Shuffle(len(fileNames), func(i, j int) {
  48. fileNames[i], fileNames[j] = fileNames[j], fileNames[i]
  49. })
  50. for t := 0; t < *times; t++ {
  51. for _, filename := range fileNames {
  52. if size, err := uploadFileToFiler(client, filename, *destination); err == nil {
  53. statsChan <- stat{
  54. size: size,
  55. }
  56. }
  57. }
  58. }
  59. }()
  60. }
  61. go func() {
  62. ticker := time.NewTicker(500 * time.Millisecond)
  63. var lastTime time.Time
  64. var counter, size int64
  65. for {
  66. select {
  67. case stat := <-statsChan:
  68. size += stat.size
  69. counter++
  70. case x := <-ticker.C:
  71. if !lastTime.IsZero() {
  72. elapsed := x.Sub(lastTime).Seconds()
  73. fmt.Fprintf(os.Stdout, "%.2f files/s, %.2f MB/s\n",
  74. float64(counter)/elapsed,
  75. float64(size/1024/1024)/elapsed)
  76. }
  77. lastTime = x
  78. size = 0
  79. counter = 0
  80. }
  81. }
  82. }()
  83. wg.Wait()
  84. }
  // uploadFileToFiler POSTs the local file filename to the filer as a
  // multipart/form-data request containing a single "file" part.
  //
  // The request URL is destination (with a trailing "/" appended when missing)
  // followed by filename exactly as given, so any directory components in
  // filename become part of the URL path. The whole file is buffered in memory
  // before the request is sent.
  //
  // On success it returns the file size reported by Stat. It returns a non-nil
  // error if the file cannot be opened, stat'ed or read, if the request cannot
  // be built or sent, if the response cannot be read, or if the filer answers
  // with a non-2xx status.
  func uploadFileToFiler(client *http.Client, filename, destination string) (size int64, err error) {
  	file, err := os.Open(filename)
  	if err != nil {
  		return 0, fmt.Errorf("open %v: %v", filename, err)
  	}
  	defer file.Close()

  	fi, err := file.Stat()
  	if err != nil {
  		return 0, fmt.Errorf("stat %v: %v", file.Name(), err)
  	}

  	if !strings.HasSuffix(destination, "/") {
  		destination += "/"
  	}

  	body := &bytes.Buffer{}
  	writer := multipart.NewWriter(body)
  	part, err := writer.CreateFormFile("file", file.Name())
  	if err != nil {
  		return 0, fmt.Errorf("fail to create form %v: %v", file.Name(), err)
  	}
  	if _, err = io.Copy(part, file); err != nil {
  		return 0, fmt.Errorf("fail to write part %v: %v", file.Name(), err)
  	}
  	// Close writes the terminating multipart boundary; it must happen before
  	// the body is sent.
  	if err = writer.Close(); err != nil {
  		return 0, fmt.Errorf("fail to close multipart writer %v: %v", file.Name(), err)
  	}

  	uri := destination + file.Name()
  	request, err := http.NewRequest("POST", uri, body)
  	if err != nil {
  		return 0, fmt.Errorf("create request POST %s: %v", uri, err)
  	}
  	request.Header.Set("Content-Type", writer.FormDataContentType())

  	resp, err := client.Do(request)
  	if err != nil {
  		return 0, fmt.Errorf("http POST %s: %v", uri, err)
  	}
  	defer resp.Body.Close()

  	// Drain the response so the transport can reuse the connection.
  	if _, err = io.Copy(ioutil.Discard, resp.Body); err != nil {
  		return 0, fmt.Errorf("read http POST %s response: %v", uri, err)
  	}
  	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
  		return 0, fmt.Errorf("http POST %s: unexpected status %s", uri, resp.Status)
  	}

  	return fi.Size(), nil
  }