filer_server_handlers_write.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. package weed_server
  2. import (
  3. "context"
  4. "net/http"
  5. "os"
  6. "strings"
  7. "time"
  8. "github.com/chrislusf/seaweedfs/weed/glog"
  9. "github.com/chrislusf/seaweedfs/weed/operation"
  10. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  11. "github.com/chrislusf/seaweedfs/weed/security"
  12. "github.com/chrislusf/seaweedfs/weed/stats"
  13. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  14. "github.com/chrislusf/seaweedfs/weed/util"
  15. )
  16. var (
  17. OS_UID = uint32(os.Getuid())
  18. OS_GID = uint32(os.Getgid())
  19. )
  20. type FilerPostResult struct {
  21. Name string `json:"name,omitempty"`
  22. Size int64 `json:"size,omitempty"`
  23. Error string `json:"error,omitempty"`
  24. Fid string `json:"fid,omitempty"`
  25. Url string `json:"url,omitempty"`
  26. }
  27. func (fs *FilerServer) assignNewFileInfo(replication, collection, dataCenter, ttlString string, fsync bool) (fileId, urlLocation string, auth security.EncodedJwt, err error) {
  28. stats.FilerRequestCounter.WithLabelValues("assign").Inc()
  29. start := time.Now()
  30. defer func() { stats.FilerRequestHistogram.WithLabelValues("assign").Observe(time.Since(start).Seconds()) }()
  31. ar := &operation.VolumeAssignRequest{
  32. Count: 1,
  33. Replication: replication,
  34. Collection: collection,
  35. Ttl: ttlString,
  36. DataCenter: dataCenter,
  37. }
  38. var altRequest *operation.VolumeAssignRequest
  39. if dataCenter != "" {
  40. altRequest = &operation.VolumeAssignRequest{
  41. Count: 1,
  42. Replication: replication,
  43. Collection: collection,
  44. Ttl: ttlString,
  45. DataCenter: "",
  46. }
  47. }
  48. assignResult, ae := operation.Assign(fs.filer.GetMaster(), fs.grpcDialOption, ar, altRequest)
  49. if ae != nil {
  50. glog.Errorf("failing to assign a file id: %v", ae)
  51. err = ae
  52. return
  53. }
  54. fileId = assignResult.Fid
  55. urlLocation = "http://" + assignResult.Url + "/" + assignResult.Fid
  56. if fsync {
  57. urlLocation += "?fsync=true"
  58. }
  59. auth = assignResult.Auth
  60. return
  61. }
  62. func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
  63. ctx := context.Background()
  64. query := r.URL.Query()
  65. collection, replication, fsync := fs.detectCollection(r.RequestURI, query.Get("collection"), query.Get("replication"))
  66. dataCenter := query.Get("dataCenter")
  67. if dataCenter == "" {
  68. dataCenter = fs.option.DataCenter
  69. }
  70. ttlString := r.URL.Query().Get("ttl")
  71. // read ttl in seconds
  72. ttl, err := needle.ReadTTL(ttlString)
  73. ttlSeconds := int32(0)
  74. if err == nil {
  75. ttlSeconds = int32(ttl.Minutes()) * 60
  76. }
  77. fs.autoChunk(ctx, w, r, replication, collection, dataCenter, ttlSeconds, ttlString, fsync)
  78. }
  79. // curl -X DELETE http://localhost:8888/path/to
  80. // curl -X DELETE http://localhost:8888/path/to?recursive=true
  81. // curl -X DELETE http://localhost:8888/path/to?recursive=true&ignoreRecursiveError=true
  82. // curl -X DELETE http://localhost:8888/path/to?recursive=true&skipChunkDeletion=true
  83. func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
  84. isRecursive := r.FormValue("recursive") == "true"
  85. if !isRecursive && fs.option.recursiveDelete {
  86. if r.FormValue("recursive") != "false" {
  87. isRecursive = true
  88. }
  89. }
  90. ignoreRecursiveError := r.FormValue("ignoreRecursiveError") == "true"
  91. skipChunkDeletion := r.FormValue("skipChunkDeletion") == "true"
  92. objectPath := r.URL.Path
  93. if len(r.URL.Path) > 1 && strings.HasSuffix(objectPath, "/") {
  94. objectPath = objectPath[0 : len(objectPath)-1]
  95. }
  96. err := fs.filer.DeleteEntryMetaAndData(context.Background(), util.FullPath(objectPath), isRecursive, ignoreRecursiveError, !skipChunkDeletion, false)
  97. if err != nil {
  98. glog.V(1).Infoln("deleting", objectPath, ":", err.Error())
  99. httpStatus := http.StatusInternalServerError
  100. if err == filer_pb.ErrNotFound {
  101. httpStatus = http.StatusNotFound
  102. }
  103. writeJsonError(w, r, httpStatus, err)
  104. return
  105. }
  106. w.WriteHeader(http.StatusNoContent)
  107. }
  108. func (fs *FilerServer) detectCollection(requestURI, qCollection, qReplication string) (collection, replication string, fsync bool) {
  109. // default
  110. collection = fs.option.Collection
  111. replication = fs.option.DefaultReplication
  112. // get default collection settings
  113. if qCollection != "" {
  114. collection = qCollection
  115. }
  116. if qReplication != "" {
  117. replication = qReplication
  118. }
  119. // required by buckets folder
  120. if strings.HasPrefix(requestURI, fs.filer.DirBucketsPath+"/") {
  121. bucketAndObjectKey := requestURI[len(fs.filer.DirBucketsPath)+1:]
  122. t := strings.Index(bucketAndObjectKey, "/")
  123. if t < 0 {
  124. collection = bucketAndObjectKey
  125. }
  126. if t > 0 {
  127. collection = bucketAndObjectKey[:t]
  128. }
  129. replication, fsync = fs.filer.ReadBucketOption(collection)
  130. }
  131. return
  132. }