volume_udp_client.go 1.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162
  1. package wdclient
  2. import (
  3. "bufio"
  4. "github.com/chrislusf/seaweedfs/weed/pb"
  5. "github.com/chrislusf/seaweedfs/weed/util"
  6. "github.com/chrislusf/seaweedfs/weed/wdclient/penet"
  7. "io"
  8. "net"
  9. "time"
  10. )
  // VolumeUdpClient talks to a volume server directly, without replication.
  // Only the put path (PutFileChunk) is visible in this part of the file; get
  // and delete are mentioned by the original comment but not shown here.
  //
  // A client caches a single connection, so it is presumably meant to be used
  // with one volume server at a time — TODO confirm against callers.
  type VolumeUdpClient struct {
  	// Conn is the cached connection; PutFileChunk dials it on first use when
  	// it is nil.
  	Conn net.Conn

  	// bufWriter buffers writes to Conn; PutFileChunk creates it right after
  	// dialing.
  	bufWriter *bufio.Writer

  	// bufReader is never assigned in the visible code; presumably reserved
  	// for reading responses from Conn.
  	bufReader *bufio.Reader
  }
  // VolumeUdpConn groups a connection with a buffered writer and a buffered
  // reader. Nothing in the visible part of this file constructs or uses it.
  type VolumeUdpConn struct {
  	// Conn is the underlying network connection.
  	Conn net.Conn

  	// bufWriter and bufReader presumably wrap Conn for buffered I/O — TODO
  	// confirm once a constructor for this type exists.
  	bufWriter *bufio.Writer
  	bufReader *bufio.Reader
  }
  22. func NewVolumeUdpClient() *VolumeUdpClient {
  23. return &VolumeUdpClient{
  24. }
  25. }
  26. func (c *VolumeUdpClient) PutFileChunk(volumeServerAddress string, fileId string, fileSize uint32, fileReader io.Reader) (err error) {
  27. udpAddress, parseErr := pb.ParseServerAddress(volumeServerAddress, 20001)
  28. if parseErr != nil {
  29. return parseErr
  30. }
  31. if c.Conn == nil {
  32. c.Conn, err = penet.DialTimeout("", udpAddress, 500*time.Millisecond)
  33. if err != nil {
  34. return err
  35. }
  36. c.bufWriter = bufio.NewWriter(c.Conn)
  37. }
  38. buf := []byte("+" + fileId + "\n")
  39. _, err = c.bufWriter.Write([]byte(buf))
  40. if err != nil {
  41. return
  42. }
  43. util.Uint32toBytes(buf[0:4], fileSize)
  44. _, err = c.bufWriter.Write(buf[0:4])
  45. if err != nil {
  46. return
  47. }
  48. _, err = io.Copy(c.bufWriter, fileReader)
  49. if err != nil {
  50. return
  51. }
  52. c.bufWriter.Flush()
  53. return nil
  54. }