cassandra_store.go 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219
  1. package cassandra
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/gocql/gocql"
  6. "github.com/chrislusf/seaweedfs/weed/filer"
  7. "github.com/chrislusf/seaweedfs/weed/glog"
  8. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  9. "github.com/chrislusf/seaweedfs/weed/util"
  10. )
  11. func init() {
  12. filer.Stores = append(filer.Stores, &CassandraStore{})
  13. }
  14. type CassandraStore struct {
  15. cluster *gocql.ClusterConfig
  16. session *gocql.Session
  17. superLargeDirectoryHash map[string]string
  18. }
  19. func (store *CassandraStore) GetName() string {
  20. return "cassandra"
  21. }
  22. func (store *CassandraStore) Initialize(configuration util.Configuration, prefix string) (err error) {
  23. return store.initialize(
  24. configuration.GetString(prefix+"keyspace"),
  25. configuration.GetStringSlice(prefix+"hosts"),
  26. configuration.GetString(prefix+"username"),
  27. configuration.GetString(prefix+"password"),
  28. configuration.GetStringSlice(prefix+"superLargeDirectories"),
  29. configuration.GetString(prefix+"localDC"),
  30. )
  31. }
  32. func (store *CassandraStore) isSuperLargeDirectory(dir string) (dirHash string, isSuperLargeDirectory bool) {
  33. dirHash, isSuperLargeDirectory = store.superLargeDirectoryHash[dir]
  34. return
  35. }
  36. func (store *CassandraStore) initialize(keyspace string, hosts []string, username string, password string, superLargeDirectories []string, localDC string) (err error) {
  37. store.cluster = gocql.NewCluster(hosts...)
  38. if username != "" && password != "" {
  39. store.cluster.Authenticator = gocql.PasswordAuthenticator{Username: username, Password: password}
  40. }
  41. store.cluster.Keyspace = keyspace
  42. fallback := gocql.RoundRobinHostPolicy()
  43. if localDC != "" {
  44. fallback = gocql.DCAwareRoundRobinPolicy(localDC)
  45. }
  46. store.cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(fallback)
  47. store.cluster.Consistency = gocql.LocalQuorum
  48. store.session, err = store.cluster.CreateSession()
  49. if err != nil {
  50. glog.V(0).Infof("Failed to open cassandra store, hosts %v, keyspace %s", hosts, keyspace)
  51. }
  52. // set directory hash
  53. store.superLargeDirectoryHash = make(map[string]string)
  54. existingHash := make(map[string]string)
  55. for _, dir := range superLargeDirectories {
  56. // adding dir hash to avoid duplicated names
  57. dirHash := util.Md5String([]byte(dir))[:4]
  58. store.superLargeDirectoryHash[dir] = dirHash
  59. if existingDir, found := existingHash[dirHash]; found {
  60. glog.Fatalf("directory %s has the same hash as %s", dir, existingDir)
  61. }
  62. existingHash[dirHash] = dir
  63. }
  64. return
  65. }
  66. func (store *CassandraStore) BeginTransaction(ctx context.Context) (context.Context, error) {
  67. return ctx, nil
  68. }
  69. func (store *CassandraStore) CommitTransaction(ctx context.Context) error {
  70. return nil
  71. }
  72. func (store *CassandraStore) RollbackTransaction(ctx context.Context) error {
  73. return nil
  74. }
  75. func (store *CassandraStore) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
  76. dir, name := entry.FullPath.DirAndName()
  77. if dirHash, ok := store.isSuperLargeDirectory(dir); ok {
  78. dir, name = dirHash+name, ""
  79. }
  80. meta, err := entry.EncodeAttributesAndChunks()
  81. if err != nil {
  82. return fmt.Errorf("encode %s: %s", entry.FullPath, err)
  83. }
  84. if len(entry.Chunks) > 50 {
  85. meta = util.MaybeGzipData(meta)
  86. }
  87. if err := store.session.Query(
  88. "INSERT INTO filemeta (directory,name,meta) VALUES(?,?,?) USING TTL ? ",
  89. dir, name, meta, entry.TtlSec).Exec(); err != nil {
  90. return fmt.Errorf("insert %s: %s", entry.FullPath, err)
  91. }
  92. return nil
  93. }
  94. func (store *CassandraStore) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
  95. return store.InsertEntry(ctx, entry)
  96. }
  97. func (store *CassandraStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) {
  98. dir, name := fullpath.DirAndName()
  99. if dirHash, ok := store.isSuperLargeDirectory(dir); ok {
  100. dir, name = dirHash+name, ""
  101. }
  102. var data []byte
  103. if err := store.session.Query(
  104. "SELECT meta FROM filemeta WHERE directory=? AND name=?",
  105. dir, name).Scan(&data); err != nil {
  106. if err != gocql.ErrNotFound {
  107. return nil, filer_pb.ErrNotFound
  108. }
  109. }
  110. if len(data) == 0 {
  111. return nil, filer_pb.ErrNotFound
  112. }
  113. entry = &filer.Entry{
  114. FullPath: fullpath,
  115. }
  116. err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data))
  117. if err != nil {
  118. return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
  119. }
  120. return entry, nil
  121. }
  122. func (store *CassandraStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) error {
  123. dir, name := fullpath.DirAndName()
  124. if dirHash, ok := store.isSuperLargeDirectory(dir); ok {
  125. dir, name = dirHash+name, ""
  126. }
  127. if err := store.session.Query(
  128. "DELETE FROM filemeta WHERE directory=? AND name=?",
  129. dir, name).Exec(); err != nil {
  130. return fmt.Errorf("delete %s : %v", fullpath, err)
  131. }
  132. return nil
  133. }
  134. func (store *CassandraStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error {
  135. if _, ok := store.isSuperLargeDirectory(string(fullpath)); ok {
  136. return nil // filer.ErrUnsupportedSuperLargeDirectoryListing
  137. }
  138. if err := store.session.Query(
  139. "DELETE FROM filemeta WHERE directory=?",
  140. fullpath).Exec(); err != nil {
  141. return fmt.Errorf("delete %s : %v", fullpath, err)
  142. }
  143. return nil
  144. }
  145. func (store *CassandraStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
  146. return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed
  147. }
  148. func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
  149. if _, ok := store.isSuperLargeDirectory(string(dirPath)); ok {
  150. return // nil, filer.ErrUnsupportedSuperLargeDirectoryListing
  151. }
  152. cqlStr := "SELECT NAME, meta FROM filemeta WHERE directory=? AND name>? ORDER BY NAME ASC LIMIT ?"
  153. if includeStartFile {
  154. cqlStr = "SELECT NAME, meta FROM filemeta WHERE directory=? AND name>=? ORDER BY NAME ASC LIMIT ?"
  155. }
  156. var data []byte
  157. var name string
  158. iter := store.session.Query(cqlStr, string(dirPath), startFileName, limit+1).Iter()
  159. for iter.Scan(&name, &data) {
  160. entry := &filer.Entry{
  161. FullPath: util.NewFullPath(string(dirPath), name),
  162. }
  163. lastFileName = name
  164. if decodeErr := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data)); decodeErr != nil {
  165. err = decodeErr
  166. glog.V(0).Infof("list %s : %v", entry.FullPath, err)
  167. break
  168. }
  169. if !eachEntryFunc(entry) {
  170. break
  171. }
  172. }
  173. if err := iter.Close(); err != nil {
  174. glog.V(0).Infof("list iterator close: %v", err)
  175. }
  176. return lastFileName, err
  177. }
  178. func (store *CassandraStore) Shutdown() {
  179. store.session.Close()
  180. }