filer_buckets.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. package filer
  2. import (
  3. "context"
  4. "math"
  5. "sync"
  6. "github.com/chrislusf/seaweedfs/weed/glog"
  7. "github.com/chrislusf/seaweedfs/weed/util"
  8. )
  9. type BucketName string
  10. type BucketOption struct {
  11. Name BucketName
  12. Replication string
  13. fsync bool
  14. }
  15. type FilerBuckets struct {
  16. dirBucketsPath string
  17. buckets map[BucketName]*BucketOption
  18. sync.RWMutex
  19. }
  20. func (f *Filer) LoadBuckets() {
  21. f.buckets = &FilerBuckets{
  22. buckets: make(map[BucketName]*BucketOption),
  23. }
  24. limit := int64(math.MaxInt32)
  25. entries, _, err := f.ListDirectoryEntries(context.Background(), util.FullPath(f.DirBucketsPath), "", false, limit, "", "")
  26. if err != nil {
  27. glog.V(1).Infof("no buckets found: %v", err)
  28. return
  29. }
  30. shouldFsyncMap := make(map[string]bool)
  31. for _, bucket := range f.FsyncBuckets {
  32. shouldFsyncMap[bucket] = true
  33. }
  34. glog.V(1).Infof("buckets found: %d", len(entries))
  35. f.buckets.Lock()
  36. for _, entry := range entries {
  37. _, shouldFsnyc := shouldFsyncMap[entry.Name()]
  38. f.buckets.buckets[BucketName(entry.Name())] = &BucketOption{
  39. Name: BucketName(entry.Name()),
  40. Replication: entry.Replication,
  41. fsync: shouldFsnyc,
  42. }
  43. }
  44. f.buckets.Unlock()
  45. }
  46. func (f *Filer) ReadBucketOption(buketName string) (replication string, fsync bool) {
  47. f.buckets.RLock()
  48. defer f.buckets.RUnlock()
  49. option, found := f.buckets.buckets[BucketName(buketName)]
  50. if !found {
  51. return "", false
  52. }
  53. return option.Replication, option.fsync
  54. }
  55. func (f *Filer) isBucket(entry *Entry) bool {
  56. if !entry.IsDirectory() {
  57. return false
  58. }
  59. parent, dirName := entry.FullPath.DirAndName()
  60. if parent != f.DirBucketsPath {
  61. return false
  62. }
  63. f.buckets.RLock()
  64. defer f.buckets.RUnlock()
  65. _, found := f.buckets.buckets[BucketName(dirName)]
  66. return found
  67. }
  68. func (f *Filer) maybeAddBucket(entry *Entry) {
  69. if !entry.IsDirectory() {
  70. return
  71. }
  72. parent, dirName := entry.FullPath.DirAndName()
  73. if parent != f.DirBucketsPath {
  74. return
  75. }
  76. f.addBucket(dirName, &BucketOption{
  77. Name: BucketName(dirName),
  78. Replication: entry.Replication,
  79. })
  80. }
  81. func (f *Filer) addBucket(buketName string, bucketOption *BucketOption) {
  82. f.buckets.Lock()
  83. defer f.buckets.Unlock()
  84. f.buckets.buckets[BucketName(buketName)] = bucketOption
  85. }
  86. func (f *Filer) deleteBucket(buketName string) {
  87. f.buckets.Lock()
  88. defer f.buckets.Unlock()
  89. delete(f.buckets.buckets, BucketName(buketName))
  90. }