tikv_store.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251
  1. // +build !386
  2. // +build !arm
  3. package tikv
  4. import (
  5. "bytes"
  6. "context"
  7. "crypto/md5"
  8. "fmt"
  9. "io"
  10. "github.com/chrislusf/seaweedfs/weed/filer2"
  11. "github.com/chrislusf/seaweedfs/weed/glog"
  12. weed_util "github.com/chrislusf/seaweedfs/weed/util"
  13. "github.com/pingcap/tidb/kv"
  14. "github.com/pingcap/tidb/store/tikv"
  15. )
  16. func init() {
  17. filer2.Stores = append(filer2.Stores, &TikvStore{})
  18. }
  19. type TikvStore struct {
  20. store kv.Storage
  21. }
  22. func (store *TikvStore) GetName() string {
  23. return "tikv"
  24. }
  25. func (store *TikvStore) Initialize(configuration weed_util.Configuration, prefix string) (err error) {
  26. pdAddr := configuration.GetString(prefix + "pdAddress")
  27. return store.initialize(pdAddr)
  28. }
  29. func (store *TikvStore) initialize(pdAddr string) (err error) {
  30. glog.Infof("filer store tikv pd address: %s", pdAddr)
  31. driver := tikv.Driver{}
  32. store.store, err = driver.Open(fmt.Sprintf("tikv://%s", pdAddr))
  33. if err != nil {
  34. return fmt.Errorf("open tikv %s : %v", pdAddr, err)
  35. }
  36. return
  37. }
  38. func (store *TikvStore) BeginTransaction(ctx context.Context) (context.Context, error) {
  39. tx, err := store.store.Begin()
  40. if err != nil {
  41. return ctx, err
  42. }
  43. return context.WithValue(ctx, "tx", tx), nil
  44. }
  45. func (store *TikvStore) CommitTransaction(ctx context.Context) error {
  46. tx, ok := ctx.Value("tx").(kv.Transaction)
  47. if ok {
  48. return tx.Commit(ctx)
  49. }
  50. return nil
  51. }
  52. func (store *TikvStore) RollbackTransaction(ctx context.Context) error {
  53. tx, ok := ctx.Value("tx").(kv.Transaction)
  54. if ok {
  55. return tx.Rollback()
  56. }
  57. return nil
  58. }
  59. func (store *TikvStore) getTx(ctx context.Context) kv.Transaction {
  60. if tx, ok := ctx.Value("tx").(kv.Transaction); ok {
  61. return tx
  62. }
  63. return nil
  64. }
  65. func (store *TikvStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) {
  66. dir, name := entry.DirAndName()
  67. key := genKey(dir, name)
  68. value, err := entry.EncodeAttributesAndChunks()
  69. if err != nil {
  70. return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
  71. }
  72. err = store.getTx(ctx).Set(key, value)
  73. if err != nil {
  74. return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
  75. }
  76. // println("saved", entry.FullPath, "chunks", len(entry.Chunks))
  77. return nil
  78. }
  79. func (store *TikvStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) {
  80. return store.InsertEntry(ctx, entry)
  81. }
  82. func (store *TikvStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
  83. dir, name := fullpath.DirAndName()
  84. key := genKey(dir, name)
  85. data, err := store.getTx(ctx).Get(ctx, key)
  86. if err == kv.ErrNotExist {
  87. return nil, filer2.ErrNotFound
  88. }
  89. if err != nil {
  90. return nil, fmt.Errorf("get %s : %v", entry.FullPath, err)
  91. }
  92. entry = &filer2.Entry{
  93. FullPath: fullpath,
  94. }
  95. err = entry.DecodeAttributesAndChunks(data)
  96. if err != nil {
  97. return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
  98. }
  99. // println("read", entry.FullPath, "chunks", len(entry.Chunks), "data", len(data), string(data))
  100. return entry, nil
  101. }
  102. func (store *TikvStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) {
  103. dir, name := fullpath.DirAndName()
  104. key := genKey(dir, name)
  105. err = store.getTx(ctx).Delete(key)
  106. if err != nil {
  107. return fmt.Errorf("delete %s : %v", fullpath, err)
  108. }
  109. return nil
  110. }
  111. func (store *TikvStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) {
  112. directoryPrefix := genDirectoryKeyPrefix(fullpath, "")
  113. tx := store.getTx(ctx)
  114. iter, err := tx.Iter(directoryPrefix, nil)
  115. if err != nil {
  116. return fmt.Errorf("deleteFolderChildren %s: %v", fullpath, err)
  117. }
  118. defer iter.Close()
  119. for iter.Valid() {
  120. key := iter.Key()
  121. if !bytes.HasPrefix(key, directoryPrefix) {
  122. break
  123. }
  124. fileName := getNameFromKey(key)
  125. if fileName == "" {
  126. iter.Next()
  127. continue
  128. }
  129. if err = tx.Delete(genKey(string(fullpath), fileName)); err != nil {
  130. return fmt.Errorf("delete %s : %v", fullpath, err)
  131. }
  132. iter.Next()
  133. }
  134. return nil
  135. }
  136. func (store *TikvStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool,
  137. limit int) (entries []*filer2.Entry, err error) {
  138. directoryPrefix := genDirectoryKeyPrefix(fullpath, "")
  139. lastFileStart := genDirectoryKeyPrefix(fullpath, startFileName)
  140. iter, err := store.getTx(ctx).Iter(lastFileStart, nil)
  141. if err != nil {
  142. return nil, fmt.Errorf("list %s: %v", fullpath, err)
  143. }
  144. defer iter.Close()
  145. for iter.Valid() {
  146. key := iter.Key()
  147. if !bytes.HasPrefix(key, directoryPrefix) {
  148. break
  149. }
  150. fileName := getNameFromKey(key)
  151. if fileName == "" {
  152. iter.Next()
  153. continue
  154. }
  155. if fileName == startFileName && !inclusive {
  156. iter.Next()
  157. continue
  158. }
  159. limit--
  160. if limit < 0 {
  161. break
  162. }
  163. entry := &filer2.Entry{
  164. FullPath: filer2.NewFullPath(string(fullpath), fileName),
  165. }
  166. // println("list", entry.FullPath, "chunks", len(entry.Chunks))
  167. if decodeErr := entry.DecodeAttributesAndChunks(iter.Value()); decodeErr != nil {
  168. err = decodeErr
  169. glog.V(0).Infof("list %s : %v", entry.FullPath, err)
  170. break
  171. }
  172. entries = append(entries, entry)
  173. iter.Next()
  174. }
  175. return entries, err
  176. }
  177. func genKey(dirPath, fileName string) (key []byte) {
  178. key = hashToBytes(dirPath)
  179. key = append(key, []byte(fileName)...)
  180. return key
  181. }
  182. func genDirectoryKeyPrefix(fullpath filer2.FullPath, startFileName string) (keyPrefix []byte) {
  183. keyPrefix = hashToBytes(string(fullpath))
  184. if len(startFileName) > 0 {
  185. keyPrefix = append(keyPrefix, []byte(startFileName)...)
  186. }
  187. return keyPrefix
  188. }
  // getNameFromKey returns the file-name part of a key, i.e. everything after
  // the leading md5.Size bytes. A key that is not longer than md5.Size has no
  // name part and yields "" instead of panicking on the slice expression.
  func getNameFromKey(key []byte) string {
  	if len(key) <= md5.Size {
  		return ""
  	}
  	return string(key[md5.Size:])
  }
  // hashToBytes returns the MD5 digest (md5.Size bytes) of dir.
  func hashToBytes(dir string) []byte {
  	hasher := md5.New()
  	_, _ = io.WriteString(hasher, dir)
  	return hasher.Sum(nil)
  }
  198. }