benchmark.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647
  1. package command
  2. import (
  3. "bufio"
  4. "context"
  5. "fmt"
  6. "io"
  7. "math"
  8. "math/rand"
  9. "net"
  10. "os"
  11. "runtime"
  12. "runtime/pprof"
  13. "sort"
  14. "strings"
  15. "sync"
  16. "time"
  17. "google.golang.org/grpc"
  18. "github.com/chrislusf/seaweedfs/weed/glog"
  19. "github.com/chrislusf/seaweedfs/weed/operation"
  20. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  21. "github.com/chrislusf/seaweedfs/weed/security"
  22. "github.com/chrislusf/seaweedfs/weed/util"
  23. "github.com/chrislusf/seaweedfs/weed/wdclient"
  24. )
  25. type BenchmarkOptions struct {
  26. masters *string
  27. concurrency *int
  28. numberOfFiles *int
  29. fileSize *int
  30. idListFile *string
  31. write *bool
  32. deletePercentage *int
  33. read *bool
  34. sequentialRead *bool
  35. collection *string
  36. replication *string
  37. cpuprofile *string
  38. maxCpu *int
  39. grpcDialOption grpc.DialOption
  40. masterClient *wdclient.MasterClient
  41. readByGrpc *bool
  42. readByTcp *bool
  43. }
  44. var (
  45. b BenchmarkOptions
  46. sharedBytes []byte
  47. isSecure bool
  48. )
  49. func init() {
  50. cmdBenchmark.Run = runBenchmark // break init cycle
  51. cmdBenchmark.IsDebug = cmdBenchmark.Flag.Bool("debug", false, "verbose debug information")
  52. b.masters = cmdBenchmark.Flag.String("master", "localhost:9333", "SeaweedFS master location")
  53. b.concurrency = cmdBenchmark.Flag.Int("c", 16, "number of concurrent write or read processes")
  54. b.fileSize = cmdBenchmark.Flag.Int("size", 1024, "simulated file size in bytes, with random(0~63) bytes padding")
  55. b.numberOfFiles = cmdBenchmark.Flag.Int("n", 1024*1024, "number of files to write for each thread")
  56. b.idListFile = cmdBenchmark.Flag.String("list", os.TempDir()+"/benchmark_list.txt", "list of uploaded file ids")
  57. b.write = cmdBenchmark.Flag.Bool("write", true, "enable write")
  58. b.deletePercentage = cmdBenchmark.Flag.Int("deletePercent", 0, "the percent of writes that are deletes")
  59. b.read = cmdBenchmark.Flag.Bool("read", true, "enable read")
  60. b.sequentialRead = cmdBenchmark.Flag.Bool("readSequentially", false, "randomly read by ids from \"-list\" specified file")
  61. b.collection = cmdBenchmark.Flag.String("collection", "benchmark", "write data to this collection")
  62. b.replication = cmdBenchmark.Flag.String("replication", "000", "replication type")
  63. b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "cpu profile output file")
  64. b.maxCpu = cmdBenchmark.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
  65. b.readByGrpc = cmdBenchmark.Flag.Bool("read.grpc", false, "use grpc API to read")
  66. b.readByTcp = cmdBenchmark.Flag.Bool("read.tcp", false, "use tcp API to read")
  67. sharedBytes = make([]byte, 1024)
  68. }
  69. var cmdBenchmark = &Command{
  70. UsageLine: "benchmark -master=localhost:9333 -c=10 -n=100000",
  71. Short: "benchmark on writing millions of files and read out",
  72. Long: `benchmark on an empty SeaweedFS file system.
  73. Two tests during benchmark:
  74. 1) write lots of small files to the system
  75. 2) read the files out
  76. The file content is mostly zero, but no compression is done.
  77. You can choose to only benchmark read or write.
  78. During write, the list of uploaded file ids is stored in "-list" specified file.
  79. You can also use your own list of file ids to run read test.
  80. Write speed and read speed will be collected.
  81. The numbers are used to get a sense of the system.
  82. Usually your network or the hard drive is the real bottleneck.
  83. Another thing to watch is whether the volumes are evenly distributed
  84. to each volume server. Because the 7 more benchmark volumes are randomly distributed
  85. to servers with free slots, it's highly possible some servers have uneven amount of
  86. benchmark volumes. To remedy this, you can use this to grow the benchmark volumes
  87. before starting the benchmark command:
  88. http://localhost:9333/vol/grow?collection=benchmark&count=5
  89. After benchmarking, you can clean up the written data by deleting the benchmark collection
  90. http://localhost:9333/col/delete?collection=benchmark
  91. `,
  92. }
  93. var (
  94. wait sync.WaitGroup
  95. writeStats *stats
  96. readStats *stats
  97. )
  98. func runBenchmark(cmd *Command, args []string) bool {
  99. util.LoadConfiguration("security", false)
  100. b.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
  101. fmt.Printf("This is SeaweedFS version %s %s %s\n", util.VERSION, runtime.GOOS, runtime.GOARCH)
  102. if *b.maxCpu < 1 {
  103. *b.maxCpu = runtime.NumCPU()
  104. }
  105. runtime.GOMAXPROCS(*b.maxCpu)
  106. if *b.cpuprofile != "" {
  107. f, err := os.Create(*b.cpuprofile)
  108. if err != nil {
  109. glog.Fatal(err)
  110. }
  111. pprof.StartCPUProfile(f)
  112. defer pprof.StopCPUProfile()
  113. }
  114. b.masterClient = wdclient.NewMasterClient(context.Background(), b.grpcDialOption, "client", strings.Split(*b.masters, ","))
  115. go b.masterClient.KeepConnectedToMaster()
  116. b.masterClient.WaitUntilConnected()
  117. if *b.write {
  118. benchWrite()
  119. }
  120. if *b.read {
  121. benchRead()
  122. }
  123. return true
  124. }
  125. func benchWrite() {
  126. fileIdLineChan := make(chan string)
  127. finishChan := make(chan bool)
  128. writeStats = newStats(*b.concurrency)
  129. idChan := make(chan int)
  130. go writeFileIds(*b.idListFile, fileIdLineChan, finishChan)
  131. for i := 0; i < *b.concurrency; i++ {
  132. wait.Add(1)
  133. go writeFiles(idChan, fileIdLineChan, &writeStats.localStats[i])
  134. }
  135. writeStats.start = time.Now()
  136. writeStats.total = *b.numberOfFiles
  137. go writeStats.checkProgress("Writing Benchmark", finishChan)
  138. for i := 0; i < *b.numberOfFiles; i++ {
  139. idChan <- i
  140. }
  141. close(idChan)
  142. wait.Wait()
  143. writeStats.end = time.Now()
  144. wait.Add(2)
  145. finishChan <- true
  146. finishChan <- true
  147. wait.Wait()
  148. close(finishChan)
  149. writeStats.printStats()
  150. }
  151. func benchRead() {
  152. fileIdLineChan := make(chan string)
  153. finishChan := make(chan bool)
  154. readStats = newStats(*b.concurrency)
  155. go readFileIds(*b.idListFile, fileIdLineChan)
  156. readStats.start = time.Now()
  157. readStats.total = *b.numberOfFiles
  158. go readStats.checkProgress("Randomly Reading Benchmark", finishChan)
  159. for i := 0; i < *b.concurrency; i++ {
  160. wait.Add(1)
  161. go readFiles(fileIdLineChan, &readStats.localStats[i])
  162. }
  163. wait.Wait()
  164. wait.Add(1)
  165. finishChan <- true
  166. wait.Wait()
  167. close(finishChan)
  168. readStats.end = time.Now()
  169. readStats.printStats()
  170. }
  171. type delayedFile struct {
  172. enterTime time.Time
  173. fp *operation.FilePart
  174. }
  175. func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
  176. defer wait.Done()
  177. delayedDeleteChan := make(chan *delayedFile, 100)
  178. var waitForDeletions sync.WaitGroup
  179. for i := 0; i < 7; i++ {
  180. waitForDeletions.Add(1)
  181. go func() {
  182. defer waitForDeletions.Done()
  183. for df := range delayedDeleteChan {
  184. if df.enterTime.After(time.Now()) {
  185. time.Sleep(df.enterTime.Sub(time.Now()))
  186. }
  187. var jwtAuthorization security.EncodedJwt
  188. if isSecure {
  189. jwtAuthorization = operation.LookupJwt(b.masterClient.GetMaster(), df.fp.Fid)
  190. }
  191. if e := util.Delete(fmt.Sprintf("http://%s/%s", df.fp.Server, df.fp.Fid), string(jwtAuthorization)); e == nil {
  192. s.completed++
  193. } else {
  194. s.failed++
  195. }
  196. }
  197. }()
  198. }
  199. random := rand.New(rand.NewSource(time.Now().UnixNano()))
  200. for id := range idChan {
  201. start := time.Now()
  202. fileSize := int64(*b.fileSize + random.Intn(64))
  203. fp := &operation.FilePart{
  204. Reader: &FakeReader{id: uint64(id), size: fileSize},
  205. FileSize: fileSize,
  206. MimeType: "image/bench", // prevent gzip benchmark content
  207. }
  208. ar := &operation.VolumeAssignRequest{
  209. Count: 1,
  210. Collection: *b.collection,
  211. Replication: *b.replication,
  212. }
  213. if assignResult, err := operation.Assign(b.masterClient.GetMaster(), b.grpcDialOption, ar); err == nil {
  214. fp.Server, fp.Fid, fp.Collection = assignResult.Url, assignResult.Fid, *b.collection
  215. if !isSecure && assignResult.Auth != "" {
  216. isSecure = true
  217. }
  218. if _, err := fp.Upload(0, b.masterClient.GetMaster(), assignResult.Auth, b.grpcDialOption); err == nil {
  219. if random.Intn(100) < *b.deletePercentage {
  220. s.total++
  221. delayedDeleteChan <- &delayedFile{time.Now().Add(time.Second), fp}
  222. } else {
  223. fileIdLineChan <- fp.Fid
  224. }
  225. s.completed++
  226. s.transferred += fileSize
  227. } else {
  228. s.failed++
  229. fmt.Printf("Failed to write with error:%v\n", err)
  230. }
  231. writeStats.addSample(time.Now().Sub(start))
  232. if *cmdBenchmark.IsDebug {
  233. fmt.Printf("writing %d file %s\n", id, fp.Fid)
  234. }
  235. } else {
  236. s.failed++
  237. println("writing file error:", err.Error())
  238. }
  239. }
  240. close(delayedDeleteChan)
  241. waitForDeletions.Wait()
  242. }
  243. func readFiles(fileIdLineChan chan string, s *stat) {
  244. defer wait.Done()
  245. for fid := range fileIdLineChan {
  246. if len(fid) == 0 {
  247. continue
  248. }
  249. if fid[0] == '#' {
  250. continue
  251. }
  252. if *cmdBenchmark.IsDebug {
  253. fmt.Printf("reading file %s\n", fid)
  254. }
  255. start := time.Now()
  256. var bytesRead int
  257. var err error
  258. if *b.readByGrpc {
  259. volumeServer, err := b.masterClient.LookupVolumeServer(fid)
  260. if err != nil {
  261. s.failed++
  262. println("!!!! ", fid, " location not found!!!!!")
  263. continue
  264. }
  265. bytesRead, err = grpcFileGet(volumeServer, fid, b.grpcDialOption)
  266. } else if *b.readByTcp {
  267. volumeServer, err := b.masterClient.LookupVolumeServer(fid)
  268. if err != nil {
  269. s.failed++
  270. println("!!!! ", fid, " location not found!!!!!")
  271. continue
  272. }
  273. bytesRead, err = tcpFileGet(volumeServer, fid)
  274. } else {
  275. url, err := b.masterClient.LookupFileId(fid)
  276. if err != nil {
  277. s.failed++
  278. println("!!!! ", fid, " location not found!!!!!")
  279. continue
  280. }
  281. var bytes []byte
  282. bytes, err = util.Get(url)
  283. bytesRead = len(bytes)
  284. }
  285. if err == nil {
  286. s.completed++
  287. s.transferred += int64(bytesRead)
  288. readStats.addSample(time.Now().Sub(start))
  289. } else {
  290. s.failed++
  291. fmt.Printf("Failed to read %s error:%v\n", fid, err)
  292. }
  293. }
  294. }
  295. func grpcFileGet(volumeServer, fid string, grpcDialOption grpc.DialOption) (bytesRead int, err error) {
  296. err = operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(ctx context.Context, client volume_server_pb.VolumeServerClient) error {
  297. fileGetClient, err := client.FileGet(ctx, &volume_server_pb.FileGetRequest{FileId: fid})
  298. if err != nil {
  299. return err
  300. }
  301. for {
  302. resp, respErr := fileGetClient.Recv()
  303. if resp != nil {
  304. bytesRead += len(resp.Data)
  305. }
  306. if respErr != nil {
  307. if respErr == io.EOF {
  308. return nil
  309. }
  310. return respErr
  311. }
  312. }
  313. })
  314. return
  315. }
  316. func tcpFileGet(volumeServer, fid string) (bytesRead int, err error) {
  317. err = operation.WithVolumeServerTcpConnection(volumeServer, func(conn net.Conn) error {
  318. // println("requesting", fid, "...")
  319. if err := util.WriteMessage(conn, &volume_server_pb.TcpRequestHeader{
  320. Get: &volume_server_pb.FileGetRequest{FileId: fid},
  321. }); err != nil {
  322. return err
  323. }
  324. for {
  325. resp := &volume_server_pb.FileGetResponse{}
  326. // println("reading...")
  327. respErr := util.ReadMessage(conn, resp)
  328. if respErr != nil {
  329. if respErr == io.EOF {
  330. return nil
  331. }
  332. // println("err:", respErr.Error())
  333. return respErr
  334. }
  335. // println("resp size", len(resp.Data))
  336. bytesRead += len(resp.Data)
  337. if resp.IsLast {
  338. return nil
  339. }
  340. }
  341. })
  342. return
  343. }
  344. func writeFileIds(fileName string, fileIdLineChan chan string, finishChan chan bool) {
  345. file, err := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
  346. if err != nil {
  347. glog.Fatalf("File to create file %s: %s\n", fileName, err)
  348. }
  349. defer file.Close()
  350. for {
  351. select {
  352. case <-finishChan:
  353. wait.Done()
  354. return
  355. case line := <-fileIdLineChan:
  356. file.Write([]byte(line))
  357. file.Write([]byte("\n"))
  358. }
  359. }
  360. }
  361. func readFileIds(fileName string, fileIdLineChan chan string) {
  362. file, err := os.Open(fileName) // For read access.
  363. if err != nil {
  364. glog.Fatalf("File to read file %s: %s\n", fileName, err)
  365. }
  366. defer file.Close()
  367. random := rand.New(rand.NewSource(time.Now().UnixNano()))
  368. r := bufio.NewReader(file)
  369. if *b.sequentialRead {
  370. for {
  371. if line, err := Readln(r); err == nil {
  372. fileIdLineChan <- string(line)
  373. } else {
  374. break
  375. }
  376. }
  377. } else {
  378. lines := make([]string, 0, readStats.total)
  379. for {
  380. if line, err := Readln(r); err == nil {
  381. lines = append(lines, string(line))
  382. } else {
  383. break
  384. }
  385. }
  386. if len(lines) > 0 {
  387. for i := 0; i < readStats.total; i++ {
  388. fileIdLineChan <- lines[random.Intn(len(lines))]
  389. }
  390. }
  391. }
  392. close(fileIdLineChan)
  393. }
  394. const (
  395. benchResolution = 10000 //0.1 microsecond
  396. benchBucket = 1000000000 / benchResolution
  397. )
  398. // An efficient statics collecting and rendering
  399. type stats struct {
  400. data []int
  401. overflow []int
  402. localStats []stat
  403. start time.Time
  404. end time.Time
  405. total int
  406. }
  407. type stat struct {
  408. completed int
  409. failed int
  410. total int
  411. transferred int64
  412. }
  413. var percentages = []int{50, 66, 75, 80, 90, 95, 98, 99, 100}
  414. func newStats(n int) *stats {
  415. return &stats{
  416. data: make([]int, benchResolution),
  417. overflow: make([]int, 0),
  418. localStats: make([]stat, n),
  419. }
  420. }
  421. func (s *stats) addSample(d time.Duration) {
  422. index := int(d / benchBucket)
  423. if index < 0 {
  424. fmt.Printf("This request takes %3.1f seconds, skipping!\n", float64(index)/10000)
  425. } else if index < len(s.data) {
  426. s.data[int(d/benchBucket)]++
  427. } else {
  428. s.overflow = append(s.overflow, index)
  429. }
  430. }
  431. func (s *stats) checkProgress(testName string, finishChan chan bool) {
  432. fmt.Printf("\n------------ %s ----------\n", testName)
  433. ticker := time.Tick(time.Second)
  434. lastCompleted, lastTransferred, lastTime := 0, int64(0), time.Now()
  435. for {
  436. select {
  437. case <-finishChan:
  438. wait.Done()
  439. return
  440. case t := <-ticker:
  441. completed, transferred, taken, total := 0, int64(0), t.Sub(lastTime), s.total
  442. for _, localStat := range s.localStats {
  443. completed += localStat.completed
  444. transferred += localStat.transferred
  445. total += localStat.total
  446. }
  447. fmt.Printf("Completed %d of %d requests, %3.1f%% %3.1f/s %3.1fMB/s\n",
  448. completed, total, float64(completed)*100/float64(total),
  449. float64(completed-lastCompleted)*float64(int64(time.Second))/float64(int64(taken)),
  450. float64(transferred-lastTransferred)*float64(int64(time.Second))/float64(int64(taken))/float64(1024*1024),
  451. )
  452. lastCompleted, lastTransferred, lastTime = completed, transferred, t
  453. }
  454. }
  455. }
  456. func (s *stats) printStats() {
  457. completed, failed, transferred, total := 0, 0, int64(0), s.total
  458. for _, localStat := range s.localStats {
  459. completed += localStat.completed
  460. failed += localStat.failed
  461. transferred += localStat.transferred
  462. total += localStat.total
  463. }
  464. timeTaken := float64(int64(s.end.Sub(s.start))) / 1000000000
  465. fmt.Printf("\nConcurrency Level: %d\n", *b.concurrency)
  466. fmt.Printf("Time taken for tests: %.3f seconds\n", timeTaken)
  467. fmt.Printf("Complete requests: %d\n", completed)
  468. fmt.Printf("Failed requests: %d\n", failed)
  469. fmt.Printf("Total transferred: %d bytes\n", transferred)
  470. fmt.Printf("Requests per second: %.2f [#/sec]\n", float64(completed)/timeTaken)
  471. fmt.Printf("Transfer rate: %.2f [Kbytes/sec]\n", float64(transferred)/1024/timeTaken)
  472. n, sum := 0, 0
  473. min, max := 10000000, 0
  474. for i := 0; i < len(s.data); i++ {
  475. n += s.data[i]
  476. sum += s.data[i] * i
  477. if s.data[i] > 0 {
  478. if min > i {
  479. min = i
  480. }
  481. if max < i {
  482. max = i
  483. }
  484. }
  485. }
  486. n += len(s.overflow)
  487. for i := 0; i < len(s.overflow); i++ {
  488. sum += s.overflow[i]
  489. if min > s.overflow[i] {
  490. min = s.overflow[i]
  491. }
  492. if max < s.overflow[i] {
  493. max = s.overflow[i]
  494. }
  495. }
  496. avg := float64(sum) / float64(n)
  497. varianceSum := 0.0
  498. for i := 0; i < len(s.data); i++ {
  499. if s.data[i] > 0 {
  500. d := float64(i) - avg
  501. varianceSum += d * d * float64(s.data[i])
  502. }
  503. }
  504. for i := 0; i < len(s.overflow); i++ {
  505. d := float64(s.overflow[i]) - avg
  506. varianceSum += d * d
  507. }
  508. std := math.Sqrt(varianceSum / float64(n))
  509. fmt.Printf("\nConnection Times (ms)\n")
  510. fmt.Printf(" min avg max std\n")
  511. fmt.Printf("Total: %2.1f %3.1f %3.1f %3.1f\n", float32(min)/10, float32(avg)/10, float32(max)/10, std/10)
  512. //printing percentiles
  513. fmt.Printf("\nPercentage of the requests served within a certain time (ms)\n")
  514. percentiles := make([]int, len(percentages))
  515. for i := 0; i < len(percentages); i++ {
  516. percentiles[i] = n * percentages[i] / 100
  517. }
  518. percentiles[len(percentiles)-1] = n
  519. percentileIndex := 0
  520. currentSum := 0
  521. for i := 0; i < len(s.data); i++ {
  522. currentSum += s.data[i]
  523. if s.data[i] > 0 && percentileIndex < len(percentiles) && currentSum >= percentiles[percentileIndex] {
  524. fmt.Printf(" %3d%% %5.1f ms\n", percentages[percentileIndex], float32(i)/10.0)
  525. percentileIndex++
  526. for percentileIndex < len(percentiles) && currentSum >= percentiles[percentileIndex] {
  527. percentileIndex++
  528. }
  529. }
  530. }
  531. sort.Ints(s.overflow)
  532. for i := 0; i < len(s.overflow); i++ {
  533. currentSum++
  534. if percentileIndex < len(percentiles) && currentSum >= percentiles[percentileIndex] {
  535. fmt.Printf(" %3d%% %5.1f ms\n", percentages[percentileIndex], float32(s.overflow[i])/10.0)
  536. percentileIndex++
  537. for percentileIndex < len(percentiles) && currentSum >= percentiles[percentileIndex] {
  538. percentileIndex++
  539. }
  540. }
  541. }
  542. }
  543. // a fake reader to generate content to upload
  544. type FakeReader struct {
  545. id uint64 // an id number
  546. size int64 // max bytes
  547. }
  548. func (l *FakeReader) Read(p []byte) (n int, err error) {
  549. if l.size <= 0 {
  550. return 0, io.EOF
  551. }
  552. if int64(len(p)) > l.size {
  553. n = int(l.size)
  554. } else {
  555. n = len(p)
  556. }
  557. if n >= 8 {
  558. for i := 0; i < 8; i++ {
  559. p[i] = byte(l.id >> uint(i*8))
  560. }
  561. }
  562. l.size -= int64(n)
  563. return
  564. }
  565. func (l *FakeReader) WriteTo(w io.Writer) (n int64, err error) {
  566. size := int(l.size)
  567. bufferSize := len(sharedBytes)
  568. for size > 0 {
  569. tempBuffer := sharedBytes
  570. if size < bufferSize {
  571. tempBuffer = sharedBytes[0:size]
  572. }
  573. count, e := w.Write(tempBuffer)
  574. if e != nil {
  575. return int64(size), e
  576. }
  577. size -= count
  578. }
  579. return l.size, nil
  580. }
  581. func Readln(r *bufio.Reader) ([]byte, error) {
  582. var (
  583. isPrefix = true
  584. err error
  585. line, ln []byte
  586. )
  587. for isPrefix && err == nil {
  588. line, isPrefix, err = r.ReadLine()
  589. ln = append(ln, line...)
  590. }
  591. return ln, err
  592. }