bench_filer_upload.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. package main
  2. import (
  3. "bytes"
  4. "flag"
  5. "fmt"
  6. "io"
  7. "io/ioutil"
  8. "log"
  9. "math/rand"
  10. "mime/multipart"
  11. "net/http"
  12. "os"
  13. "strings"
  14. "sync"
  15. "time"
  16. )
  17. var (
  18. size = flag.Int("size", 1024, "file size")
  19. concurrency = flag.Int("c", 4, "concurrent number of uploads")
  20. times = flag.Int("n", 1024, "repeated number of times")
  21. fileCount = flag.Int("fileCount", 1, "number of files to write")
  22. destination = flag.String("to", "http://localhost:8888/", "destination directory on filer")
  23. statsChan = make(chan stat, 8)
  24. )
  25. type stat struct {
  26. size int64
  27. }
  28. func main() {
  29. flag.Parse()
  30. data := make([]byte, *size)
  31. println("data len", len(data))
  32. var wg sync.WaitGroup
  33. for x := 0; x < *concurrency; x++ {
  34. wg.Add(1)
  35. go func(x int) {
  36. defer wg.Done()
  37. client := &http.Client{Transport: &http.Transport{
  38. MaxConnsPerHost: 1024,
  39. MaxIdleConnsPerHost: 1024,
  40. }}
  41. r := rand.New(rand.NewSource(time.Now().UnixNano() + int64(x)))
  42. for t := 0; t < *times; t++ {
  43. for f := 0; f < *fileCount; f++ {
  44. fn := r.Intn(*fileCount)
  45. if size, err := uploadFileToFiler(client, data, fmt.Sprintf("file%04d", fn), *destination); err == nil {
  46. statsChan <- stat{
  47. size: size,
  48. }
  49. } else {
  50. log.Fatalf("client %d upload %d times: %v", x, t, err)
  51. }
  52. }
  53. }
  54. }(x)
  55. }
  56. go func() {
  57. ticker := time.NewTicker(1000 * time.Millisecond)
  58. var lastTime time.Time
  59. var counter, size int64
  60. for {
  61. select {
  62. case stat := <-statsChan:
  63. size += stat.size
  64. counter++
  65. case x := <-ticker.C:
  66. if !lastTime.IsZero() {
  67. elapsed := x.Sub(lastTime).Seconds()
  68. fmt.Fprintf(os.Stdout, "%.2f files/s, %.2f MB/s\n",
  69. float64(counter)/elapsed,
  70. float64(size/1024/1024)/elapsed)
  71. }
  72. lastTime = x
  73. size = 0
  74. counter = 0
  75. }
  76. }
  77. }()
  78. wg.Wait()
  79. }
  80. func uploadFileToFiler(client *http.Client, data []byte, filename, destination string) (size int64, err error) {
  81. if !strings.HasSuffix(destination, "/") {
  82. destination = destination + "/"
  83. }
  84. body := &bytes.Buffer{}
  85. writer := multipart.NewWriter(body)
  86. part, err := writer.CreateFormFile("file", filename)
  87. if err != nil {
  88. return 0, fmt.Errorf("fail to create form %v: %v", filename, err)
  89. }
  90. part.Write(data)
  91. err = writer.Close()
  92. if err != nil {
  93. return 0, fmt.Errorf("fail to write part %v: %v", filename, err)
  94. }
  95. uri := destination + filename
  96. request, err := http.NewRequest("POST", uri, body)
  97. request.Header.Set("Content-Type", writer.FormDataContentType())
  98. // request.Close = true // can not use this, which do not reuse http connection, impacting filer->volume also.
  99. resp, err := client.Do(request)
  100. if err != nil {
  101. return 0, fmt.Errorf("http POST %s: %v", uri, err)
  102. } else {
  103. body := &bytes.Buffer{}
  104. _, err := body.ReadFrom(resp.Body)
  105. if err != nil {
  106. return 0, fmt.Errorf("read http POST %s response: %v", uri, err)
  107. }
  108. io.Copy(ioutil.Discard, resp.Body)
  109. resp.Body.Close()
  110. }
  111. return int64(len(data)), nil
  112. }