123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181 |
- package filer
- import (
- "context"
- "fmt"
- "github.com/seaweedfs/seaweedfs/weed/pb"
- "github.com/seaweedfs/seaweedfs/weed/pb/remote_pb"
- "github.com/seaweedfs/seaweedfs/weed/remote_storage"
- "github.com/seaweedfs/seaweedfs/weed/util"
- "google.golang.org/grpc"
- "google.golang.org/protobuf/proto"
- "math"
- "strings"
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
- "github.com/viant/ptrie"
- )
- const REMOTE_STORAGE_CONF_SUFFIX = ".conf"
- const REMOTE_STORAGE_MOUNT_FILE = "mount.mapping"
- type FilerRemoteStorage struct {
- rules ptrie.Trie
- storageNameToConf map[string]*remote_pb.RemoteConf
- }
- func NewFilerRemoteStorage() (rs *FilerRemoteStorage) {
- rs = &FilerRemoteStorage{
- rules: ptrie.New(),
- storageNameToConf: make(map[string]*remote_pb.RemoteConf),
- }
- return rs
- }
- func (rs *FilerRemoteStorage) LoadRemoteStorageConfigurationsAndMapping(filer *Filer) (err error) {
- // execute this on filer
- limit := int64(math.MaxInt32)
- entries, _, err := filer.ListDirectoryEntries(context.Background(), DirectoryEtcRemote, "", false, limit, "", "", "")
- if err != nil {
- if err == filer_pb.ErrNotFound {
- return nil
- }
- glog.Errorf("read remote storage %s: %v", DirectoryEtcRemote, err)
- return
- }
- for _, entry := range entries {
- if entry.Name() == REMOTE_STORAGE_MOUNT_FILE {
- if err := rs.loadRemoteStorageMountMapping(entry.Content); err != nil {
- return err
- }
- continue
- }
- if !strings.HasSuffix(entry.Name(), REMOTE_STORAGE_CONF_SUFFIX) {
- return nil
- }
- conf := &remote_pb.RemoteConf{}
- if err := proto.Unmarshal(entry.Content, conf); err != nil {
- return fmt.Errorf("unmarshal %s/%s: %v", DirectoryEtcRemote, entry.Name(), err)
- }
- rs.storageNameToConf[conf.Name] = conf
- }
- return nil
- }
- func (rs *FilerRemoteStorage) loadRemoteStorageMountMapping(data []byte) (err error) {
- mappings := &remote_pb.RemoteStorageMapping{}
- if err := proto.Unmarshal(data, mappings); err != nil {
- return fmt.Errorf("unmarshal %s/%s: %v", DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE, err)
- }
- for dir, storageLocation := range mappings.Mappings {
- rs.mapDirectoryToRemoteStorage(util.FullPath(dir), storageLocation)
- }
- return nil
- }
- func (rs *FilerRemoteStorage) mapDirectoryToRemoteStorage(dir util.FullPath, loc *remote_pb.RemoteStorageLocation) {
- rs.rules.Put([]byte(dir+"/"), loc)
- }
- func (rs *FilerRemoteStorage) FindMountDirectory(p util.FullPath) (mountDir util.FullPath, remoteLocation *remote_pb.RemoteStorageLocation) {
- rs.rules.MatchPrefix([]byte(p), func(key []byte, value interface{}) bool {
- mountDir = util.FullPath(string(key[:len(key)-1]))
- remoteLocation = value.(*remote_pb.RemoteStorageLocation)
- return true
- })
- return
- }
- func (rs *FilerRemoteStorage) FindRemoteStorageClient(p util.FullPath) (client remote_storage.RemoteStorageClient, remoteConf *remote_pb.RemoteConf, found bool) {
- var storageLocation *remote_pb.RemoteStorageLocation
- rs.rules.MatchPrefix([]byte(p), func(key []byte, value interface{}) bool {
- storageLocation = value.(*remote_pb.RemoteStorageLocation)
- return true
- })
- if storageLocation == nil {
- found = false
- return
- }
- return rs.GetRemoteStorageClient(storageLocation.Name)
- }
- func (rs *FilerRemoteStorage) GetRemoteStorageClient(storageName string) (client remote_storage.RemoteStorageClient, remoteConf *remote_pb.RemoteConf, found bool) {
- remoteConf, found = rs.storageNameToConf[storageName]
- if !found {
- return
- }
- var err error
- if client, err = remote_storage.GetRemoteStorage(remoteConf); err == nil {
- found = true
- return
- }
- return
- }
- func UnmarshalRemoteStorageMappings(oldContent []byte) (mappings *remote_pb.RemoteStorageMapping, err error) {
- mappings = &remote_pb.RemoteStorageMapping{
- Mappings: make(map[string]*remote_pb.RemoteStorageLocation),
- }
- if len(oldContent) > 0 {
- if err = proto.Unmarshal(oldContent, mappings); err != nil {
- glog.Warningf("unmarshal existing mappings: %v", err)
- }
- }
- return
- }
- func ReadRemoteStorageConf(grpcDialOption grpc.DialOption, filerAddress pb.ServerAddress, storageName string) (conf *remote_pb.RemoteConf, readErr error) {
- var oldContent []byte
- if readErr = pb.WithFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
- oldContent, readErr = ReadInsideFiler(client, DirectoryEtcRemote, storageName+REMOTE_STORAGE_CONF_SUFFIX)
- return readErr
- }); readErr != nil {
- return nil, readErr
- }
- // unmarshal storage configuration
- conf = &remote_pb.RemoteConf{}
- if unMarshalErr := proto.Unmarshal(oldContent, conf); unMarshalErr != nil {
- readErr = fmt.Errorf("unmarshal %s/%s: %v", DirectoryEtcRemote, storageName+REMOTE_STORAGE_CONF_SUFFIX, unMarshalErr)
- return
- }
- return
- }
- func DetectMountInfo(grpcDialOption grpc.DialOption, filerAddress pb.ServerAddress, dir string) (*remote_pb.RemoteStorageMapping, string, *remote_pb.RemoteStorageLocation, *remote_pb.RemoteConf, error) {
- mappings, listErr := ReadMountMappings(grpcDialOption, filerAddress)
- if listErr != nil {
- return nil, "", nil, nil, listErr
- }
- if dir == "" {
- return mappings, "", nil, nil, fmt.Errorf("need to specify '-dir' option")
- }
- var localMountedDir string
- var remoteStorageMountedLocation *remote_pb.RemoteStorageLocation
- for k, loc := range mappings.Mappings {
- if strings.HasPrefix(dir, k) {
- localMountedDir, remoteStorageMountedLocation = k, loc
- }
- }
- if localMountedDir == "" {
- return mappings, localMountedDir, remoteStorageMountedLocation, nil, fmt.Errorf("%s is not mounted", dir)
- }
- // find remote storage configuration
- remoteStorageConf, err := ReadRemoteStorageConf(grpcDialOption, filerAddress, remoteStorageMountedLocation.Name)
- if err != nil {
- return mappings, localMountedDir, remoteStorageMountedLocation, remoteStorageConf, err
- }
- return mappings, localMountedDir, remoteStorageMountedLocation, remoteStorageConf, nil
- }
|