volume_tcp_client.go 2.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. package wdclient
  2. import (
  3. "bufio"
  4. "bytes"
  5. "fmt"
  6. "github.com/seaweedfs/seaweedfs/weed/glog"
  7. "github.com/seaweedfs/seaweedfs/weed/pb"
  8. "github.com/seaweedfs/seaweedfs/weed/util"
  9. "github.com/seaweedfs/seaweedfs/weed/wdclient/net2"
  10. "io"
  11. "net"
  12. "time"
  13. )
  14. // VolumeTcpClient put/get/delete file chunks directly on volume servers without replication
  15. type VolumeTcpClient struct {
  16. cp net2.ConnectionPool
  17. }
  18. type VolumeTcpConn struct {
  19. net.Conn
  20. bufWriter *bufio.Writer
  21. bufReader *bufio.Reader
  22. }
  23. func NewVolumeTcpClient() *VolumeTcpClient {
  24. MaxIdleTime := 10 * time.Second
  25. return &VolumeTcpClient{
  26. cp: net2.NewMultiConnectionPool(net2.ConnectionOptions{
  27. MaxActiveConnections: 16,
  28. MaxIdleConnections: 1,
  29. MaxIdleTime: &MaxIdleTime,
  30. DialMaxConcurrency: 0,
  31. Dial: func(network string, address string) (net.Conn, error) {
  32. conn, err := net.Dial(network, address)
  33. return &VolumeTcpConn{
  34. conn,
  35. bufio.NewWriter(conn),
  36. bufio.NewReader(conn),
  37. }, err
  38. },
  39. NowFunc: nil,
  40. ReadTimeout: 0,
  41. WriteTimeout: 0,
  42. }),
  43. }
  44. }
  45. func (c *VolumeTcpClient) PutFileChunk(volumeServerAddress string, fileId string, fileSize uint32, fileReader io.Reader) (err error) {
  46. tcpAddress, parseErr := pb.ParseServerAddress(volumeServerAddress, 20000)
  47. if parseErr != nil {
  48. return parseErr
  49. }
  50. c.cp.Register("tcp", tcpAddress)
  51. tcpConn, getErr := c.cp.Get("tcp", tcpAddress)
  52. if getErr != nil {
  53. return fmt.Errorf("get connection to %s: %v", tcpAddress, getErr)
  54. }
  55. conn := tcpConn.RawConn().(*VolumeTcpConn)
  56. defer func() {
  57. if err != nil {
  58. tcpConn.DiscardConnection()
  59. } else {
  60. tcpConn.ReleaseConnection()
  61. }
  62. }()
  63. buf := []byte("+" + fileId + "\n")
  64. _, err = conn.bufWriter.Write([]byte(buf))
  65. if err != nil {
  66. return
  67. }
  68. util.Uint32toBytes(buf[0:4], fileSize)
  69. _, err = conn.bufWriter.Write(buf[0:4])
  70. if err != nil {
  71. return
  72. }
  73. _, err = io.Copy(conn.bufWriter, fileReader)
  74. if err != nil {
  75. return
  76. }
  77. conn.bufWriter.Write([]byte("!\n"))
  78. conn.bufWriter.Flush()
  79. ret, _, err := conn.bufReader.ReadLine()
  80. if err != nil {
  81. glog.V(0).Infof("upload by tcp: %v", err)
  82. return
  83. }
  84. if !bytes.HasPrefix(ret, []byte("+OK")) {
  85. glog.V(0).Infof("upload by tcp: %v", string(ret))
  86. }
  87. return nil
  88. }