hbase_store.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231
  1. package hbase
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "github.com/seaweedfs/seaweedfs/weed/filer"
  7. "github.com/seaweedfs/seaweedfs/weed/glog"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  9. "github.com/seaweedfs/seaweedfs/weed/util"
  10. "github.com/tsuna/gohbase"
  11. "github.com/tsuna/gohbase/hrpc"
  12. "io"
  13. )
  14. func init() {
  15. filer.Stores = append(filer.Stores, &HbaseStore{})
  16. }
  17. type HbaseStore struct {
  18. Client gohbase.Client
  19. table []byte
  20. cfKv string
  21. cfMetaDir string
  22. column string
  23. }
  24. func (store *HbaseStore) GetName() string {
  25. return "hbase"
  26. }
  27. func (store *HbaseStore) Initialize(configuration util.Configuration, prefix string) (err error) {
  28. return store.initialize(
  29. configuration.GetString(prefix+"zkquorum"),
  30. configuration.GetString(prefix+"table"),
  31. )
  32. }
  33. func (store *HbaseStore) initialize(zkquorum, table string) (err error) {
  34. store.Client = gohbase.NewClient(zkquorum)
  35. store.table = []byte(table)
  36. store.cfKv = "kv"
  37. store.cfMetaDir = "meta"
  38. store.column = "a"
  39. // check table exists
  40. key := "whatever"
  41. headers := map[string][]string{store.cfMetaDir: nil}
  42. get, err := hrpc.NewGet(context.Background(), store.table, []byte(key), hrpc.Families(headers))
  43. if err != nil {
  44. return fmt.Errorf("NewGet returned an error: %v", err)
  45. }
  46. _, err = store.Client.Get(get)
  47. if err != gohbase.TableNotFound {
  48. return nil
  49. }
  50. // create table
  51. adminClient := gohbase.NewAdminClient(zkquorum)
  52. cFamilies := []string{store.cfKv, store.cfMetaDir}
  53. cf := make(map[string]map[string]string, len(cFamilies))
  54. for _, f := range cFamilies {
  55. cf[f] = nil
  56. }
  57. ct := hrpc.NewCreateTable(context.Background(), []byte(table), cf)
  58. if err := adminClient.CreateTable(ct); err != nil {
  59. return err
  60. }
  61. return nil
  62. }
  63. func (store *HbaseStore) InsertEntry(ctx context.Context, entry *filer.Entry) error {
  64. value, err := entry.EncodeAttributesAndChunks()
  65. if err != nil {
  66. return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
  67. }
  68. if len(entry.Chunks) > filer.CountEntryChunksForGzip {
  69. value = util.MaybeGzipData(value)
  70. }
  71. return store.doPut(ctx, store.cfMetaDir, []byte(entry.FullPath), value, entry.TtlSec)
  72. }
  73. func (store *HbaseStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
  74. return store.InsertEntry(ctx, entry)
  75. }
  76. func (store *HbaseStore) FindEntry(ctx context.Context, path util.FullPath) (entry *filer.Entry, err error) {
  77. value, err := store.doGet(ctx, store.cfMetaDir, []byte(path))
  78. if err != nil {
  79. if err == filer.ErrKvNotFound {
  80. return nil, filer_pb.ErrNotFound
  81. }
  82. return nil, err
  83. }
  84. entry = &filer.Entry{
  85. FullPath: path,
  86. }
  87. err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData(value))
  88. if err != nil {
  89. return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
  90. }
  91. return entry, nil
  92. }
  93. func (store *HbaseStore) DeleteEntry(ctx context.Context, path util.FullPath) (err error) {
  94. return store.doDelete(ctx, store.cfMetaDir, []byte(path))
  95. }
  96. func (store *HbaseStore) DeleteFolderChildren(ctx context.Context, path util.FullPath) (err error) {
  97. family := map[string][]string{store.cfMetaDir: {COLUMN_NAME}}
  98. expectedPrefix := []byte(path.Child(""))
  99. scan, err := hrpc.NewScanRange(ctx, store.table, expectedPrefix, nil, hrpc.Families(family))
  100. if err != nil {
  101. return err
  102. }
  103. scanner := store.Client.Scan(scan)
  104. defer scanner.Close()
  105. for {
  106. res, err := scanner.Next()
  107. if err != nil {
  108. break
  109. }
  110. if len(res.Cells) == 0 {
  111. continue
  112. }
  113. cell := res.Cells[0]
  114. if !bytes.HasPrefix(cell.Row, expectedPrefix) {
  115. break
  116. }
  117. fullpath := util.FullPath(cell.Row)
  118. dir, _ := fullpath.DirAndName()
  119. if dir != string(path) {
  120. continue
  121. }
  122. err = store.doDelete(ctx, store.cfMetaDir, cell.Row)
  123. if err != nil {
  124. break
  125. }
  126. }
  127. return
  128. }
  129. func (store *HbaseStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (string, error) {
  130. return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc)
  131. }
  132. func (store *HbaseStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
  133. family := map[string][]string{store.cfMetaDir: {COLUMN_NAME}}
  134. expectedPrefix := []byte(dirPath.Child(prefix))
  135. scan, err := hrpc.NewScanRange(ctx, store.table, expectedPrefix, nil, hrpc.Families(family))
  136. if err != nil {
  137. return lastFileName, err
  138. }
  139. scanner := store.Client.Scan(scan)
  140. defer scanner.Close()
  141. for {
  142. res, err := scanner.Next()
  143. if err == io.EOF {
  144. break
  145. }
  146. if err != nil {
  147. return lastFileName, err
  148. }
  149. if len(res.Cells) == 0 {
  150. continue
  151. }
  152. cell := res.Cells[0]
  153. if !bytes.HasPrefix(cell.Row, expectedPrefix) {
  154. break
  155. }
  156. fullpath := util.FullPath(cell.Row)
  157. dir, fileName := fullpath.DirAndName()
  158. if dir != string(dirPath) {
  159. continue
  160. }
  161. value := cell.Value
  162. if fileName == startFileName && !includeStartFile {
  163. continue
  164. }
  165. limit--
  166. if limit < 0 {
  167. break
  168. }
  169. lastFileName = fileName
  170. entry := &filer.Entry{
  171. FullPath: fullpath,
  172. }
  173. if decodeErr := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(value)); decodeErr != nil {
  174. err = decodeErr
  175. glog.V(0).Infof("list %s : %v", entry.FullPath, err)
  176. break
  177. }
  178. if !eachEntryFunc(entry) {
  179. break
  180. }
  181. }
  182. return lastFileName, nil
  183. }
  184. func (store *HbaseStore) BeginTransaction(ctx context.Context) (context.Context, error) {
  185. return ctx, nil
  186. }
  187. func (store *HbaseStore) CommitTransaction(ctx context.Context) error {
  188. return nil
  189. }
  190. func (store *HbaseStore) RollbackTransaction(ctx context.Context) error {
  191. return nil
  192. }
  193. func (store *HbaseStore) Shutdown() {
  194. store.Client.Close()
  195. }