cassandra_store.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. package cassandra_store
  2. import (
  3. "fmt"
  4. "strings"
  5. "github.com/chrislusf/seaweedfs/weed/filer"
  6. "github.com/chrislusf/seaweedfs/weed/glog"
  7. "github.com/gocql/gocql"
  8. )
  9. /*
  10. Basically you need a table just like this:
  11. CREATE TABLE seaweed_files (
  12. path varchar,
  13. fids list<varchar>,
  14. PRIMARY KEY (path)
  15. );
  16. Need to match flat_namespace.FlatNamespaceStore interface
  17. Put(fullFileName string, fid string) (err error)
  18. Get(fullFileName string) (fid string, err error)
  19. Delete(fullFileName string) (fid string, err error)
  20. */
  21. type CassandraStore struct {
  22. cluster *gocql.ClusterConfig
  23. session *gocql.Session
  24. }
  25. func NewCassandraStore(keyspace string, hosts string) (c *CassandraStore, err error) {
  26. c = &CassandraStore{}
  27. s := strings.Split(hosts, ",")
  28. if len(s) == 1 {
  29. glog.V(2).Info("Only one cassandra node to connect! A cluster is Recommended! Now using:", string(hosts))
  30. c.cluster = gocql.NewCluster(hosts)
  31. } else if len(s) > 1 {
  32. c.cluster = gocql.NewCluster(s...)
  33. }
  34. c.cluster.Keyspace = keyspace
  35. c.cluster.Consistency = gocql.LocalQuorum
  36. c.session, err = c.cluster.CreateSession()
  37. if err != nil {
  38. glog.V(0).Infof("Failed to open cassandra store, hosts %v, keyspace %s", hosts, keyspace)
  39. }
  40. return
  41. }
  42. func (c *CassandraStore) Put(fullFileName string, fid string) (err error) {
  43. var input []string
  44. input = append(input, fid)
  45. if err := c.session.Query(
  46. `INSERT INTO seaweed_files (path, fids) VALUES (?, ?)`,
  47. fullFileName, input).Exec(); err != nil {
  48. glog.V(0).Infof("Failed to save file %s with id %s: %v", fullFileName, fid, err)
  49. return err
  50. }
  51. return nil
  52. }
  53. func (c *CassandraStore) Get(fullFileName string) (fid string, err error) {
  54. var output []string
  55. if err := c.session.Query(
  56. `select fids FROM seaweed_files WHERE path = ? LIMIT 1`,
  57. fullFileName).Consistency(gocql.One).Scan(&output); err != nil {
  58. if err != gocql.ErrNotFound {
  59. glog.V(0).Infof("Failed to find file %s: %v", fullFileName, err)
  60. return "", filer.ErrNotFound
  61. }
  62. }
  63. if len(output) == 0 {
  64. return "", fmt.Errorf("No file id found for %s", fullFileName)
  65. }
  66. return output[0], nil
  67. }
  68. // Currently the fid is not returned
  69. func (c *CassandraStore) Delete(fullFileName string) (err error) {
  70. if err := c.session.Query(
  71. `DELETE FROM seaweed_files WHERE path = ?`,
  72. fullFileName).Exec(); err != nil {
  73. if err != gocql.ErrNotFound {
  74. glog.V(0).Infof("Failed to delete file %s: %v", fullFileName, err)
  75. }
  76. return err
  77. }
  78. return nil
  79. }
  80. func (c *CassandraStore) Close() {
  81. if c.session != nil {
  82. c.session.Close()
  83. }
  84. }