bench_filer_upload.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. package main
  2. import (
  3. "bytes"
  4. "flag"
  5. "fmt"
  6. "io"
  7. "log"
  8. "math/rand"
  9. "mime/multipart"
  10. "net/http"
  11. "os"
  12. "strings"
  13. "sync"
  14. "time"
  15. )
  16. var (
  17. size = flag.Int("size", 1024, "file size")
  18. concurrency = flag.Int("c", 4, "concurrent number of uploads")
  19. times = flag.Int("n", 1024, "repeated number of times")
  20. fileCount = flag.Int("fileCount", 1, "number of files to write")
  21. destination = flag.String("to", "http://localhost:8888/", "destination directory on filer")
  22. statsChan = make(chan stat, 8)
  23. )
  24. type stat struct {
  25. size int64
  26. }
  27. func main() {
  28. flag.Parse()
  29. data := make([]byte, *size)
  30. println("data len", len(data))
  31. var wg sync.WaitGroup
  32. for x := 0; x < *concurrency; x++ {
  33. wg.Add(1)
  34. go func(x int) {
  35. defer wg.Done()
  36. client := &http.Client{Transport: &http.Transport{
  37. MaxIdleConns: 1024,
  38. MaxIdleConnsPerHost: 1024,
  39. }}
  40. r := rand.New(rand.NewSource(time.Now().UnixNano() + int64(x)))
  41. for t := 0; t < *times; t++ {
  42. for f := 0; f < *fileCount; f++ {
  43. fn := r.Intn(*fileCount)
  44. if size, err := uploadFileToFiler(client, data, fmt.Sprintf("file%04d", fn), *destination); err == nil {
  45. statsChan <- stat{
  46. size: size,
  47. }
  48. } else {
  49. log.Fatalf("client %d upload %d times: %v", x, t, err)
  50. }
  51. }
  52. }
  53. }(x)
  54. }
  55. go func() {
  56. ticker := time.NewTicker(1000 * time.Millisecond)
  57. var lastTime time.Time
  58. var counter, size int64
  59. for {
  60. select {
  61. case stat := <-statsChan:
  62. size += stat.size
  63. counter++
  64. case x := <-ticker.C:
  65. if !lastTime.IsZero() {
  66. elapsed := x.Sub(lastTime).Seconds()
  67. fmt.Fprintf(os.Stdout, "%.2f files/s, %.2f MB/s\n",
  68. float64(counter)/elapsed,
  69. float64(size/1024/1024)/elapsed)
  70. }
  71. lastTime = x
  72. size = 0
  73. counter = 0
  74. }
  75. }
  76. }()
  77. wg.Wait()
  78. }
  79. func uploadFileToFiler(client *http.Client, data []byte, filename, destination string) (size int64, err error) {
  80. if !strings.HasSuffix(destination, "/") {
  81. destination = destination + "/"
  82. }
  83. body := &bytes.Buffer{}
  84. writer := multipart.NewWriter(body)
  85. part, err := writer.CreateFormFile("file", filename)
  86. if err != nil {
  87. return 0, fmt.Errorf("fail to create form %v: %v", filename, err)
  88. }
  89. part.Write(data)
  90. err = writer.Close()
  91. if err != nil {
  92. return 0, fmt.Errorf("fail to write part %v: %v", filename, err)
  93. }
  94. uri := destination + filename
  95. request, err := http.NewRequest("POST", uri, body)
  96. request.Header.Set("Content-Type", writer.FormDataContentType())
  97. // request.Close = true // can not use this, which do not reuse http connection, impacting filer->volume also.
  98. resp, err := client.Do(request)
  99. if err != nil {
  100. return 0, fmt.Errorf("http POST %s: %v", uri, err)
  101. } else {
  102. body := &bytes.Buffer{}
  103. _, err := body.ReadFrom(resp.Body)
  104. if err != nil {
  105. return 0, fmt.Errorf("read http POST %s response: %v", uri, err)
  106. }
  107. io.Copy(io.Discard, resp.Body)
  108. resp.Body.Close()
  109. }
  110. return int64(len(data)), nil
  111. }