elastic_store_kv.go 1.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768
  1. //go:build elastic
  2. // +build elastic
  3. package elastic
  4. import (
  5. "context"
  6. "fmt"
  7. "github.com/seaweedfs/seaweedfs/weed/filer"
  8. jsoniter "github.com/json-iterator/go"
  9. elastic "github.com/olivere/elastic/v7"
  10. "github.com/seaweedfs/seaweedfs/weed/glog"
  11. )
  12. func (store *ElasticStore) KvDelete(ctx context.Context, key []byte) (err error) {
  13. deleteResult, err := store.client.Delete().
  14. Index(indexKV).
  15. Type(indexType).
  16. Id(string(key)).
  17. Do(ctx)
  18. if err == nil {
  19. if deleteResult.Result == "deleted" || deleteResult.Result == "not_found" {
  20. return nil
  21. }
  22. }
  23. glog.Errorf("delete key(id:%s) %v.", string(key), err)
  24. return fmt.Errorf("delete key %v.", err)
  25. }
  26. func (store *ElasticStore) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
  27. searchResult, err := store.client.Get().
  28. Index(indexKV).
  29. Type(indexType).
  30. Id(string(key)).
  31. Do(ctx)
  32. if elastic.IsNotFound(err) {
  33. return value, filer.ErrKvNotFound
  34. }
  35. if searchResult != nil && searchResult.Found {
  36. esEntry := &ESKVEntry{}
  37. if err := jsoniter.Unmarshal(searchResult.Source, esEntry); err == nil {
  38. return esEntry.Value, nil
  39. }
  40. }
  41. glog.Errorf("find key(%s),%v.", string(key), err)
  42. return value, filer.ErrKvNotFound
  43. }
  44. func (store *ElasticStore) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
  45. esEntry := &ESKVEntry{value}
  46. val, err := jsoniter.Marshal(esEntry)
  47. if err != nil {
  48. glog.Errorf("insert key(%s) %v.", string(key), err)
  49. return fmt.Errorf("insert key %v.", err)
  50. }
  51. _, err = store.client.Index().
  52. Index(indexKV).
  53. Type(indexType).
  54. Id(string(key)).
  55. BodyJson(string(val)).
  56. Do(ctx)
  57. if err != nil {
  58. return fmt.Errorf("kv put: %v", err)
  59. }
  60. return nil
  61. }