// abstract_sql_store_kv.go (2.1 KB)
  1. package abstract_sql
  2. import (
  3. "context"
  4. "database/sql"
  5. "encoding/base64"
  6. "fmt"
  7. "strings"
  8. "github.com/chrislusf/seaweedfs/weed/filer"
  9. "github.com/chrislusf/seaweedfs/weed/util/log"
  10. "github.com/chrislusf/seaweedfs/weed/util"
  11. )
  12. func (store *AbstractSqlStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
  13. dirStr, dirHash, name := genDirAndName(key)
  14. res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlInsert, dirHash, name, dirStr, value)
  15. if err != nil {
  16. if !strings.Contains(strings.ToLower(err.Error()), "duplicate") {
  17. return fmt.Errorf("kv insert: %s", err)
  18. }
  19. }
  20. // now the insert failed possibly due to duplication constraints
  21. log.Debugf("kv insert falls back to update: %s", err)
  22. res, err = store.getTxOrDB(ctx).ExecContext(ctx, store.SqlUpdate, value, dirHash, name, dirStr)
  23. if err != nil {
  24. return fmt.Errorf("kv upsert: %s", err)
  25. }
  26. _, err = res.RowsAffected()
  27. if err != nil {
  28. return fmt.Errorf("kv upsert no rows affected: %s", err)
  29. }
  30. return nil
  31. }
  32. func (store *AbstractSqlStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
  33. dirStr, dirHash, name := genDirAndName(key)
  34. row := store.getTxOrDB(ctx).QueryRowContext(ctx, store.SqlFind, dirHash, name, dirStr)
  35. err = row.Scan(&value)
  36. if err == sql.ErrNoRows {
  37. return nil, filer.ErrKvNotFound
  38. }
  39. if err != nil {
  40. return nil, fmt.Errorf("kv get: %v", err)
  41. }
  42. return
  43. }
  44. func (store *AbstractSqlStore) KvDelete(ctx context.Context, key []byte) (err error) {
  45. dirStr, dirHash, name := genDirAndName(key)
  46. res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlDelete, dirHash, name, dirStr)
  47. if err != nil {
  48. return fmt.Errorf("kv delete: %s", err)
  49. }
  50. _, err = res.RowsAffected()
  51. if err != nil {
  52. return fmt.Errorf("kv delete no rows affected: %s", err)
  53. }
  54. return nil
  55. }
  56. func genDirAndName(key []byte) (dirStr string, dirHash int64, name string) {
  57. for len(key) < 8 {
  58. key = append(key, 0)
  59. }
  60. dirHash = int64(util.BytesToUint64(key[:8]))
  61. dirStr = base64.StdEncoding.EncodeToString(key[:8])
  62. name = base64.StdEncoding.EncodeToString(key[8:])
  63. return
  64. }