123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156 |
- package weed_server
- import (
- "context"
- "encoding/json"
- "fmt"
- "io"
- "net"
- "net/http"
- "strings"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
- "github.com/chrislusf/seaweedfs/weed/storage/needle"
- "github.com/chrislusf/seaweedfs/weed/util"
- )
- func (vs *VolumeServer) HandleTcpConnection(conn net.Conn) error {
- defer conn.Close()
- for {
- // println("handle tcp conn", conn.RemoteAddr())
- tcpMessage := &volume_server_pb.TcpRequestHeader{}
- if err := util.ReadMessage(conn, tcpMessage); err != nil {
- if err == io.EOF {
- return nil
- }
- return fmt.Errorf("read message: %v", err)
- }
- if tcpMessage.Get != nil {
- vs.handleFileGet(conn, tcpMessage.Get)
- }
- // println("processed", tcpMessage.Get.FileId)
- }
- }
- func (vs *VolumeServer) handleFileGet(conn net.Conn, req *volume_server_pb.FileGetRequest) error {
- headResponse := &volume_server_pb.FileGetResponse{}
- n := new(needle.Needle)
- commaIndex := strings.LastIndex(req.FileId, ",")
- vid := req.FileId[:commaIndex]
- fid := req.FileId[commaIndex+1:]
- volumeId, err := needle.NewVolumeId(vid)
- if err != nil {
- headResponse.ErrorCode = http.StatusBadRequest
- return util.WriteMessage(conn, headResponse)
- }
- err = n.ParsePath(fid)
- if err != nil {
- headResponse.ErrorCode = http.StatusBadRequest
- return util.WriteMessage(conn, headResponse)
- }
- hasVolume := vs.store.HasVolume(volumeId)
- _, hasEcVolume := vs.store.FindEcVolume(volumeId)
- if !hasVolume && !hasEcVolume {
- headResponse.ErrorCode = http.StatusMovedPermanently
- return util.WriteMessage(conn, headResponse)
- }
- cookie := n.Cookie
- var count int
- if hasVolume {
- count, err = vs.store.ReadVolumeNeedle(volumeId, n)
- } else if hasEcVolume {
- count, err = vs.store.ReadEcShardNeedle(context.Background(), volumeId, n)
- }
- if err != nil || count < 0 {
- headResponse.ErrorCode = http.StatusNotFound
- return util.WriteMessage(conn, headResponse)
- }
- if n.Cookie != cookie {
- headResponse.ErrorCode = http.StatusNotFound
- return util.WriteMessage(conn, headResponse)
- }
- if n.LastModified != 0 {
- headResponse.LastModified = n.LastModified
- }
- headResponse.Etag = n.Etag()
- if n.HasPairs() {
- pairMap := make(map[string]string)
- err = json.Unmarshal(n.Pairs, &pairMap)
- if err != nil {
- glog.V(0).Infoln("Unmarshal pairs error:", err)
- }
- headResponse.Headers = pairMap
- }
- /*
- // skip this, no redirection
- if vs.tryHandleChunkedFile(n, filename, w, r) {
- return
- }
- */
- if n.NameSize > 0 {
- headResponse.Filename = string(n.Name)
- }
- mtype := ""
- if n.MimeSize > 0 {
- mt := string(n.Mime)
- if !strings.HasPrefix(mt, "application/octet-stream") {
- mtype = mt
- }
- }
- headResponse.ContentType = mtype
- headResponse.IsGzipped = n.IsGzipped()
- if n.IsGzipped() && req.AcceptGzip {
- if n.Data, err = util.UnGzipData(n.Data); err != nil {
- glog.V(0).Infof("ungzip %s error: %v", req.FileId, err)
- }
- }
- headResponse.ContentLength = uint32(len(n.Data))
- bytesToRead := len(n.Data)
- bytesRead := 0
- t := headResponse
- for bytesRead < bytesToRead {
- stopIndex := bytesRead + BufferSizeLimit
- if stopIndex > bytesToRead {
- stopIndex = bytesToRead
- }
- if t == nil {
- t = &volume_server_pb.FileGetResponse{}
- }
- t.Data = n.Data[bytesRead:stopIndex]
- t.IsLast = stopIndex == bytesToRead
- err = util.WriteMessage(conn, t)
- t = nil
- if err != nil {
- return err
- }
- bytesRead = stopIndex
- }
- return nil
- }
|