cassandra_store.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. package cassandra
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/gocql/gocql"
  6. "github.com/chrislusf/seaweedfs/weed/filer"
  7. "github.com/chrislusf/seaweedfs/weed/util/log"
  8. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  9. "github.com/chrislusf/seaweedfs/weed/util"
  10. )
  11. func init() {
  12. filer.Stores = append(filer.Stores, &CassandraStore{})
  13. }
  14. type CassandraStore struct {
  15. cluster *gocql.ClusterConfig
  16. session *gocql.Session
  17. }
  18. func (store *CassandraStore) GetName() string {
  19. return "cassandra"
  20. }
  21. func (store *CassandraStore) Initialize(configuration util.Configuration, prefix string) (err error) {
  22. return store.initialize(
  23. configuration.GetString(prefix+"keyspace"),
  24. configuration.GetStringSlice(prefix+"hosts"),
  25. configuration.GetString(prefix+"username"),
  26. configuration.GetString(prefix+"password"),
  27. )
  28. }
  29. func (store *CassandraStore) initialize(keyspace string, hosts []string, username string, password string) (err error) {
  30. store.cluster = gocql.NewCluster(hosts...)
  31. if username != "" && password != "" {
  32. store.cluster.Authenticator = gocql.PasswordAuthenticator{Username: username, Password: password}
  33. }
  34. store.cluster.Keyspace = keyspace
  35. store.cluster.Consistency = gocql.LocalQuorum
  36. store.session, err = store.cluster.CreateSession()
  37. if err != nil {
  38. log.Infof("Failed to open cassandra store, hosts %v, keyspace %s", hosts, keyspace)
  39. }
  40. return
  41. }
  42. func (store *CassandraStore) BeginTransaction(ctx context.Context) (context.Context, error) {
  43. return ctx, nil
  44. }
  45. func (store *CassandraStore) CommitTransaction(ctx context.Context) error {
  46. return nil
  47. }
  48. func (store *CassandraStore) RollbackTransaction(ctx context.Context) error {
  49. return nil
  50. }
  51. func (store *CassandraStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
  52. dir, name := entry.FullPath.DirAndName()
  53. meta, err := entry.EncodeAttributesAndChunks()
  54. if err != nil {
  55. return fmt.Errorf("encode %s: %s", entry.FullPath, err)
  56. }
  57. if len(entry.Chunks) > 50 {
  58. meta = util.MaybeGzipData(meta)
  59. }
  60. if err := store.session.Query(
  61. "INSERT INTO filemeta (directory,name,meta) VALUES(?,?,?) USING TTL ? ",
  62. dir, name, meta, entry.TtlSec).Exec(); err != nil {
  63. return fmt.Errorf("insert %s: %s", entry.FullPath, err)
  64. }
  65. return nil
  66. }
  67. func (store *CassandraStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
  68. return store.InsertEntry(ctx, entry)
  69. }
  70. func (store *CassandraStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) {
  71. dir, name := fullpath.DirAndName()
  72. var data []byte
  73. if err := store.session.Query(
  74. "SELECT meta FROM filemeta WHERE directory=? AND name=?",
  75. dir, name).Consistency(gocql.One).Scan(&data); err != nil {
  76. if err != gocql.ErrNotFound {
  77. return nil, filer_pb.ErrNotFound
  78. }
  79. }
  80. if len(data) == 0 {
  81. return nil, filer_pb.ErrNotFound
  82. }
  83. entry = &filer.Entry{
  84. FullPath: fullpath,
  85. }
  86. err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data))
  87. if err != nil {
  88. return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
  89. }
  90. return entry, nil
  91. }
  92. func (store *CassandraStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) error {
  93. dir, name := fullpath.DirAndName()
  94. if err := store.session.Query(
  95. "DELETE FROM filemeta WHERE directory=? AND name=?",
  96. dir, name).Exec(); err != nil {
  97. return fmt.Errorf("delete %s : %v", fullpath, err)
  98. }
  99. return nil
  100. }
  101. func (store *CassandraStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error {
  102. if err := store.session.Query(
  103. "DELETE FROM filemeta WHERE directory=?",
  104. fullpath).Exec(); err != nil {
  105. return fmt.Errorf("delete %s : %v", fullpath, err)
  106. }
  107. return nil
  108. }
  109. func (store *CassandraStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) {
  110. return nil, filer.ErrUnsupportedListDirectoryPrefixed
  111. }
  112. func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, fullpath util.FullPath, startFileName string, inclusive bool,
  113. limit int) (entries []*filer.Entry, err error) {
  114. cqlStr := "SELECT NAME, meta FROM filemeta WHERE directory=? AND name>? ORDER BY NAME ASC LIMIT ?"
  115. if inclusive {
  116. cqlStr = "SELECT NAME, meta FROM filemeta WHERE directory=? AND name>=? ORDER BY NAME ASC LIMIT ?"
  117. }
  118. var data []byte
  119. var name string
  120. iter := store.session.Query(cqlStr, string(fullpath), startFileName, limit).Iter()
  121. for iter.Scan(&name, &data) {
  122. entry := &filer.Entry{
  123. FullPath: util.NewFullPath(string(fullpath), name),
  124. }
  125. if decodeErr := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); decodeErr != nil {
  126. err = decodeErr
  127. log.Infof("list %s : %v", entry.FullPath, err)
  128. break
  129. }
  130. entries = append(entries, entry)
  131. }
  132. if err := iter.Close(); err != nil {
  133. log.Infof("list iterator close: %v", err)
  134. }
  135. return entries, err
  136. }
  137. func (store *CassandraStore) Shutdown() {
  138. store.session.Close()
  139. }