mongodb_store_kv.go 1.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. package mongodb
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/filer"
  6. "github.com/chrislusf/seaweedfs/weed/util/log"
  7. "go.mongodb.org/mongo-driver/bson"
  8. "go.mongodb.org/mongo-driver/mongo"
  9. )
  10. func (store *MongodbStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
  11. dir, name := genDirAndName(key)
  12. c := store.connect.Database(store.database).Collection(store.collectionName)
  13. _, err = c.InsertOne(ctx, Model{
  14. Directory: dir,
  15. Name: name,
  16. Meta: value,
  17. })
  18. if err != nil {
  19. return fmt.Errorf("kv put: %v", err)
  20. }
  21. return nil
  22. }
  23. func (store *MongodbStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
  24. dir, name := genDirAndName(key)
  25. var data Model
  26. var where = bson.M{"directory": dir, "name": name}
  27. err = store.connect.Database(store.database).Collection(store.collectionName).FindOne(ctx, where).Decode(&data)
  28. if err != mongo.ErrNoDocuments && err != nil {
  29. log.Errorf("kv get: %v", err)
  30. return nil, filer.ErrKvNotFound
  31. }
  32. if len(data.Meta) == 0 {
  33. return nil, filer.ErrKvNotFound
  34. }
  35. return data.Meta, nil
  36. }
  37. func (store *MongodbStore) KvDelete(ctx context.Context, key []byte) (err error) {
  38. dir, name := genDirAndName(key)
  39. where := bson.M{"directory": dir, "name": name}
  40. _, err = store.connect.Database(store.database).Collection(store.collectionName).DeleteOne(ctx, where)
  41. if err != nil {
  42. return fmt.Errorf("kv delete: %v", err)
  43. }
  44. return nil
  45. }
  // genDirAndName splits a KV key into the two string fields used to address a
  // document: dir is the first 8 bytes of the key and name is everything after
  // them.
  //
  // A key shorter than 8 bytes is right-padded with zero bytes to fill dir, and
  // name is then empty. The padding is done on a local copy, so the caller's
  // slice and its backing array are never written to.
  func genDirAndName(key []byte) (dir string, name string) {
  	const dirLen = 8

  	if len(key) < dirLen {
  		// The zero value of the array supplies the padding bytes.
  		var padded [dirLen]byte
  		copy(padded[:], key)
  		return string(padded[:]), ""
  	}
  	return string(key[:dirLen]), string(key[dirLen:])
  }