filerstore.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. package filer2
  2. import (
  3. "context"
  4. "errors"
  5. "time"
  6. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  7. "github.com/chrislusf/seaweedfs/weed/stats"
  8. "github.com/chrislusf/seaweedfs/weed/util"
  9. )
  10. type FilerStore interface {
  11. // GetName gets the name to locate the configuration in filer.toml file
  12. GetName() string
  13. // Initialize initializes the file store
  14. Initialize(configuration util.Configuration) error
  15. InsertEntry(context.Context, *Entry) error
  16. UpdateEntry(context.Context, *Entry) (err error)
  17. // err == filer2.ErrNotFound if not found
  18. FindEntry(context.Context, FullPath) (entry *Entry, err error)
  19. DeleteEntry(context.Context, FullPath) (err error)
  20. ListDirectoryEntries(ctx context.Context, dirPath FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error)
  21. BeginTransaction(ctx context.Context) (context.Context, error)
  22. CommitTransaction(ctx context.Context) error
  23. RollbackTransaction(ctx context.Context) error
  24. }
  // ErrNotFound is the sentinel error for a missing entry; the FilerStore
  // contract states that FindEntry reports err == filer2.ErrNotFound when
  // nothing is found.
  var (
  	ErrNotFound = errors.New("filer: no entry is found in filer store")
  )
  26. type FilerStoreWrapper struct {
  27. actualStore FilerStore
  28. }
  29. func NewFilerStoreWrapper(store FilerStore) *FilerStoreWrapper {
  30. if innerStore, ok := store.(*FilerStoreWrapper); ok {
  31. return innerStore
  32. }
  33. return &FilerStoreWrapper{
  34. actualStore: store,
  35. }
  36. }
  37. func (fsw *FilerStoreWrapper) GetName() string {
  38. return fsw.actualStore.GetName()
  39. }
  40. func (fsw *FilerStoreWrapper) Initialize(configuration util.Configuration) error {
  41. return fsw.actualStore.Initialize(configuration)
  42. }
  43. func (fsw *FilerStoreWrapper) InsertEntry(ctx context.Context, entry *Entry) error {
  44. stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "insert").Inc()
  45. start := time.Now()
  46. defer func() {
  47. stats.FilerStoreHistogram.WithLabelValues(fsw.actualStore.GetName(), "insert").Observe(time.Since(start).Seconds())
  48. }()
  49. filer_pb.BeforeEntrySerialization(entry.Chunks)
  50. return fsw.actualStore.InsertEntry(ctx, entry)
  51. }
  52. func (fsw *FilerStoreWrapper) UpdateEntry(ctx context.Context, entry *Entry) error {
  53. stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "update").Inc()
  54. start := time.Now()
  55. defer func() {
  56. stats.FilerStoreHistogram.WithLabelValues(fsw.actualStore.GetName(), "update").Observe(time.Since(start).Seconds())
  57. }()
  58. filer_pb.BeforeEntrySerialization(entry.Chunks)
  59. return fsw.actualStore.UpdateEntry(ctx, entry)
  60. }
  61. func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp FullPath) (entry *Entry, err error) {
  62. stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "find").Inc()
  63. start := time.Now()
  64. defer func() {
  65. stats.FilerStoreHistogram.WithLabelValues(fsw.actualStore.GetName(), "find").Observe(time.Since(start).Seconds())
  66. }()
  67. entry, err = fsw.actualStore.FindEntry(ctx, fp)
  68. if err != nil {
  69. return nil, err
  70. }
  71. filer_pb.AfterEntryDeserialization(entry.Chunks)
  72. return
  73. }
  74. func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp FullPath) (err error) {
  75. stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "delete").Inc()
  76. start := time.Now()
  77. defer func() {
  78. stats.FilerStoreHistogram.WithLabelValues(fsw.actualStore.GetName(), "delete").Observe(time.Since(start).Seconds())
  79. }()
  80. return fsw.actualStore.DeleteEntry(ctx, fp)
  81. }
  82. func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) {
  83. stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "list").Inc()
  84. start := time.Now()
  85. defer func() {
  86. stats.FilerStoreHistogram.WithLabelValues(fsw.actualStore.GetName(), "list").Observe(time.Since(start).Seconds())
  87. }()
  88. entries, err := fsw.actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
  89. if err != nil {
  90. return nil, err
  91. }
  92. for _, entry := range entries {
  93. filer_pb.AfterEntryDeserialization(entry.Chunks)
  94. }
  95. return entries, err
  96. }
  97. func (fsw *FilerStoreWrapper) BeginTransaction(ctx context.Context) (context.Context, error) {
  98. return fsw.actualStore.BeginTransaction(ctx)
  99. }
  100. func (fsw *FilerStoreWrapper) CommitTransaction(ctx context.Context) error {
  101. return fsw.actualStore.CommitTransaction(ctx)
  102. }
  103. func (fsw *FilerStoreWrapper) RollbackTransaction(ctx context.Context) error {
  104. return fsw.actualStore.RollbackTransaction(ctx)
  105. }