cassandra_store_kv.go 1.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162
  1. package cassandra
  2. import (
  3. "context"
  4. "encoding/base64"
  5. "fmt"
  6. "github.com/gocql/gocql"
  7. "github.com/seaweedfs/seaweedfs/weed/filer"
  8. )
  9. func (store *CassandraStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
  10. dir, name := genDirAndName(key)
  11. if err := store.session.Query(
  12. "INSERT INTO filemeta (directory,name,meta) VALUES(?,?,?) USING TTL ? ",
  13. dir, name, value, 0).Exec(); err != nil {
  14. return fmt.Errorf("kv insert: %s", err)
  15. }
  16. return nil
  17. }
  18. func (store *CassandraStore) KvGet(ctx context.Context, key []byte) (data []byte, err error) {
  19. dir, name := genDirAndName(key)
  20. if err := store.session.Query(
  21. "SELECT meta FROM filemeta WHERE directory=? AND name=?",
  22. dir, name).Scan(&data); err != nil {
  23. if err != gocql.ErrNotFound {
  24. return nil, filer.ErrKvNotFound
  25. }
  26. }
  27. if len(data) == 0 {
  28. return nil, filer.ErrKvNotFound
  29. }
  30. return data, nil
  31. }
  32. func (store *CassandraStore) KvDelete(ctx context.Context, key []byte) (err error) {
  33. dir, name := genDirAndName(key)
  34. if err := store.session.Query(
  35. "DELETE FROM filemeta WHERE directory=? AND name=?",
  36. dir, name).Exec(); err != nil {
  37. return fmt.Errorf("kv delete: %v", err)
  38. }
  39. return nil
  40. }
  // genDirAndName maps an arbitrary KV key onto the (directory, name) pair used
  // to address a row in the filemeta table.
  //
  // The first 8 bytes of the key become dir and the remaining bytes become
  // name, each encoded with standard base64. Keys shorter than 8 bytes are
  // right-padded with zero bytes, so name is the empty string for them.
  //
  // The padding is done in a fresh buffer: appending to the caller's slice
  // could overwrite bytes in its backing array when it has spare capacity.
  func genDirAndName(key []byte) (dir string, name string) {
  	const dirLen = 8

  	if len(key) < dirLen {
  		padded := make([]byte, dirLen) // zero-filled
  		copy(padded, key)
  		key = padded
  	}

  	dir = base64.StdEncoding.EncodeToString(key[:dirLen])
  	name = base64.StdEncoding.EncodeToString(key[dirLen:])

  	return dir, name
  }