filerstore.go 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255
  1. package filer
  2. import (
  3. "context"
  4. "errors"
  5. "strings"
  6. "time"
  7. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  8. "github.com/chrislusf/seaweedfs/weed/stats"
  9. "github.com/chrislusf/seaweedfs/weed/util"
  10. )
  11. var (
  12. ErrUnsupportedListDirectoryPrefixed = errors.New("unsupported directory prefix listing")
  13. ErrKvNotImplemented = errors.New("kv not implemented yet")
  14. ErrKvNotFound = errors.New("kv: not found")
  15. )
  16. type FilerStore interface {
  17. // GetName gets the name to locate the configuration in filer.toml file
  18. GetName() string
  19. // Initialize initializes the file store
  20. Initialize(configuration util.Configuration, prefix string) error
  21. InsertEntry(context.Context, *Entry) error
  22. UpdateEntry(context.Context, *Entry) (err error)
  23. // err == filer_pb.ErrNotFound if not found
  24. FindEntry(context.Context, util.FullPath) (entry *Entry, err error)
  25. DeleteEntry(context.Context, util.FullPath) (err error)
  26. DeleteFolderChildren(context.Context, util.FullPath) (err error)
  27. ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error)
  28. ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) ([]*Entry, error)
  29. BeginTransaction(ctx context.Context) (context.Context, error)
  30. CommitTransaction(ctx context.Context) error
  31. RollbackTransaction(ctx context.Context) error
  32. KvPut(ctx context.Context, key []byte, value []byte) (err error)
  33. KvGet(ctx context.Context, key []byte) (value []byte, err error)
  34. KvDelete(ctx context.Context, key []byte) (err error)
  35. Shutdown()
  36. }
  37. type VirtualFilerStore interface {
  38. FilerStore
  39. DeleteHardLink(ctx context.Context, hardLinkId HardLinkId) error
  40. }
  41. type FilerStoreWrapper struct {
  42. ActualStore FilerStore
  43. }
  44. func NewFilerStoreWrapper(store FilerStore) *FilerStoreWrapper {
  45. if innerStore, ok := store.(*FilerStoreWrapper); ok {
  46. return innerStore
  47. }
  48. return &FilerStoreWrapper{
  49. ActualStore: store,
  50. }
  51. }
  52. func (fsw *FilerStoreWrapper) GetName() string {
  53. return fsw.ActualStore.GetName()
  54. }
  55. func (fsw *FilerStoreWrapper) Initialize(configuration util.Configuration, prefix string) error {
  56. return fsw.ActualStore.Initialize(configuration, prefix)
  57. }
  58. func (fsw *FilerStoreWrapper) InsertEntry(ctx context.Context, entry *Entry) error {
  59. stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "insert").Inc()
  60. start := time.Now()
  61. defer func() {
  62. stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "insert").Observe(time.Since(start).Seconds())
  63. }()
  64. filer_pb.BeforeEntrySerialization(entry.Chunks)
  65. if entry.Mime == "application/octet-stream" {
  66. entry.Mime = ""
  67. }
  68. if err := fsw.handleUpdateToHardLinks(ctx, entry); err != nil {
  69. return err
  70. }
  71. return fsw.ActualStore.InsertEntry(ctx, entry)
  72. }
  73. func (fsw *FilerStoreWrapper) UpdateEntry(ctx context.Context, entry *Entry) error {
  74. stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "update").Inc()
  75. start := time.Now()
  76. defer func() {
  77. stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "update").Observe(time.Since(start).Seconds())
  78. }()
  79. filer_pb.BeforeEntrySerialization(entry.Chunks)
  80. if entry.Mime == "application/octet-stream" {
  81. entry.Mime = ""
  82. }
  83. if err := fsw.handleUpdateToHardLinks(ctx, entry); err != nil {
  84. return err
  85. }
  86. return fsw.ActualStore.UpdateEntry(ctx, entry)
  87. }
  88. func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp util.FullPath) (entry *Entry, err error) {
  89. stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "find").Inc()
  90. start := time.Now()
  91. defer func() {
  92. stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "find").Observe(time.Since(start).Seconds())
  93. }()
  94. entry, err = fsw.ActualStore.FindEntry(ctx, fp)
  95. if err != nil {
  96. return nil, err
  97. }
  98. fsw.maybeReadHardLink(ctx, entry)
  99. filer_pb.AfterEntryDeserialization(entry.Chunks)
  100. return
  101. }
  102. func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) {
  103. stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "delete").Inc()
  104. start := time.Now()
  105. defer func() {
  106. stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "delete").Observe(time.Since(start).Seconds())
  107. }()
  108. existingEntry, findErr := fsw.FindEntry(ctx, fp)
  109. if findErr == filer_pb.ErrNotFound {
  110. return nil
  111. }
  112. if len(existingEntry.HardLinkId) != 0 {
  113. // remove hard link
  114. if err = fsw.DeleteHardLink(ctx, existingEntry.HardLinkId); err != nil {
  115. return err
  116. }
  117. }
  118. return fsw.ActualStore.DeleteEntry(ctx, fp)
  119. }
  120. func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error) {
  121. stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "deleteFolderChildren").Inc()
  122. start := time.Now()
  123. defer func() {
  124. stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "deleteFolderChildren").Observe(time.Since(start).Seconds())
  125. }()
  126. return fsw.ActualStore.DeleteFolderChildren(ctx, fp)
  127. }
  128. func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) {
  129. stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "list").Inc()
  130. start := time.Now()
  131. defer func() {
  132. stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "list").Observe(time.Since(start).Seconds())
  133. }()
  134. entries, err := fsw.ActualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
  135. if err != nil {
  136. return nil, err
  137. }
  138. for _, entry := range entries {
  139. fsw.maybeReadHardLink(ctx, entry)
  140. filer_pb.AfterEntryDeserialization(entry.Chunks)
  141. }
  142. return entries, err
  143. }
  144. func (fsw *FilerStoreWrapper) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) ([]*Entry, error) {
  145. stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "prefixList").Inc()
  146. start := time.Now()
  147. defer func() {
  148. stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "prefixList").Observe(time.Since(start).Seconds())
  149. }()
  150. entries, err := fsw.ActualStore.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix)
  151. if err == ErrUnsupportedListDirectoryPrefixed {
  152. entries, err = fsw.prefixFilterEntries(ctx, dirPath, startFileName, includeStartFile, limit, prefix)
  153. }
  154. if err != nil {
  155. return nil, err
  156. }
  157. for _, entry := range entries {
  158. fsw.maybeReadHardLink(ctx, entry)
  159. filer_pb.AfterEntryDeserialization(entry.Chunks)
  160. }
  161. return entries, nil
  162. }
  163. func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int, prefix string) (entries []*Entry, err error) {
  164. entries, err = fsw.ActualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
  165. if err != nil {
  166. return nil, err
  167. }
  168. if prefix == "" {
  169. return
  170. }
  171. count := 0
  172. var lastFileName string
  173. notPrefixed := entries
  174. entries = nil
  175. for count < limit && len(notPrefixed) > 0 {
  176. for _, entry := range notPrefixed {
  177. lastFileName = entry.Name()
  178. if strings.HasPrefix(entry.Name(), prefix) {
  179. count++
  180. entries = append(entries, entry)
  181. if count >= limit {
  182. break
  183. }
  184. }
  185. }
  186. if count < limit {
  187. notPrefixed, err = fsw.ActualStore.ListDirectoryEntries(ctx, dirPath, lastFileName, false, limit)
  188. if err != nil {
  189. return
  190. }
  191. }
  192. }
  193. return
  194. }
  195. func (fsw *FilerStoreWrapper) BeginTransaction(ctx context.Context) (context.Context, error) {
  196. return fsw.ActualStore.BeginTransaction(ctx)
  197. }
  198. func (fsw *FilerStoreWrapper) CommitTransaction(ctx context.Context) error {
  199. return fsw.ActualStore.CommitTransaction(ctx)
  200. }
  201. func (fsw *FilerStoreWrapper) RollbackTransaction(ctx context.Context) error {
  202. return fsw.ActualStore.RollbackTransaction(ctx)
  203. }
  204. func (fsw *FilerStoreWrapper) Shutdown() {
  205. fsw.ActualStore.Shutdown()
  206. }
  207. func (fsw *FilerStoreWrapper) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
  208. return fsw.ActualStore.KvPut(ctx, key, value)
  209. }
  210. func (fsw *FilerStoreWrapper) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
  211. return fsw.ActualStore.KvGet(ctx, key)
  212. }
  213. func (fsw *FilerStoreWrapper) KvDelete(ctx context.Context, key []byte) (err error) {
  214. return fsw.ActualStore.KvDelete(ctx, key)
  215. }