abstract_sql_store.go 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339
  1. package abstract_sql
  2. import (
  3. "context"
  4. "database/sql"
  5. "fmt"
  6. "github.com/chrislusf/seaweedfs/weed/filer"
  7. "github.com/chrislusf/seaweedfs/weed/glog"
  8. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  9. "github.com/chrislusf/seaweedfs/weed/util"
  10. "strings"
  11. "sync"
  12. )
  // SqlGenerator supplies the dialect-specific SQL text used by AbstractSqlStore.
  // Every method receives the name of the table (bucket) the statement targets.
  // The argument orders noted below are the ones AbstractSqlStore binds when it
  // executes the returned statement.
  type SqlGenerator interface {
  	// GetSqlInsert is executed with (directory hash, name, directory, meta).
  	GetSqlInsert(bucket string) string
  	// GetSqlUpdate is executed with (meta, directory hash, name, directory).
  	GetSqlUpdate(bucket string) string
  	// GetSqlFind is executed with (directory hash, name, directory) and is
  	// expected to yield a single column holding the encoded entry.
  	GetSqlFind(bucket string) string
  	// GetSqlDelete is executed with (directory hash, name, directory).
  	GetSqlDelete(bucket string) string
  	// GetSqlDeleteFolderChildren is executed with (directory hash, directory).
  	GetSqlDeleteFolderChildren(bucket string) string
  	// GetSqlListExclusive is executed with (directory hash, start file name,
  	// directory, name pattern, row limit); it is used when the start file
  	// itself is not to be included. Rows are scanned as (name, meta).
  	GetSqlListExclusive(bucket string) string
  	// GetSqlListInclusive takes the same arguments as GetSqlListExclusive and
  	// is used when the start file is to be included.
  	GetSqlListInclusive(bucket string) string
  	// GetSqlCreateTable is executed without arguments.
  	GetSqlCreateTable(bucket string) string
  	// GetSqlDropTable is executed without arguments.
  	GetSqlDropTable(bucket string) string
  }
  24. type AbstractSqlStore struct {
  25. SqlGenerator
  26. DB *sql.DB
  27. SupportBucketTable bool
  28. dbs map[string]bool
  29. dbsLock sync.Mutex
  30. }
  // DEFAULT_TABLE is the table used for every path that is not routed to a
  // per-bucket table.
  const DEFAULT_TABLE = "filemeta"
  // TxOrDB is the set of query methods the store needs. Both *sql.DB and
  // *sql.Tx satisfy it, so a statement can run inside or outside a transaction
  // through the same code path.
  type TxOrDB interface {
  	// ExecContext runs a statement that returns no rows.
  	ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
  	// QueryRowContext runs a query expected to return at most one row.
  	QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
  	// QueryContext runs a query that returns rows.
  	QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
  }
  39. func (store *AbstractSqlStore) BeginTransaction(ctx context.Context) (context.Context, error) {
  40. tx, err := store.DB.BeginTx(ctx, &sql.TxOptions{
  41. Isolation: sql.LevelReadCommitted,
  42. ReadOnly: false,
  43. })
  44. if err != nil {
  45. return ctx, err
  46. }
  47. return context.WithValue(ctx, "tx", tx), nil
  48. }
  49. func (store *AbstractSqlStore) CommitTransaction(ctx context.Context) error {
  50. if tx, ok := ctx.Value("tx").(*sql.Tx); ok {
  51. return tx.Commit()
  52. }
  53. return nil
  54. }
  55. func (store *AbstractSqlStore) RollbackTransaction(ctx context.Context) error {
  56. if tx, ok := ctx.Value("tx").(*sql.Tx); ok {
  57. return tx.Rollback()
  58. }
  59. return nil
  60. }
  61. func (store *AbstractSqlStore) getTxOrDB(ctx context.Context, fullpath util.FullPath, isForChildren bool) (txOrDB TxOrDB, bucket string, shortPath util.FullPath, err error) {
  62. shortPath = fullpath
  63. bucket = DEFAULT_TABLE
  64. if tx, ok := ctx.Value("tx").(*sql.Tx); ok {
  65. txOrDB = tx
  66. } else {
  67. txOrDB = store.DB
  68. }
  69. if !store.SupportBucketTable {
  70. return
  71. }
  72. if !strings.HasPrefix(string(fullpath), "/buckets/") {
  73. return
  74. }
  75. // detect bucket
  76. bucketAndObjectKey := string(fullpath)[len("/buckets/"):]
  77. t := strings.Index(bucketAndObjectKey, "/")
  78. if t < 0 && !isForChildren {
  79. return
  80. }
  81. bucket = bucketAndObjectKey
  82. shortPath = "/"
  83. if t > 0 {
  84. bucket = bucketAndObjectKey[:t]
  85. shortPath = util.FullPath(bucketAndObjectKey[t:])
  86. }
  87. if isValidBucket(bucket) {
  88. store.dbsLock.Lock()
  89. defer store.dbsLock.Unlock()
  90. if store.dbs == nil {
  91. store.dbs = make(map[string]bool)
  92. }
  93. if _, found := store.dbs[bucket]; !found {
  94. if err = store.CreateTable(ctx, bucket); err != nil {
  95. store.dbs[bucket] = true
  96. }
  97. }
  98. }
  99. return
  100. }
  101. func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
  102. db, bucket, shortPath, err := store.getTxOrDB(ctx, entry.FullPath, false)
  103. if err != nil {
  104. return fmt.Errorf("findDB %s : %v", entry.FullPath, err)
  105. }
  106. dir, name := shortPath.DirAndName()
  107. meta, err := entry.EncodeAttributesAndChunks()
  108. if err != nil {
  109. return fmt.Errorf("encode %s: %s", entry.FullPath, err)
  110. }
  111. if len(entry.Chunks) > 50 {
  112. meta = util.MaybeGzipData(meta)
  113. }
  114. res, err := db.ExecContext(ctx, store.GetSqlInsert(bucket), util.HashStringToLong(dir), name, dir, meta)
  115. if err == nil {
  116. return
  117. }
  118. if !strings.Contains(strings.ToLower(err.Error()), "duplicate") {
  119. // return fmt.Errorf("insert: %s", err)
  120. // skip this since the error can be in a different language
  121. }
  122. // now the insert failed possibly due to duplication constraints
  123. glog.V(1).Infof("insert %s falls back to update: %v", entry.FullPath, err)
  124. res, err = db.ExecContext(ctx, store.GetSqlUpdate(bucket), meta, util.HashStringToLong(dir), name, dir)
  125. if err != nil {
  126. return fmt.Errorf("upsert %s: %s", entry.FullPath, err)
  127. }
  128. _, err = res.RowsAffected()
  129. if err != nil {
  130. return fmt.Errorf("upsert %s but no rows affected: %s", entry.FullPath, err)
  131. }
  132. return nil
  133. }
  134. func (store *AbstractSqlStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
  135. db, bucket, shortPath, err := store.getTxOrDB(ctx, entry.FullPath, false)
  136. if err != nil {
  137. return fmt.Errorf("findDB %s : %v", entry.FullPath, err)
  138. }
  139. dir, name := shortPath.DirAndName()
  140. meta, err := entry.EncodeAttributesAndChunks()
  141. if err != nil {
  142. return fmt.Errorf("encode %s: %s", entry.FullPath, err)
  143. }
  144. res, err := db.ExecContext(ctx, store.GetSqlUpdate(bucket), meta, util.HashStringToLong(dir), name, dir)
  145. if err != nil {
  146. return fmt.Errorf("update %s: %s", entry.FullPath, err)
  147. }
  148. _, err = res.RowsAffected()
  149. if err != nil {
  150. return fmt.Errorf("update %s but no rows affected: %s", entry.FullPath, err)
  151. }
  152. return nil
  153. }
  154. func (store *AbstractSqlStore) FindEntry(ctx context.Context, fullpath util.FullPath) (*filer.Entry, error) {
  155. db, bucket, shortPath, err := store.getTxOrDB(ctx, fullpath, false)
  156. if err != nil {
  157. return nil, fmt.Errorf("findDB %s : %v", fullpath, err)
  158. }
  159. dir, name := shortPath.DirAndName()
  160. row := db.QueryRowContext(ctx, store.GetSqlFind(bucket), util.HashStringToLong(dir), name, dir)
  161. var data []byte
  162. if err := row.Scan(&data); err != nil {
  163. if err == sql.ErrNoRows {
  164. return nil, filer_pb.ErrNotFound
  165. }
  166. return nil, fmt.Errorf("find %s: %v", fullpath, err)
  167. }
  168. entry := &filer.Entry{
  169. FullPath: fullpath,
  170. }
  171. if err := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); err != nil {
  172. return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
  173. }
  174. return entry, nil
  175. }
  176. func (store *AbstractSqlStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) error {
  177. db, bucket, shortPath, err := store.getTxOrDB(ctx, fullpath, false)
  178. if err != nil {
  179. return fmt.Errorf("findDB %s : %v", fullpath, err)
  180. }
  181. dir, name := shortPath.DirAndName()
  182. res, err := db.ExecContext(ctx, store.GetSqlDelete(bucket), util.HashStringToLong(dir), name, dir)
  183. if err != nil {
  184. return fmt.Errorf("delete %s: %s", fullpath, err)
  185. }
  186. _, err = res.RowsAffected()
  187. if err != nil {
  188. return fmt.Errorf("delete %s but no rows affected: %s", fullpath, err)
  189. }
  190. return nil
  191. }
  192. func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error {
  193. db, bucket, shortPath, err := store.getTxOrDB(ctx, fullpath, true)
  194. if err != nil {
  195. return fmt.Errorf("findDB %s : %v", fullpath, err)
  196. }
  197. if isValidBucket(bucket) && shortPath == "/" {
  198. if err = store.deleteTable(ctx, bucket); err == nil {
  199. store.dbsLock.Lock()
  200. delete(store.dbs, bucket)
  201. store.dbsLock.Unlock()
  202. return nil
  203. } else {
  204. return err
  205. }
  206. }
  207. res, err := db.ExecContext(ctx, store.GetSqlDeleteFolderChildren(bucket), util.HashStringToLong(string(shortPath)), fullpath)
  208. if err != nil {
  209. return fmt.Errorf("deleteFolderChildren %s: %s", fullpath, err)
  210. }
  211. _, err = res.RowsAffected()
  212. if err != nil {
  213. return fmt.Errorf("deleteFolderChildren %s but no rows affected: %s", fullpath, err)
  214. }
  215. return nil
  216. }
  217. func (store *AbstractSqlStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
  218. db, bucket, shortPath, err := store.getTxOrDB(ctx, dirPath, true)
  219. if err != nil {
  220. return lastFileName, fmt.Errorf("findDB %s : %v", dirPath, err)
  221. }
  222. sqlText := store.GetSqlListExclusive(bucket)
  223. if includeStartFile {
  224. sqlText = store.GetSqlListInclusive(bucket)
  225. }
  226. rows, err := db.QueryContext(ctx, sqlText, util.HashStringToLong(string(shortPath)), startFileName, string(shortPath), prefix+"%", limit+1)
  227. if err != nil {
  228. return lastFileName, fmt.Errorf("list %s : %v", dirPath, err)
  229. }
  230. defer rows.Close()
  231. for rows.Next() {
  232. var name string
  233. var data []byte
  234. if err = rows.Scan(&name, &data); err != nil {
  235. glog.V(0).Infof("scan %s : %v", dirPath, err)
  236. return lastFileName, fmt.Errorf("scan %s: %v", dirPath, err)
  237. }
  238. lastFileName = name
  239. entry := &filer.Entry{
  240. FullPath: util.NewFullPath(string(dirPath), name),
  241. }
  242. if err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); err != nil {
  243. glog.V(0).Infof("scan decode %s : %v", entry.FullPath, err)
  244. return lastFileName, fmt.Errorf("scan decode %s : %v", entry.FullPath, err)
  245. }
  246. if !eachEntryFunc(entry) {
  247. break
  248. }
  249. }
  250. return lastFileName, nil
  251. }
  252. func (store *AbstractSqlStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
  253. return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", nil)
  254. }
  255. func (store *AbstractSqlStore) Shutdown() {
  256. store.DB.Close()
  257. }
  258. func isValidBucket(bucket string) bool {
  259. return bucket != DEFAULT_TABLE && bucket != ""
  260. }
  261. func (store *AbstractSqlStore) CreateTable(ctx context.Context, bucket string) error {
  262. if !store.SupportBucketTable {
  263. return nil
  264. }
  265. _, err := store.DB.ExecContext(ctx, store.SqlGenerator.GetSqlCreateTable(bucket))
  266. return err
  267. }
  268. func (store *AbstractSqlStore) deleteTable(ctx context.Context, bucket string) error {
  269. if !store.SupportBucketTable {
  270. return nil
  271. }
  272. _, err := store.DB.ExecContext(ctx, store.SqlGenerator.GetSqlDropTable(bucket))
  273. return err
  274. }