filerstore.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. package filer2
  2. import (
  3. "context"
  4. "time"
  5. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  6. "github.com/chrislusf/seaweedfs/weed/stats"
  7. "github.com/chrislusf/seaweedfs/weed/util"
  8. )
  9. type FilerStore interface {
  10. // GetName gets the name to locate the configuration in filer.toml file
  11. GetName() string
  12. // Initialize initializes the file store
  13. Initialize(configuration util.Configuration, prefix string) error
  14. InsertEntry(context.Context, *Entry) error
  15. UpdateEntry(context.Context, *Entry) (err error)
  16. // err == filer2.ErrNotFound if not found
  17. FindEntry(context.Context, util.FullPath) (entry *Entry, err error)
  18. DeleteEntry(context.Context, util.FullPath) (err error)
  19. DeleteFolderChildren(context.Context, util.FullPath) (err error)
  20. ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error)
  21. BeginTransaction(ctx context.Context) (context.Context, error)
  22. CommitTransaction(ctx context.Context) error
  23. RollbackTransaction(ctx context.Context) error
  24. Shutdown()
  25. }
  26. type FilerLocalStore interface {
  27. UpdateOffset(filer string, lastTsNs int64) error
  28. ReadOffset(filer string) (lastTsNs int64, err error)
  29. }
  30. type FilerStoreWrapper struct {
  31. ActualStore FilerStore
  32. }
  33. func NewFilerStoreWrapper(store FilerStore) *FilerStoreWrapper {
  34. if innerStore, ok := store.(*FilerStoreWrapper); ok {
  35. return innerStore
  36. }
  37. return &FilerStoreWrapper{
  38. ActualStore: store,
  39. }
  40. }
  41. func (fsw *FilerStoreWrapper) GetName() string {
  42. return fsw.ActualStore.GetName()
  43. }
  44. func (fsw *FilerStoreWrapper) Initialize(configuration util.Configuration, prefix string) error {
  45. return fsw.ActualStore.Initialize(configuration, prefix)
  46. }
  47. func (fsw *FilerStoreWrapper) InsertEntry(ctx context.Context, entry *Entry) error {
  48. stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "insert").Inc()
  49. start := time.Now()
  50. defer func() {
  51. stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "insert").Observe(time.Since(start).Seconds())
  52. }()
  53. filer_pb.BeforeEntrySerialization(entry.Chunks)
  54. return fsw.ActualStore.InsertEntry(ctx, entry)
  55. }
  56. func (fsw *FilerStoreWrapper) UpdateEntry(ctx context.Context, entry *Entry) error {
  57. stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "update").Inc()
  58. start := time.Now()
  59. defer func() {
  60. stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "update").Observe(time.Since(start).Seconds())
  61. }()
  62. filer_pb.BeforeEntrySerialization(entry.Chunks)
  63. return fsw.ActualStore.UpdateEntry(ctx, entry)
  64. }
  65. func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp util.FullPath) (entry *Entry, err error) {
  66. stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "find").Inc()
  67. start := time.Now()
  68. defer func() {
  69. stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "find").Observe(time.Since(start).Seconds())
  70. }()
  71. entry, err = fsw.ActualStore.FindEntry(ctx, fp)
  72. if err != nil {
  73. return nil, err
  74. }
  75. filer_pb.AfterEntryDeserialization(entry.Chunks)
  76. return
  77. }
  78. func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) {
  79. stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "delete").Inc()
  80. start := time.Now()
  81. defer func() {
  82. stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "delete").Observe(time.Since(start).Seconds())
  83. }()
  84. return fsw.ActualStore.DeleteEntry(ctx, fp)
  85. }
  86. func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error) {
  87. stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "deleteFolderChildren").Inc()
  88. start := time.Now()
  89. defer func() {
  90. stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "deleteFolderChildren").Observe(time.Since(start).Seconds())
  91. }()
  92. return fsw.ActualStore.DeleteFolderChildren(ctx, fp)
  93. }
  94. func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) {
  95. stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "list").Inc()
  96. start := time.Now()
  97. defer func() {
  98. stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "list").Observe(time.Since(start).Seconds())
  99. }()
  100. entries, err := fsw.ActualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
  101. if err != nil {
  102. return nil, err
  103. }
  104. for _, entry := range entries {
  105. filer_pb.AfterEntryDeserialization(entry.Chunks)
  106. }
  107. return entries, err
  108. }
  109. func (fsw *FilerStoreWrapper) BeginTransaction(ctx context.Context) (context.Context, error) {
  110. return fsw.ActualStore.BeginTransaction(ctx)
  111. }
  112. func (fsw *FilerStoreWrapper) CommitTransaction(ctx context.Context) error {
  113. return fsw.ActualStore.CommitTransaction(ctx)
  114. }
  115. func (fsw *FilerStoreWrapper) RollbackTransaction(ctx context.Context) error {
  116. return fsw.ActualStore.RollbackTransaction(ctx)
  117. }
  118. func (fsw *FilerStoreWrapper) Shutdown() {
  119. fsw.ActualStore.Shutdown()
  120. }