abstract_sql_store.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. package abstract_sql
  2. import (
  3. "context"
  4. "database/sql"
  5. "fmt"
  6. "github.com/chrislusf/seaweedfs/weed/filer2"
  7. "github.com/chrislusf/seaweedfs/weed/glog"
  8. "github.com/chrislusf/seaweedfs/weed/util"
  9. )
  10. type AbstractSqlStore struct {
  11. DB *sql.DB
  12. SqlInsert string
  13. SqlUpdate string
  14. SqlFind string
  15. SqlDelete string
  16. SqlDeleteFolderChildren string
  17. SqlListExclusive string
  18. SqlListInclusive string
  19. }
  20. type TxOrDB interface {
  21. ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
  22. QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
  23. QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
  24. }
  25. func (store *AbstractSqlStore) BeginTransaction(ctx context.Context) (context.Context, error) {
  26. tx, err := store.DB.BeginTx(ctx, &sql.TxOptions{
  27. Isolation: sql.LevelReadCommitted,
  28. ReadOnly: false,
  29. })
  30. if err != nil {
  31. return ctx, err
  32. }
  33. return context.WithValue(ctx, "tx", tx), nil
  34. }
  35. func (store *AbstractSqlStore) CommitTransaction(ctx context.Context) error {
  36. if tx, ok := ctx.Value("tx").(*sql.Tx); ok {
  37. return tx.Commit()
  38. }
  39. return nil
  40. }
  41. func (store *AbstractSqlStore) RollbackTransaction(ctx context.Context) error {
  42. if tx, ok := ctx.Value("tx").(*sql.Tx); ok {
  43. return tx.Rollback()
  44. }
  45. return nil
  46. }
  47. func (store *AbstractSqlStore) getTxOrDB(ctx context.Context) TxOrDB {
  48. if tx, ok := ctx.Value("tx").(*sql.Tx); ok {
  49. return tx
  50. }
  51. return store.DB
  52. }
  53. func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) {
  54. dir, name := entry.FullPath.DirAndName()
  55. meta, err := entry.EncodeAttributesAndChunks()
  56. if err != nil {
  57. return fmt.Errorf("encode %s: %s", entry.FullPath, err)
  58. }
  59. res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlInsert, util.HashStringToLong(dir), name, dir, meta)
  60. if err != nil {
  61. return fmt.Errorf("insert %s: %s", entry.FullPath, err)
  62. }
  63. _, err = res.RowsAffected()
  64. if err != nil {
  65. return fmt.Errorf("insert %s but no rows affected: %s", entry.FullPath, err)
  66. }
  67. return nil
  68. }
  69. func (store *AbstractSqlStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) {
  70. dir, name := entry.FullPath.DirAndName()
  71. meta, err := entry.EncodeAttributesAndChunks()
  72. if err != nil {
  73. return fmt.Errorf("encode %s: %s", entry.FullPath, err)
  74. }
  75. res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlUpdate, meta, util.HashStringToLong(dir), name, dir)
  76. if err != nil {
  77. return fmt.Errorf("update %s: %s", entry.FullPath, err)
  78. }
  79. _, err = res.RowsAffected()
  80. if err != nil {
  81. return fmt.Errorf("update %s but no rows affected: %s", entry.FullPath, err)
  82. }
  83. return nil
  84. }
  85. func (store *AbstractSqlStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (*filer2.Entry, error) {
  86. dir, name := fullpath.DirAndName()
  87. row := store.getTxOrDB(ctx).QueryRowContext(ctx, store.SqlFind, util.HashStringToLong(dir), name, dir)
  88. var data []byte
  89. if err := row.Scan(&data); err != nil {
  90. return nil, filer2.ErrNotFound
  91. }
  92. entry := &filer2.Entry{
  93. FullPath: fullpath,
  94. }
  95. if err := entry.DecodeAttributesAndChunks(data); err != nil {
  96. return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
  97. }
  98. return entry, nil
  99. }
  100. func (store *AbstractSqlStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) error {
  101. dir, name := fullpath.DirAndName()
  102. res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlDelete, util.HashStringToLong(dir), name, dir)
  103. if err != nil {
  104. return fmt.Errorf("delete %s: %s", fullpath, err)
  105. }
  106. _, err = res.RowsAffected()
  107. if err != nil {
  108. return fmt.Errorf("delete %s but no rows affected: %s", fullpath, err)
  109. }
  110. return nil
  111. }
  112. func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) error {
  113. res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlDeleteFolderChildren, util.HashStringToLong(string(fullpath)), fullpath)
  114. if err != nil {
  115. return fmt.Errorf("deleteFolderChildren %s: %s", fullpath, err)
  116. }
  117. _, err = res.RowsAffected()
  118. if err != nil {
  119. return fmt.Errorf("deleteFolderChildren %s but no rows affected: %s", fullpath, err)
  120. }
  121. return nil
  122. }
  123. func (store *AbstractSqlStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) {
  124. sqlText := store.SqlListExclusive
  125. if inclusive {
  126. sqlText = store.SqlListInclusive
  127. }
  128. rows, err := store.getTxOrDB(ctx).QueryContext(ctx, sqlText, util.HashStringToLong(string(fullpath)), startFileName, string(fullpath), limit)
  129. if err != nil {
  130. return nil, fmt.Errorf("list %s : %v", fullpath, err)
  131. }
  132. defer rows.Close()
  133. for rows.Next() {
  134. var name string
  135. var data []byte
  136. if err = rows.Scan(&name, &data); err != nil {
  137. glog.V(0).Infof("scan %s : %v", fullpath, err)
  138. return nil, fmt.Errorf("scan %s: %v", fullpath, err)
  139. }
  140. entry := &filer2.Entry{
  141. FullPath: filer2.NewFullPath(string(fullpath), name),
  142. }
  143. if err = entry.DecodeAttributesAndChunks(data); err != nil {
  144. glog.V(0).Infof("scan decode %s : %v", entry.FullPath, err)
  145. return nil, fmt.Errorf("scan decode %s : %v", entry.FullPath, err)
  146. }
  147. entries = append(entries, entry)
  148. }
  149. return entries, nil
  150. }