volume_tcp_file.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. package weed_server
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "io"
  7. "net"
  8. "net/http"
  9. "strings"
  10. "github.com/chrislusf/seaweedfs/weed/glog"
  11. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  12. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  13. "github.com/chrislusf/seaweedfs/weed/util"
  14. )
  15. func (vs *VolumeServer) HandleTcpConnection(conn net.Conn) error {
  16. defer conn.Close()
  17. for {
  18. // println("handle tcp conn", conn.RemoteAddr())
  19. tcpMessage := &volume_server_pb.TcpRequestHeader{}
  20. if err := util.ReadMessage(conn, tcpMessage); err != nil {
  21. if err == io.EOF {
  22. return nil
  23. }
  24. return fmt.Errorf("read message: %v", err)
  25. }
  26. if tcpMessage.Get != nil {
  27. vs.handleFileGet(conn, tcpMessage.Get)
  28. }
  29. // println("processed", tcpMessage.Get.FileId)
  30. }
  31. }
  32. func (vs *VolumeServer) handleFileGet(conn net.Conn, req *volume_server_pb.FileGetRequest) error {
  33. headResponse := &volume_server_pb.FileGetResponse{}
  34. n := new(needle.Needle)
  35. commaIndex := strings.LastIndex(req.FileId, ",")
  36. vid := req.FileId[:commaIndex]
  37. fid := req.FileId[commaIndex+1:]
  38. volumeId, err := needle.NewVolumeId(vid)
  39. if err != nil {
  40. headResponse.ErrorCode = http.StatusBadRequest
  41. return util.WriteMessage(conn, headResponse)
  42. }
  43. err = n.ParsePath(fid)
  44. if err != nil {
  45. headResponse.ErrorCode = http.StatusBadRequest
  46. return util.WriteMessage(conn, headResponse)
  47. }
  48. hasVolume := vs.store.HasVolume(volumeId)
  49. _, hasEcVolume := vs.store.FindEcVolume(volumeId)
  50. if !hasVolume && !hasEcVolume {
  51. headResponse.ErrorCode = http.StatusMovedPermanently
  52. return util.WriteMessage(conn, headResponse)
  53. }
  54. cookie := n.Cookie
  55. var count int
  56. if hasVolume {
  57. count, err = vs.store.ReadVolumeNeedle(volumeId, n)
  58. } else if hasEcVolume {
  59. count, err = vs.store.ReadEcShardNeedle(context.Background(), volumeId, n)
  60. }
  61. if err != nil || count < 0 {
  62. headResponse.ErrorCode = http.StatusNotFound
  63. return util.WriteMessage(conn, headResponse)
  64. }
  65. if n.Cookie != cookie {
  66. headResponse.ErrorCode = http.StatusNotFound
  67. return util.WriteMessage(conn, headResponse)
  68. }
  69. if n.LastModified != 0 {
  70. headResponse.LastModified = n.LastModified
  71. }
  72. headResponse.Etag = n.Etag()
  73. if n.HasPairs() {
  74. pairMap := make(map[string]string)
  75. err = json.Unmarshal(n.Pairs, &pairMap)
  76. if err != nil {
  77. glog.V(0).Infoln("Unmarshal pairs error:", err)
  78. }
  79. headResponse.Headers = pairMap
  80. }
  81. /*
  82. // skip this, no redirection
  83. if vs.tryHandleChunkedFile(n, filename, w, r) {
  84. return
  85. }
  86. */
  87. if n.NameSize > 0 {
  88. headResponse.Filename = string(n.Name)
  89. }
  90. mtype := ""
  91. if n.MimeSize > 0 {
  92. mt := string(n.Mime)
  93. if !strings.HasPrefix(mt, "application/octet-stream") {
  94. mtype = mt
  95. }
  96. }
  97. headResponse.ContentType = mtype
  98. headResponse.IsGzipped = n.IsGzipped()
  99. if n.IsGzipped() && req.AcceptGzip {
  100. if n.Data, err = util.UnGzipData(n.Data); err != nil {
  101. glog.V(0).Infof("ungzip %s error: %v", req.FileId, err)
  102. }
  103. }
  104. headResponse.ContentLength = uint32(len(n.Data))
  105. bytesToRead := len(n.Data)
  106. bytesRead := 0
  107. t := headResponse
  108. for bytesRead < bytesToRead {
  109. stopIndex := bytesRead + BufferSizeLimit
  110. if stopIndex > bytesToRead {
  111. stopIndex = bytesToRead
  112. }
  113. if t == nil {
  114. t = &volume_server_pb.FileGetResponse{}
  115. }
  116. t.Data = n.Data[bytesRead:stopIndex]
  117. t.IsLast = stopIndex == bytesToRead
  118. err = util.WriteMessage(conn, t)
  119. t = nil
  120. if err != nil {
  121. return err
  122. }
  123. bytesRead = stopIndex
  124. }
  125. return nil
  126. }