s3api_object_handlers_list.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515
  1. package s3api
  2. import (
  3. "context"
  4. "encoding/xml"
  5. "fmt"
  6. "github.com/aws/aws-sdk-go/service/s3"
  7. "github.com/seaweedfs/seaweedfs/weed/glog"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  9. "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
  10. "github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
  11. "io"
  12. "net/http"
  13. "net/url"
  14. "strconv"
  15. "strings"
  16. )
// OptionalString is a string that also remembers whether it was supplied at
// all, so an explicitly empty value can be told apart from an absent one.
// Its MarshalXML method emits the element only when set is true.
type OptionalString struct {
	// string is the embedded (unexported) value.
	string
	// set is true when the value was explicitly provided.
	set bool
}
  21. func (o OptionalString) MarshalXML(e *xml.Encoder, startElement xml.StartElement) error {
  22. if !o.set {
  23. return nil
  24. }
  25. return e.EncodeElement(o.string, startElement)
  26. }
// ListBucketResultV2 is the XML body returned by ListObjectsV2Handler.
// It is encoded as a <ListBucketResult> element in the S3 2006-03-01
// namespace; encoding/xml emits the fields in declaration order.
type ListBucketResultV2 struct {
	XMLName        xml.Name      `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ListBucketResult"`
	Name           string        `xml:"Name"`
	Prefix         string        `xml:"Prefix"`
	MaxKeys        uint16        `xml:"MaxKeys"`
	Delimiter      string        `xml:"Delimiter,omitempty"`
	IsTruncated    bool          `xml:"IsTruncated"`
	Contents       []ListEntry   `xml:"Contents,omitempty"`
	CommonPrefixes []PrefixEntry `xml:"CommonPrefixes,omitempty"`
	// ContinuationToken echoes the request token; OptionalString.MarshalXML
	// omits the element entirely when the request did not carry one.
	ContinuationToken     OptionalString `xml:"ContinuationToken,omitempty"`
	NextContinuationToken string         `xml:"NextContinuationToken,omitempty"`
	EncodingType          string         `xml:"EncodingType,omitempty"`
	// KeyCount is filled by the handler with len(Contents) + len(CommonPrefixes).
	KeyCount   int    `xml:"KeyCount"`
	StartAfter string `xml:"StartAfter,omitempty"`
}
  42. func (s3a *S3ApiServer) ListObjectsV2Handler(w http.ResponseWriter, r *http.Request) {
  43. // https://docs.aws.amazon.com/AmazonS3/latest/API/v2-RESTBucketGET.html
  44. // collect parameters
  45. bucket, _ := s3_constants.GetBucketAndObject(r)
  46. glog.V(3).Infof("ListObjectsV2Handler %s", bucket)
  47. originalPrefix, startAfter, delimiter, continuationToken, encodingTypeUrl, fetchOwner, maxKeys := getListObjectsV2Args(r.URL.Query())
  48. if maxKeys < 0 {
  49. s3err.WriteErrorResponse(w, r, s3err.ErrInvalidMaxKeys)
  50. return
  51. }
  52. marker := continuationToken.string
  53. if !continuationToken.set {
  54. marker = startAfter
  55. }
  56. response, err := s3a.listFilerEntries(bucket, originalPrefix, maxKeys, marker, delimiter, encodingTypeUrl, fetchOwner)
  57. if err != nil {
  58. s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
  59. return
  60. }
  61. if len(response.Contents) == 0 {
  62. if exists, existErr := s3a.exists(s3a.option.BucketsPath, bucket, true); existErr == nil && !exists {
  63. s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
  64. return
  65. }
  66. }
  67. responseV2 := &ListBucketResultV2{
  68. Name: response.Name,
  69. CommonPrefixes: response.CommonPrefixes,
  70. Contents: response.Contents,
  71. ContinuationToken: continuationToken,
  72. Delimiter: response.Delimiter,
  73. IsTruncated: response.IsTruncated,
  74. KeyCount: len(response.Contents) + len(response.CommonPrefixes),
  75. MaxKeys: uint16(response.MaxKeys),
  76. NextContinuationToken: response.NextMarker,
  77. Prefix: response.Prefix,
  78. StartAfter: startAfter,
  79. }
  80. if encodingTypeUrl {
  81. responseV2.EncodingType = s3.EncodingTypeUrl
  82. }
  83. writeSuccessResponseXML(w, r, responseV2)
  84. }
  85. func (s3a *S3ApiServer) ListObjectsV1Handler(w http.ResponseWriter, r *http.Request) {
  86. // https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjects.html
  87. // collect parameters
  88. bucket, _ := s3_constants.GetBucketAndObject(r)
  89. glog.V(3).Infof("ListObjectsV1Handler %s", bucket)
  90. originalPrefix, marker, delimiter, encodingTypeUrl, maxKeys := getListObjectsV1Args(r.URL.Query())
  91. if maxKeys < 0 {
  92. s3err.WriteErrorResponse(w, r, s3err.ErrInvalidMaxKeys)
  93. return
  94. }
  95. response, err := s3a.listFilerEntries(bucket, originalPrefix, uint16(maxKeys), marker, delimiter, encodingTypeUrl, true)
  96. if err != nil {
  97. s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
  98. return
  99. }
  100. if len(response.Contents) == 0 {
  101. if exists, existErr := s3a.exists(s3a.option.BucketsPath, bucket, true); existErr == nil && !exists {
  102. s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
  103. return
  104. }
  105. }
  106. writeSuccessResponseXML(w, r, response)
  107. }
// listFilerEntries lists the entries of bucket that match originalPrefix,
// resuming after originalMarker, and assembles them into a ListBucketResult.
//
// Parameters:
//   - maxKeys: budget of results; every object and every common prefix that
//     is reported consumes one unit (tracked in the shared ListingCursor).
//   - delimiter: when non-empty, keys sharing the text between the prefix and
//     the first delimiter are rolled up into a single CommonPrefixes entry.
//   - encodingTypeUrl: names are passed through entryUrlEncode with this flag
//     and the response EncodingType is set to "url" semantics.
//   - fetchOwner: forwarded to newListEntry.
//
// The returned error is whatever WithFilerClient / doListFilerEntries report;
// in that case response is left at its zero value.
func (s3a *S3ApiServer) listFilerEntries(bucket string, originalPrefix string, maxKeys uint16, originalMarker string, delimiter string, encodingTypeUrl bool, fetchOwner bool) (response ListBucketResult, err error) {
	// convert full path prefix into directory name and prefix for entry name
	requestDir, prefix, marker := normalizePrefixMarker(originalPrefix, originalMarker)
	// bucketPrefix ends with "/"; it is also the number of leading bytes cut
	// off absolute filer paths below to obtain bucket-relative keys.
	bucketPrefix := fmt.Sprintf("%s/%s/", s3a.option.BucketsPath, bucket)
	// Default to the bucket root (bucketPrefix without its trailing slash).
	reqDir := bucketPrefix[:len(bucketPrefix)-1]
	if requestDir != "" {
		reqDir = fmt.Sprintf("%s%s", bucketPrefix, requestDir)
	}

	var contents []ListEntry
	var commonPrefixes []PrefixEntry
	var doErr error
	var nextMarker string
	// The cursor is shared with doListFilerEntries and with the callback
	// below, which decrements maxKeys for every result it records.
	cursor := &ListingCursor{
		maxKeys:               maxKeys,
		prefixEndsOnDelimiter: strings.HasSuffix(originalPrefix, "/") && len(originalMarker) == 0,
	}

	// check filer
	err = s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
		for {
			// empty stays true when a pass reports no entry at all, which
			// ends the loop below.
			empty := true
			nextMarker, doErr = s3a.doListFilerEntries(client, reqDir, prefix, cursor, marker, delimiter, false, func(dir string, entry *filer_pb.Entry) {
				empty = false
				dirName, entryName, prefixName := entryUrlEncode(dir, entry.Name, encodingTypeUrl)
				if entry.IsDirectory {
					if entry.IsDirectoryKeyObject() {
						// A directory that is itself a key object is listed as content.
						contents = append(contents, newListEntry(entry, "", dirName, entryName, bucketPrefix, fetchOwner, true, false))
						cursor.maxKeys--
						// https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html
					} else if delimiter == "/" { // A response can contain CommonPrefixes only if you specify a delimiter.
						commonPrefixes = append(commonPrefixes, PrefixEntry{
							Prefix: fmt.Sprintf("%s/%s/", dirName, prefixName)[len(bucketPrefix):],
						})
						//All of the keys (up to 1,000) rolled up into a common prefix count as a single return when calculating the number of returns.
						cursor.maxKeys--
					}
				} else {
					// delimiterFound is true when this file is represented by a
					// common prefix (new or already recorded) instead of its own entry.
					var delimiterFound bool
					if delimiter != "" {
						// keys that contain the same string between the prefix and the first occurrence of the delimiter are grouped together as a commonPrefix.
						// extract the string between the prefix and the delimiter and add it to the commonPrefixes if it's unique.
						undelimitedPath := fmt.Sprintf("%s/%s", dir, entry.Name)[len(bucketPrefix):]

						// take into account a prefix if supplied while delimiting.
						undelimitedPath = strings.TrimPrefix(undelimitedPath, originalPrefix)

						delimitedPath := strings.SplitN(undelimitedPath, delimiter, 2)

						// Two parts means the delimiter occurs in the remaining path.
						if len(delimitedPath) == 2 {

							// S3 clients expect the delimited prefix to contain the delimiter and prefix.
							delimitedPrefix := originalPrefix + delimitedPath[0] + delimiter

							// Linear scan for an already recorded identical prefix.
							for i := range commonPrefixes {
								if commonPrefixes[i].Prefix == delimitedPrefix {
									delimiterFound = true
									break
								}
							}

							if !delimiterFound {
								commonPrefixes = append(commonPrefixes, PrefixEntry{
									Prefix: delimitedPrefix,
								})
								cursor.maxKeys--
								delimiterFound = true
							}
						}
					}
					if !delimiterFound {
						contents = append(contents, newListEntry(entry, "", dirName, entryName, bucketPrefix, fetchOwner, false, false))
						cursor.maxKeys--
					}
				}
			})
			if doErr != nil {
				return doErr
			}

			if cursor.isTruncated {
				// nextMarker is relative to reqDir; make it relative to the bucket.
				if requestDir != "" {
					nextMarker = requestDir + "/" + nextMarker
				}
				break
			} else if empty || strings.HasSuffix(originalPrefix, "/") {
				// Nothing more to list: no continuation marker is returned.
				nextMarker = ""
				break
			} else {
				// start next loop
				marker = nextMarker
			}
		}

		response = ListBucketResult{
			Name:           bucket,
			Prefix:         originalPrefix,
			Marker:         originalMarker,
			NextMarker:     nextMarker,
			MaxKeys:        int(maxKeys),
			Delimiter:      delimiter,
			IsTruncated:    cursor.isTruncated,
			Contents:       contents,
			CommonPrefixes: commonPrefixes,
		}
		if encodingTypeUrl {
			// Todo used for pass test_bucket_listv2_encoding_basic
			// sort.Slice(response.CommonPrefixes, func(i, j int) bool { return response.CommonPrefixes[i].Prefix < response.CommonPrefixes[j].Prefix })
			response.EncodingType = s3.EncodingTypeUrl
		}
		return nil
	})

	return
}
// ListingCursor carries the mutable state of one listing request; a single
// instance is shared by listFilerEntries, its per-entry callback and every
// recursive doListFilerEntries call.
type ListingCursor struct {
	// maxKeys is the number of results that may still be reported; the
	// per-entry callback decrements it for each recorded result.
	maxKeys uint16
	// isTruncated is set once more entries were available after maxKeys
	// reached zero.
	isTruncated bool
	// prefixEndsOnDelimiter starts out true when the requested prefix ends
	// with "/" and no marker was given; doListFilerEntries clears it while
	// processing entries.
	prefixEndsOnDelimiter bool
}
  217. // the prefix and marker may be in different directories
  218. // normalizePrefixMarker ensures the prefix and marker both starts from the same directory
  219. func normalizePrefixMarker(prefix, marker string) (alignedDir, alignedPrefix, alignedMarker string) {
  220. // alignedDir should not end with "/"
  221. // alignedDir, alignedPrefix, alignedMarker should only have "/" in middle
  222. if len(marker) == 0 {
  223. prefix = strings.Trim(prefix, "/")
  224. } else {
  225. prefix = strings.TrimLeft(prefix, "/")
  226. }
  227. marker = strings.TrimLeft(marker, "/")
  228. if prefix == "" {
  229. return "", "", marker
  230. }
  231. if marker == "" {
  232. alignedDir, alignedPrefix = toDirAndName(prefix)
  233. return
  234. }
  235. if !strings.HasPrefix(marker, prefix) {
  236. // something wrong
  237. return "", prefix, marker
  238. }
  239. if strings.HasPrefix(marker, prefix+"/") {
  240. alignedDir = prefix
  241. alignedPrefix = ""
  242. alignedMarker = marker[len(alignedDir)+1:]
  243. return
  244. }
  245. alignedDir, alignedPrefix = toDirAndName(prefix)
  246. if alignedDir != "" {
  247. alignedMarker = marker[len(alignedDir)+1:]
  248. } else {
  249. alignedMarker = marker
  250. }
  251. return
  252. }
  253. func toDirAndName(dirAndName string) (dir, name string) {
  254. sepIndex := strings.LastIndex(dirAndName, "/")
  255. if sepIndex >= 0 {
  256. dir, name = dirAndName[0:sepIndex], dirAndName[sepIndex+1:]
  257. } else {
  258. name = dirAndName
  259. }
  260. return
  261. }
  262. func toParentAndDescendants(dirAndName string) (dir, name string) {
  263. sepIndex := strings.Index(dirAndName, "/")
  264. if sepIndex >= 0 {
  265. dir, name = dirAndName[0:sepIndex], dirAndName[sepIndex+1:]
  266. } else {
  267. name = dirAndName
  268. }
  269. return
  270. }
  271. func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, dir, prefix string, cursor *ListingCursor, marker, delimiter string, inclusiveStartFrom bool, eachEntryFn func(dir string, entry *filer_pb.Entry)) (nextMarker string, err error) {
  272. // invariants
  273. // prefix and marker should be under dir, marker may contain "/"
  274. // maxKeys should be updated for each recursion
  275. // glog.V(4).Infof("doListFilerEntries dir: %s, prefix: %s, marker %s, maxKeys: %d, prefixEndsOnDelimiter: %+v", dir, prefix, marker, cursor.maxKeys, cursor.prefixEndsOnDelimiter)
  276. if prefix == "/" && delimiter == "/" {
  277. return
  278. }
  279. if cursor.maxKeys <= 0 {
  280. return
  281. }
  282. if strings.Contains(marker, "/") {
  283. subDir, subMarker := toParentAndDescendants(marker)
  284. // println("doListFilerEntries dir", dir+"/"+subDir, "subMarker", subMarker)
  285. subNextMarker, subErr := s3a.doListFilerEntries(client, dir+"/"+subDir, "", cursor, subMarker, delimiter, false, eachEntryFn)
  286. if subErr != nil {
  287. err = subErr
  288. return
  289. }
  290. nextMarker = subDir + "/" + subNextMarker
  291. // finished processing this subdirectory
  292. marker = subDir
  293. }
  294. if cursor.isTruncated {
  295. return
  296. }
  297. // now marker is also a direct child of dir
  298. request := &filer_pb.ListEntriesRequest{
  299. Directory: dir,
  300. Prefix: prefix,
  301. Limit: uint32(cursor.maxKeys + 2), // bucket root directory needs to skip additional s3_constants.MultipartUploadsFolder folder
  302. StartFromFileName: marker,
  303. InclusiveStartFrom: inclusiveStartFrom,
  304. }
  305. if cursor.prefixEndsOnDelimiter {
  306. request.Limit = uint32(1)
  307. }
  308. ctx, cancel := context.WithCancel(context.Background())
  309. defer cancel()
  310. stream, listErr := client.ListEntries(ctx, request)
  311. if listErr != nil {
  312. err = fmt.Errorf("list entires %+v: %v", request, listErr)
  313. return
  314. }
  315. for {
  316. resp, recvErr := stream.Recv()
  317. if recvErr != nil {
  318. if recvErr == io.EOF {
  319. break
  320. } else {
  321. err = fmt.Errorf("iterating entires %+v: %v", request, recvErr)
  322. return
  323. }
  324. }
  325. if cursor.maxKeys <= 0 {
  326. cursor.isTruncated = true
  327. continue
  328. }
  329. entry := resp.Entry
  330. nextMarker = entry.Name
  331. if cursor.prefixEndsOnDelimiter {
  332. if entry.Name == prefix && entry.IsDirectory {
  333. if delimiter != "/" {
  334. cursor.prefixEndsOnDelimiter = false
  335. }
  336. } else {
  337. continue
  338. }
  339. }
  340. if entry.IsDirectory {
  341. // glog.V(4).Infof("List Dir Entries %s, file: %s, maxKeys %d", dir, entry.Name, cursor.maxKeys)
  342. if entry.Name == s3_constants.MultipartUploadsFolder { // FIXME no need to apply to all directories. this extra also affects maxKeys
  343. continue
  344. }
  345. if delimiter != "/" || cursor.prefixEndsOnDelimiter {
  346. if cursor.prefixEndsOnDelimiter {
  347. cursor.prefixEndsOnDelimiter = false
  348. if entry.IsDirectoryKeyObject() {
  349. eachEntryFn(dir, entry)
  350. }
  351. } else {
  352. eachEntryFn(dir, entry)
  353. }
  354. subNextMarker, subErr := s3a.doListFilerEntries(client, dir+"/"+entry.Name, "", cursor, "", delimiter, false, eachEntryFn)
  355. if subErr != nil {
  356. err = fmt.Errorf("doListFilerEntries2: %v", subErr)
  357. return
  358. }
  359. // println("doListFilerEntries2 dir", dir+"/"+entry.Name, "subNextMarker", subNextMarker)
  360. nextMarker = entry.Name + "/" + subNextMarker
  361. if cursor.isTruncated {
  362. return
  363. }
  364. // println("doListFilerEntries2 nextMarker", nextMarker)
  365. } else {
  366. var isEmpty bool
  367. if !s3a.option.AllowEmptyFolder && entry.IsOlderDir() {
  368. //if isEmpty, err = s3a.ensureDirectoryAllEmpty(client, dir, entry.Name); err != nil {
  369. // glog.Errorf("check empty folder %s: %v", dir, err)
  370. //}
  371. }
  372. if !isEmpty {
  373. eachEntryFn(dir, entry)
  374. }
  375. }
  376. } else {
  377. eachEntryFn(dir, entry)
  378. // glog.V(4).Infof("List File Entries %s, file: %s, maxKeys %d", dir, entry.Name, cursor.maxKeys)
  379. }
  380. if cursor.prefixEndsOnDelimiter {
  381. cursor.prefixEndsOnDelimiter = false
  382. }
  383. }
  384. return
  385. }
  386. func getListObjectsV2Args(values url.Values) (prefix, startAfter, delimiter string, token OptionalString, encodingTypeUrl bool, fetchOwner bool, maxkeys uint16) {
  387. prefix = values.Get("prefix")
  388. token = OptionalString{set: values.Has("continuation-token"), string: values.Get("continuation-token")}
  389. startAfter = values.Get("start-after")
  390. delimiter = values.Get("delimiter")
  391. encodingTypeUrl = values.Get("encoding-type") == s3.EncodingTypeUrl
  392. if values.Get("max-keys") != "" {
  393. if maxKeys, err := strconv.ParseUint(values.Get("max-keys"), 10, 16); err == nil {
  394. maxkeys = uint16(maxKeys)
  395. }
  396. } else {
  397. maxkeys = maxObjectListSizeLimit
  398. }
  399. fetchOwner = values.Get("fetch-owner") == "true"
  400. return
  401. }
  402. func getListObjectsV1Args(values url.Values) (prefix, marker, delimiter string, encodingTypeUrl bool, maxkeys int16) {
  403. prefix = values.Get("prefix")
  404. marker = values.Get("marker")
  405. delimiter = values.Get("delimiter")
  406. encodingTypeUrl = values.Get("encoding-type") == "url"
  407. if values.Get("max-keys") != "" {
  408. if maxKeys, err := strconv.ParseInt(values.Get("max-keys"), 10, 16); err == nil {
  409. maxkeys = int16(maxKeys)
  410. }
  411. } else {
  412. maxkeys = maxObjectListSizeLimit
  413. }
  414. return
  415. }
  416. func (s3a *S3ApiServer) ensureDirectoryAllEmpty(filerClient filer_pb.SeaweedFilerClient, parentDir, name string) (isEmpty bool, err error) {
  417. // println("+ ensureDirectoryAllEmpty", dir, name)
  418. glog.V(4).Infof("+ isEmpty %s/%s", parentDir, name)
  419. defer glog.V(4).Infof("- isEmpty %s/%s %v", parentDir, name, isEmpty)
  420. var fileCounter int
  421. var subDirs []string
  422. currentDir := parentDir + "/" + name
  423. var startFrom string
  424. var isExhausted bool
  425. var foundEntry bool
  426. for fileCounter == 0 && !isExhausted && err == nil {
  427. err = filer_pb.SeaweedList(filerClient, currentDir, "", func(entry *filer_pb.Entry, isLast bool) error {
  428. foundEntry = true
  429. if entry.IsOlderDir() {
  430. subDirs = append(subDirs, entry.Name)
  431. } else {
  432. fileCounter++
  433. }
  434. startFrom = entry.Name
  435. isExhausted = isExhausted || isLast
  436. glog.V(4).Infof(" * %s/%s isLast: %t", currentDir, startFrom, isLast)
  437. return nil
  438. }, startFrom, false, 8)
  439. if !foundEntry {
  440. break
  441. }
  442. }
  443. if err != nil {
  444. return false, err
  445. }
  446. if fileCounter > 0 {
  447. return false, nil
  448. }
  449. for _, subDir := range subDirs {
  450. isSubEmpty, subErr := s3a.ensureDirectoryAllEmpty(filerClient, currentDir, subDir)
  451. if subErr != nil {
  452. return false, subErr
  453. }
  454. if !isSubEmpty {
  455. return false, nil
  456. }
  457. }
  458. glog.V(1).Infof("deleting empty folder %s", currentDir)
  459. if err = doDeleteEntry(filerClient, parentDir, name, true, false); err != nil {
  460. return
  461. }
  462. return true, nil
  463. }