cassandra_store.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. package cassandra
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/filer2"
  6. "github.com/chrislusf/seaweedfs/weed/glog"
  7. "github.com/chrislusf/seaweedfs/weed/util"
  8. "github.com/gocql/gocql"
  9. )
  10. func init() {
  11. filer2.Stores = append(filer2.Stores, &CassandraStore{})
  12. }
  13. type CassandraStore struct {
  14. cluster *gocql.ClusterConfig
  15. session *gocql.Session
  16. }
  17. func (store *CassandraStore) GetName() string {
  18. return "cassandra"
  19. }
  20. func (store *CassandraStore) Initialize(configuration util.Configuration, prefix string) (err error) {
  21. return store.initialize(
  22. configuration.GetString(prefix+"keyspace"),
  23. configuration.GetStringSlice(prefix+"hosts"),
  24. )
  25. }
  26. func (store *CassandraStore) initialize(keyspace string, hosts []string) (err error) {
  27. store.cluster = gocql.NewCluster(hosts...)
  28. store.cluster.Keyspace = keyspace
  29. store.cluster.Consistency = gocql.LocalQuorum
  30. store.session, err = store.cluster.CreateSession()
  31. if err != nil {
  32. glog.V(0).Infof("Failed to open cassandra store, hosts %v, keyspace %s", hosts, keyspace)
  33. }
  34. return
  35. }
  36. func (store *CassandraStore) BeginTransaction(ctx context.Context) (context.Context, error) {
  37. return ctx, nil
  38. }
  39. func (store *CassandraStore) CommitTransaction(ctx context.Context) error {
  40. return nil
  41. }
  42. func (store *CassandraStore) RollbackTransaction(ctx context.Context) error {
  43. return nil
  44. }
  45. func (store *CassandraStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) {
  46. dir, name := entry.FullPath.DirAndName()
  47. meta, err := entry.EncodeAttributesAndChunks()
  48. if err != nil {
  49. return fmt.Errorf("encode %s: %s", entry.FullPath, err)
  50. }
  51. if err := store.session.Query(
  52. "INSERT INTO filemeta (directory,name,meta) VALUES(?,?,?) USING TTL ? ",
  53. dir, name, meta, entry.TtlSec).Exec(); err != nil {
  54. return fmt.Errorf("insert %s: %s", entry.FullPath, err)
  55. }
  56. return nil
  57. }
  58. func (store *CassandraStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) {
  59. return store.InsertEntry(ctx, entry)
  60. }
  61. func (store *CassandraStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
  62. dir, name := fullpath.DirAndName()
  63. var data []byte
  64. if err := store.session.Query(
  65. "SELECT meta FROM filemeta WHERE directory=? AND name=?",
  66. dir, name).Consistency(gocql.One).Scan(&data); err != nil {
  67. if err != gocql.ErrNotFound {
  68. return nil, filer2.ErrNotFound
  69. }
  70. }
  71. if len(data) == 0 {
  72. return nil, filer2.ErrNotFound
  73. }
  74. entry = &filer2.Entry{
  75. FullPath: fullpath,
  76. }
  77. err = entry.DecodeAttributesAndChunks(data)
  78. if err != nil {
  79. return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
  80. }
  81. return entry, nil
  82. }
  83. func (store *CassandraStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) error {
  84. dir, name := fullpath.DirAndName()
  85. if err := store.session.Query(
  86. "DELETE FROM filemeta WHERE directory=? AND name=?",
  87. dir, name).Exec(); err != nil {
  88. return fmt.Errorf("delete %s : %v", fullpath, err)
  89. }
  90. return nil
  91. }
  92. func (store *CassandraStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) error {
  93. if err := store.session.Query(
  94. "DELETE FROM filemeta WHERE directory=?",
  95. fullpath).Exec(); err != nil {
  96. return fmt.Errorf("delete %s : %v", fullpath, err)
  97. }
  98. return nil
  99. }
  100. func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool,
  101. limit int) (entries []*filer2.Entry, err error) {
  102. cqlStr := "SELECT NAME, meta FROM filemeta WHERE directory=? AND name>? ORDER BY NAME ASC LIMIT ?"
  103. if inclusive {
  104. cqlStr = "SELECT NAME, meta FROM filemeta WHERE directory=? AND name>=? ORDER BY NAME ASC LIMIT ?"
  105. }
  106. var data []byte
  107. var name string
  108. iter := store.session.Query(cqlStr, string(fullpath), startFileName, limit).Iter()
  109. for iter.Scan(&name, &data) {
  110. entry := &filer2.Entry{
  111. FullPath: filer2.NewFullPath(string(fullpath), name),
  112. }
  113. if decodeErr := entry.DecodeAttributesAndChunks(data); decodeErr != nil {
  114. err = decodeErr
  115. glog.V(0).Infof("list %s : %v", entry.FullPath, err)
  116. break
  117. }
  118. entries = append(entries, entry)
  119. }
  120. if err := iter.Close(); err != nil {
  121. glog.V(0).Infof("list iterator close: %v", err)
  122. }
  123. return entries, err
  124. }