etcd_store.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. package etcd
  2. import (
  3. "context"
  4. "fmt"
  5. "strings"
  6. "time"
  7. "github.com/chrislusf/seaweedfs/weed/filer2"
  8. "github.com/chrislusf/seaweedfs/weed/glog"
  9. weed_util "github.com/chrislusf/seaweedfs/weed/util"
  10. "go.etcd.io/etcd/clientv3"
  11. )
  12. const (
  13. DIR_FILE_SEPARATOR = byte(0x00)
  14. )
  15. func init() {
  16. filer2.Stores = append(filer2.Stores, &EtcdStore{})
  17. }
  18. type EtcdStore struct {
  19. client *clientv3.Client
  20. }
  21. func (store *EtcdStore) GetName() string {
  22. return "etcd"
  23. }
  24. func (store *EtcdStore) Initialize(configuration weed_util.Configuration) (err error) {
  25. servers := configuration.GetString("servers")
  26. if servers == "" {
  27. servers = "localhost:2379"
  28. }
  29. timeout := configuration.GetString("timeout")
  30. if timeout == "" {
  31. timeout = "3s"
  32. }
  33. return store.initialize(servers, timeout)
  34. }
  35. func (store *EtcdStore) initialize(servers string, timeout string) (err error) {
  36. glog.Infof("filer store etcd: %s", servers)
  37. to, err := time.ParseDuration(timeout)
  38. if err != nil {
  39. return fmt.Errorf("parse timeout %s: %s", timeout, err)
  40. }
  41. store.client, err = clientv3.New(clientv3.Config{
  42. Endpoints: strings.Split(servers, ","),
  43. DialTimeout: to,
  44. })
  45. if err != nil {
  46. return fmt.Errorf("connect to etcd %s: %s", servers, err)
  47. }
  48. return
  49. }
  50. func (store *EtcdStore) BeginTransaction(ctx context.Context) (context.Context, error) {
  51. return ctx, nil
  52. }
  53. func (store *EtcdStore) CommitTransaction(ctx context.Context) error {
  54. return nil
  55. }
  56. func (store *EtcdStore) RollbackTransaction(ctx context.Context) error {
  57. return nil
  58. }
  59. func (store *EtcdStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) {
  60. key := genKey(entry.DirAndName())
  61. value, err := entry.EncodeAttributesAndChunks()
  62. if err != nil {
  63. return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
  64. }
  65. if _, err := store.client.Put(ctx, string(key), string(value)); err != nil {
  66. return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
  67. }
  68. return nil
  69. }
  70. func (store *EtcdStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) {
  71. return store.InsertEntry(ctx, entry)
  72. }
  73. func (store *EtcdStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
  74. key := genKey(fullpath.DirAndName())
  75. resp, err := store.client.Get(ctx, string(key))
  76. if err != nil {
  77. return nil, fmt.Errorf("get %s : %v", entry.FullPath, err)
  78. }
  79. if len(resp.Kvs) == 0 {
  80. return nil, filer2.ErrNotFound
  81. }
  82. entry = &filer2.Entry{
  83. FullPath: fullpath,
  84. }
  85. err = entry.DecodeAttributesAndChunks(resp.Kvs[0].Value)
  86. if err != nil {
  87. return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
  88. }
  89. return entry, nil
  90. }
  91. func (store *EtcdStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) {
  92. key := genKey(fullpath.DirAndName())
  93. if _, err := store.client.Delete(ctx, string(key)); err != nil {
  94. return fmt.Errorf("delete %s : %v", fullpath, err)
  95. }
  96. return nil
  97. }
  98. func (store *EtcdStore) ListDirectoryEntries(
  99. ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, limit int,
  100. ) (entries []*filer2.Entry, err error) {
  101. directoryPrefix := genDirectoryKeyPrefix(fullpath, "")
  102. resp, err := store.client.Get(ctx, string(directoryPrefix),
  103. clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortDescend))
  104. if err != nil {
  105. return nil, fmt.Errorf("list %s : %v", fullpath, err)
  106. }
  107. for _, kv := range resp.Kvs {
  108. fileName := getNameFromKey(kv.Key)
  109. if fileName == "" {
  110. continue
  111. }
  112. if fileName == startFileName && !inclusive {
  113. continue
  114. }
  115. limit--
  116. if limit < 0 {
  117. break
  118. }
  119. entry := &filer2.Entry{
  120. FullPath: filer2.NewFullPath(string(fullpath), fileName),
  121. }
  122. if decodeErr := entry.DecodeAttributesAndChunks(kv.Value); decodeErr != nil {
  123. err = decodeErr
  124. glog.V(0).Infof("list %s : %v", entry.FullPath, err)
  125. break
  126. }
  127. entries = append(entries, entry)
  128. }
  129. return entries, err
  130. }
  131. func genKey(dirPath, fileName string) (key []byte) {
  132. key = []byte(dirPath)
  133. key = append(key, DIR_FILE_SEPARATOR)
  134. key = append(key, []byte(fileName)...)
  135. return key
  136. }
  137. func genDirectoryKeyPrefix(fullpath filer2.FullPath, startFileName string) (keyPrefix []byte) {
  138. keyPrefix = []byte(string(fullpath))
  139. keyPrefix = append(keyPrefix, DIR_FILE_SEPARATOR)
  140. if len(startFileName) > 0 {
  141. keyPrefix = append(keyPrefix, []byte(startFileName)...)
  142. }
  143. return keyPrefix
  144. }
  145. func getNameFromKey(key []byte) string {
  146. sepIndex := len(key) - 1
  147. for sepIndex >= 0 && key[sepIndex] != DIR_FILE_SEPARATOR {
  148. sepIndex--
  149. }
  150. return string(key[sepIndex+1:])
  151. }