abstract_sql_store_kv.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. package abstract_sql
  2. import (
  3. "context"
  4. "database/sql"
  5. "encoding/base64"
  6. "fmt"
  7. "github.com/chrislusf/seaweedfs/weed/filer"
  8. "github.com/chrislusf/seaweedfs/weed/glog"
  9. "github.com/chrislusf/seaweedfs/weed/util"
  10. "strings"
  11. )
  12. func (store *AbstractSqlStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
  13. db, _, _, err := store.getTxOrDB(ctx, "", false)
  14. if err != nil {
  15. return fmt.Errorf("findDB: %v", err)
  16. }
  17. dirStr, dirHash, name := genDirAndName(key)
  18. res, err := db.ExecContext(ctx, store.GetSqlInsert(DEFAULT_TABLE), dirHash, name, dirStr, value)
  19. if err == nil {
  20. return
  21. }
  22. if !strings.Contains(strings.ToLower(err.Error()), "duplicate") {
  23. // return fmt.Errorf("kv insert: %s", err)
  24. // skip this since the error can be in a different language
  25. }
  26. // now the insert failed possibly due to duplication constraints
  27. glog.V(1).Infof("kv insert falls back to update: %s", err)
  28. res, err = db.ExecContext(ctx, store.GetSqlUpdate(DEFAULT_TABLE), value, dirHash, name, dirStr)
  29. if err != nil {
  30. return fmt.Errorf("kv upsert: %s", err)
  31. }
  32. _, err = res.RowsAffected()
  33. if err != nil {
  34. return fmt.Errorf("kv upsert no rows affected: %s", err)
  35. }
  36. return nil
  37. }
  38. func (store *AbstractSqlStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
  39. db, _, _, err := store.getTxOrDB(ctx, "", false)
  40. if err != nil {
  41. return nil, fmt.Errorf("findDB: %v", err)
  42. }
  43. dirStr, dirHash, name := genDirAndName(key)
  44. row := db.QueryRowContext(ctx, store.GetSqlFind(DEFAULT_TABLE), dirHash, name, dirStr)
  45. err = row.Scan(&value)
  46. if err == sql.ErrNoRows {
  47. return nil, filer.ErrKvNotFound
  48. }
  49. if err != nil {
  50. return nil, fmt.Errorf("kv get: %v", err)
  51. }
  52. return
  53. }
  54. func (store *AbstractSqlStore) KvDelete(ctx context.Context, key []byte) (err error) {
  55. db, _, _, err := store.getTxOrDB(ctx, "", false)
  56. if err != nil {
  57. return fmt.Errorf("findDB: %v", err)
  58. }
  59. dirStr, dirHash, name := genDirAndName(key)
  60. res, err := db.ExecContext(ctx, store.GetSqlDelete(DEFAULT_TABLE), dirHash, name, dirStr)
  61. if err != nil {
  62. return fmt.Errorf("kv delete: %s", err)
  63. }
  64. _, err = res.RowsAffected()
  65. if err != nil {
  66. return fmt.Errorf("kv delete no rows affected: %s", err)
  67. }
  68. return nil
  69. }
  70. func genDirAndName(key []byte) (dirStr string, dirHash int64, name string) {
  71. for len(key) < 8 {
  72. key = append(key, 0)
  73. }
  74. dirHash = int64(util.BytesToUint64(key[:8]))
  75. dirStr = base64.StdEncoding.EncodeToString(key[:8])
  76. name = base64.StdEncoding.EncodeToString(key[8:])
  77. return
  78. }