// bench_filer_upload.go (2.9 KB)
  1. package main
  2. import (
  3. "bytes"
  4. "flag"
  5. "fmt"
  6. "io"
  7. "log"
  8. "math/rand"
  9. "mime/multipart"
  10. "net/http"
  11. "os"
  12. "strings"
  13. "sync"
  14. "time"
  15. )
  16. var (
  17. size = flag.Int("size", 1024, "file size")
  18. concurrency = flag.Int("c", 4, "concurrent number of uploads")
  19. times = flag.Int("n", 1024, "repeated number of times")
  20. fileCount = flag.Int("fileCount", 1, "number of files to write")
  21. destination = flag.String("to", "http://localhost:8888/", "destination directory on filer")
  22. statsChan = make(chan stat, 8)
  23. )
  // stat is a single sample sent on statsChan after a successful upload.
  type stat struct {
  	// size is the value returned by uploadFileToFiler for that upload,
  	// i.e. the number of payload bytes sent.
  	size int64
  }
  27. func main() {
  28. flag.Parse()
  29. data := make([]byte, *size)
  30. println("data len", len(data))
  31. var wg sync.WaitGroup
  32. for x := 0; x < *concurrency; x++ {
  33. wg.Add(1)
  34. go func(x int) {
  35. defer wg.Done()
  36. client := &http.Client{Transport: &http.Transport{
  37. MaxIdleConns: 1024,
  38. MaxIdleConnsPerHost: 1024,
  39. }}
  40. r := rand.New(rand.NewSource(time.Now().UnixNano() + int64(x)))
  41. for t := 0; t < *times; t++ {
  42. for f := 0; f < *fileCount; f++ {
  43. fn := r.Intn(*fileCount)
  44. if size, err := uploadFileToFiler(client, data, fmt.Sprintf("file%04d", fn), *destination); err == nil {
  45. statsChan <- stat{
  46. size: size,
  47. }
  48. } else {
  49. log.Fatalf("client %d upload %d times: %v", x, t, err)
  50. }
  51. }
  52. }
  53. }(x)
  54. }
  55. go func() {
  56. ticker := time.NewTicker(1000 * time.Millisecond)
  57. defer ticker.Stop()
  58. var lastTime time.Time
  59. var counter, size int64
  60. for {
  61. select {
  62. case stat := <-statsChan:
  63. size += stat.size
  64. counter++
  65. case x := <-ticker.C:
  66. if !lastTime.IsZero() {
  67. elapsed := x.Sub(lastTime).Seconds()
  68. fmt.Fprintf(os.Stdout, "%.2f files/s, %.2f MB/s\n",
  69. float64(counter)/elapsed,
  70. float64(size/1024/1024)/elapsed)
  71. }
  72. lastTime = x
  73. size = 0
  74. counter = 0
  75. }
  76. }
  77. }()
  78. wg.Wait()
  79. }
  // uploadFileToFiler POSTs data to the filer as a multipart/form-data upload
  // containing a single form file field named "file".
  //
  // The request URL is destination (a trailing "/" is appended when missing)
  // followed by filename. The response body is always drained and closed so the
  // client's transport can reuse the connection.
  //
  // It returns int64(len(data)) on success. It returns 0 and a wrapped error
  // when the form cannot be built, the request cannot be created or sent, the
  // response cannot be read, or the server replies with a status code >= 400.
  func uploadFileToFiler(client *http.Client, data []byte, filename, destination string) (size int64, err error) {
  	if !strings.HasSuffix(destination, "/") {
  		destination += "/"
  	}
  	uri := destination + filename

  	body := &bytes.Buffer{}
  	writer := multipart.NewWriter(body)
  	part, err := writer.CreateFormFile("file", filename)
  	if err != nil {
  		return 0, fmt.Errorf("fail to create form %v: %w", filename, err)
  	}
  	if _, err = part.Write(data); err != nil {
  		return 0, fmt.Errorf("fail to write part %v: %w", filename, err)
  	}
  	// Close writes the terminating boundary; the body is incomplete without it.
  	if err = writer.Close(); err != nil {
  		return 0, fmt.Errorf("fail to close form %v: %w", filename, err)
  	}

  	request, err := http.NewRequest(http.MethodPost, uri, body)
  	if err != nil {
  		// Must be checked before touching request, which is nil on error.
  		return 0, fmt.Errorf("create http POST %s: %w", uri, err)
  	}
  	request.Header.Set("Content-Type", writer.FormDataContentType())
  	// request.Close is deliberately left unset: setting it would prevent the
  	// HTTP connection from being reused, which (per the original author's
  	// note) also impacts filer->volume traffic.

  	resp, err := client.Do(request)
  	if err != nil {
  		return 0, fmt.Errorf("http POST %s: %w", uri, err)
  	}
  	defer resp.Body.Close()

  	if resp.StatusCode >= http.StatusBadRequest {
  		// Keep a short excerpt of the reply for the error message, then
  		// drain the rest (best effort) so the connection stays reusable.
  		snippet, _ := io.ReadAll(io.LimitReader(resp.Body, 512))
  		io.Copy(io.Discard, resp.Body)
  		return 0, fmt.Errorf("http POST %s: status %d: %s", uri, resp.StatusCode, bytes.TrimSpace(snippet))
  	}

  	if _, err = io.Copy(io.Discard, resp.Body); err != nil {
  		return 0, fmt.Errorf("read http POST %s response: %w", uri, err)
  	}
  	return int64(len(data)), nil
  }