cassandra_store.go 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212
  1. package cassandra
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/gocql/gocql"
  6. "github.com/chrislusf/seaweedfs/weed/filer"
  7. "github.com/chrislusf/seaweedfs/weed/glog"
  8. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  9. "github.com/chrislusf/seaweedfs/weed/util"
  10. )
  11. func init() {
  12. filer.Stores = append(filer.Stores, &CassandraStore{})
  13. }
  14. type CassandraStore struct {
  15. cluster *gocql.ClusterConfig
  16. session *gocql.Session
  17. superLargeDirectoryHash map[string]string
  18. }
  19. func (store *CassandraStore) GetName() string {
  20. return "cassandra"
  21. }
  22. func (store *CassandraStore) Initialize(configuration util.Configuration, prefix string) (err error) {
  23. return store.initialize(
  24. configuration.GetString(prefix+"keyspace"),
  25. configuration.GetStringSlice(prefix+"hosts"),
  26. configuration.GetString(prefix+"username"),
  27. configuration.GetString(prefix+"password"),
  28. configuration.GetStringSlice(prefix+"superLargeDirectories"),
  29. )
  30. }
  31. func (store *CassandraStore) isSuperLargeDirectory(dir string) (dirHash string, isSuperLargeDirectory bool) {
  32. dirHash, isSuperLargeDirectory = store.superLargeDirectoryHash[dir]
  33. return
  34. }
  35. func (store *CassandraStore) initialize(keyspace string, hosts []string, username string, password string, superLargeDirectories []string) (err error) {
  36. store.cluster = gocql.NewCluster(hosts...)
  37. if username != "" && password != "" {
  38. store.cluster.Authenticator = gocql.PasswordAuthenticator{Username: username, Password: password}
  39. }
  40. store.cluster.Keyspace = keyspace
  41. store.cluster.Consistency = gocql.LocalQuorum
  42. store.session, err = store.cluster.CreateSession()
  43. if err != nil {
  44. glog.V(0).Infof("Failed to open cassandra store, hosts %v, keyspace %s", hosts, keyspace)
  45. }
  46. // set directory hash
  47. store.superLargeDirectoryHash = make(map[string]string)
  48. existingHash := make(map[string]string)
  49. for _, dir := range superLargeDirectories {
  50. // adding dir hash to avoid duplicated names
  51. dirHash := util.Md5String([]byte(dir))[:4]
  52. store.superLargeDirectoryHash[dir] = dirHash
  53. if existingDir, found := existingHash[dirHash]; found {
  54. glog.Fatalf("directory %s has the same hash as %s", dir, existingDir)
  55. }
  56. existingHash[dirHash] = dir
  57. }
  58. return
  59. }
  60. func (store *CassandraStore) BeginTransaction(ctx context.Context) (context.Context, error) {
  61. return ctx, nil
  62. }
  63. func (store *CassandraStore) CommitTransaction(ctx context.Context) error {
  64. return nil
  65. }
  66. func (store *CassandraStore) RollbackTransaction(ctx context.Context) error {
  67. return nil
  68. }
  69. func (store *CassandraStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
  70. dir, name := entry.FullPath.DirAndName()
  71. if dirHash, ok := store.isSuperLargeDirectory(dir); ok {
  72. dir, name = dirHash+name, ""
  73. }
  74. meta, err := entry.EncodeAttributesAndChunks()
  75. if err != nil {
  76. return fmt.Errorf("encode %s: %s", entry.FullPath, err)
  77. }
  78. if len(entry.Chunks) > 50 {
  79. meta = util.MaybeGzipData(meta)
  80. }
  81. if err := store.session.Query(
  82. "INSERT INTO filemeta (directory,name,meta) VALUES(?,?,?) USING TTL ? ",
  83. dir, name, meta, entry.TtlSec).Exec(); err != nil {
  84. return fmt.Errorf("insert %s: %s", entry.FullPath, err)
  85. }
  86. return nil
  87. }
  88. func (store *CassandraStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
  89. return store.InsertEntry(ctx, entry)
  90. }
  91. func (store *CassandraStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) {
  92. dir, name := fullpath.DirAndName()
  93. if dirHash, ok := store.isSuperLargeDirectory(dir); ok {
  94. dir, name = dirHash+name, ""
  95. }
  96. var data []byte
  97. if err := store.session.Query(
  98. "SELECT meta FROM filemeta WHERE directory=? AND name=?",
  99. dir, name).Consistency(gocql.One).Scan(&data); err != nil {
  100. if err != gocql.ErrNotFound {
  101. return nil, filer_pb.ErrNotFound
  102. }
  103. }
  104. if len(data) == 0 {
  105. return nil, filer_pb.ErrNotFound
  106. }
  107. entry = &filer.Entry{
  108. FullPath: fullpath,
  109. }
  110. err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data))
  111. if err != nil {
  112. return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
  113. }
  114. return entry, nil
  115. }
  116. func (store *CassandraStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) error {
  117. dir, name := fullpath.DirAndName()
  118. if dirHash, ok := store.isSuperLargeDirectory(dir); ok {
  119. dir, name = dirHash+name, ""
  120. }
  121. if err := store.session.Query(
  122. "DELETE FROM filemeta WHERE directory=? AND name=?",
  123. dir, name).Exec(); err != nil {
  124. return fmt.Errorf("delete %s : %v", fullpath, err)
  125. }
  126. return nil
  127. }
  128. func (store *CassandraStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error {
  129. if _, ok := store.isSuperLargeDirectory(string(fullpath)); ok {
  130. return nil // filer.ErrUnsupportedSuperLargeDirectoryListing
  131. }
  132. if err := store.session.Query(
  133. "DELETE FROM filemeta WHERE directory=?",
  134. fullpath).Exec(); err != nil {
  135. return fmt.Errorf("delete %s : %v", fullpath, err)
  136. }
  137. return nil
  138. }
  139. func (store *CassandraStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
  140. return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed
  141. }
  142. func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
  143. if _, ok := store.isSuperLargeDirectory(string(dirPath)); ok {
  144. return // nil, filer.ErrUnsupportedSuperLargeDirectoryListing
  145. }
  146. cqlStr := "SELECT NAME, meta FROM filemeta WHERE directory=? AND name>? ORDER BY NAME ASC LIMIT ?"
  147. if includeStartFile {
  148. cqlStr = "SELECT NAME, meta FROM filemeta WHERE directory=? AND name>=? ORDER BY NAME ASC LIMIT ?"
  149. }
  150. var data []byte
  151. var name string
  152. iter := store.session.Query(cqlStr, string(dirPath), startFileName, limit+1).Iter()
  153. for iter.Scan(&name, &data) {
  154. entry := &filer.Entry{
  155. FullPath: util.NewFullPath(string(dirPath), name),
  156. }
  157. lastFileName = name
  158. if decodeErr := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); decodeErr != nil {
  159. err = decodeErr
  160. glog.V(0).Infof("list %s : %v", entry.FullPath, err)
  161. break
  162. }
  163. if !eachEntryFunc(entry) {
  164. break
  165. }
  166. }
  167. if err := iter.Close(); err != nil {
  168. glog.V(0).Infof("list iterator close: %v", err)
  169. }
  170. return lastFileName, err
  171. }
  172. func (store *CassandraStore) Shutdown() {
  173. store.session.Close()
  174. }