volume_grpc_file.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. package weed_server
  2. import (
  3. "context"
  4. "encoding/json"
  5. "net/http"
  6. "strings"
  7. "github.com/chrislusf/seaweedfs/weed/glog"
  8. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  9. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  10. "github.com/chrislusf/seaweedfs/weed/util"
  11. )
  12. func (vs *VolumeServer) FileGet(req *volume_server_pb.FileGetRequest, stream volume_server_pb.VolumeServer_FileGetServer) error {
  13. headResponse := &volume_server_pb.FileGetResponse{}
  14. n := new(needle.Needle)
  15. commaIndex := strings.LastIndex(req.FileId, ",")
  16. vid := req.FileId[:commaIndex]
  17. fid := req.FileId[commaIndex+1:]
  18. volumeId, err := needle.NewVolumeId(vid)
  19. if err != nil {
  20. headResponse.ErrorCode = http.StatusBadRequest
  21. return stream.Send(headResponse)
  22. }
  23. err = n.ParsePath(fid)
  24. if err != nil {
  25. headResponse.ErrorCode = http.StatusBadRequest
  26. return stream.Send(headResponse)
  27. }
  28. hasVolume := vs.store.HasVolume(volumeId)
  29. _, hasEcVolume := vs.store.FindEcVolume(volumeId)
  30. if !hasVolume && !hasEcVolume {
  31. headResponse.ErrorCode = http.StatusMovedPermanently
  32. return stream.Send(headResponse)
  33. }
  34. cookie := n.Cookie
  35. var count int
  36. if hasVolume {
  37. count, err = vs.store.ReadVolumeNeedle(volumeId, n)
  38. } else if hasEcVolume {
  39. count, err = vs.store.ReadEcShardNeedle(context.Background(), volumeId, n)
  40. }
  41. if err != nil || count < 0 {
  42. headResponse.ErrorCode = http.StatusNotFound
  43. return stream.Send(headResponse)
  44. }
  45. if n.Cookie != cookie {
  46. headResponse.ErrorCode = http.StatusNotFound
  47. return stream.Send(headResponse)
  48. }
  49. if n.LastModified != 0 {
  50. headResponse.LastModified = n.LastModified
  51. }
  52. headResponse.Etag = n.Etag()
  53. if n.HasPairs() {
  54. pairMap := make(map[string]string)
  55. err = json.Unmarshal(n.Pairs, &pairMap)
  56. if err != nil {
  57. glog.V(0).Infoln("Unmarshal pairs error:", err)
  58. }
  59. headResponse.Headers = pairMap
  60. }
  61. /*
  62. // skip this, no redirection
  63. if vs.tryHandleChunkedFile(n, filename, w, r) {
  64. return
  65. }
  66. */
  67. if n.NameSize > 0 {
  68. headResponse.Filename = string(n.Name)
  69. }
  70. mtype := ""
  71. if n.MimeSize > 0 {
  72. mt := string(n.Mime)
  73. if !strings.HasPrefix(mt, "application/octet-stream") {
  74. mtype = mt
  75. }
  76. }
  77. headResponse.ContentType = mtype
  78. headResponse.IsGzipped = n.IsGzipped()
  79. if n.IsGzipped() && req.AcceptGzip {
  80. if n.Data, err = util.UnGzipData(n.Data); err != nil {
  81. glog.V(0).Infof("ungzip %s error: %v", req.FileId, err)
  82. }
  83. }
  84. headResponse.ContentLength = uint32(len(n.Data))
  85. bytesToRead := len(n.Data)
  86. bytesRead := 0
  87. t := headResponse
  88. for bytesRead < bytesToRead {
  89. stopIndex := bytesRead + BufferSizeLimit
  90. if stopIndex > bytesToRead {
  91. stopIndex = bytesToRead
  92. }
  93. if t == nil {
  94. t = &volume_server_pb.FileGetResponse{}
  95. }
  96. t.Data = n.Data[bytesRead:stopIndex]
  97. err = stream.Send(t)
  98. t = nil
  99. if err != nil {
  100. return err
  101. }
  102. bytesRead = stopIndex
  103. }
  104. return nil
  105. }