123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596 |
- package cassandra_store
- import (
- "fmt"
- "strings"
- "github.com/chrislusf/seaweedfs/weed/filer"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/gocql/gocql"
- )
- /*
- Basically you need a table just like this:
- CREATE TABLE seaweed_files (
- path varchar,
- fids list<varchar>,
- PRIMARY KEY (path)
- );
- Need to match flat_namespace.FlatNamespaceStore interface
- Put(fullFileName string, fid string) (err error)
- Get(fullFileName string) (fid string, err error)
- Delete(fullFileName string) (fid string, err error)
- */
- type CassandraStore struct {
- cluster *gocql.ClusterConfig
- session *gocql.Session
- }
- func NewCassandraStore(keyspace string, hosts string) (c *CassandraStore, err error) {
- c = &CassandraStore{}
- s := strings.Split(hosts, ",")
- if len(s) == 1 {
- glog.V(2).Info("Only one cassandra node to connect! A cluster is Recommended! Now using:", string(hosts))
- c.cluster = gocql.NewCluster(hosts)
- } else if len(s) > 1 {
- c.cluster = gocql.NewCluster(s...)
- }
- c.cluster.Keyspace = keyspace
- c.cluster.Consistency = gocql.LocalQuorum
- c.session, err = c.cluster.CreateSession()
- if err != nil {
- glog.V(0).Infof("Failed to open cassandra store, hosts %v, keyspace %s", hosts, keyspace)
- }
- return
- }
- func (c *CassandraStore) Put(fullFileName string, fid string) (err error) {
- var input []string
- input = append(input, fid)
- if err := c.session.Query(
- `INSERT INTO seaweed_files (path, fids) VALUES (?, ?)`,
- fullFileName, input).Exec(); err != nil {
- glog.V(0).Infof("Failed to save file %s with id %s: %v", fullFileName, fid, err)
- return err
- }
- return nil
- }
- func (c *CassandraStore) Get(fullFileName string) (fid string, err error) {
- var output []string
- if err := c.session.Query(
- `select fids FROM seaweed_files WHERE path = ? LIMIT 1`,
- fullFileName).Consistency(gocql.One).Scan(&output); err != nil {
- if err != gocql.ErrNotFound {
- glog.V(0).Infof("Failed to find file %s: %v", fullFileName, err)
- return "", filer.ErrNotFound
- }
- }
- if len(output) == 0 {
- return "", fmt.Errorf("No file id found for %s", fullFileName)
- }
- return output[0], nil
- }
- // Currently the fid is not returned
- func (c *CassandraStore) Delete(fullFileName string) (err error) {
- if err := c.session.Query(
- `DELETE FROM seaweed_files WHERE path = ?`,
- fullFileName).Exec(); err != nil {
- if err != gocql.ErrNotFound {
- glog.V(0).Infof("Failed to delete file %s: %v", fullFileName, err)
- }
- return err
- }
- return nil
- }
- func (c *CassandraStore) Close() {
- if c.session != nil {
- c.session.Close()
- }
- }
|