Browse Source

Add ReadRemote(), and set up remote storage reading when the filer starts

Chris Lu 3 years ago
parent
commit
c090d6bb25

+ 2 - 0
weed/filer/filer.go

@@ -42,6 +42,7 @@ type Filer struct {
 	MetaAggregator      *MetaAggregator
 	Signature           int32
 	FilerConf           *FilerConf
+	RemoteStorage       *FilerRemoteStorage
 }
 
 func NewFiler(masters []string, grpcDialOption grpc.DialOption,
@@ -51,6 +52,7 @@ func NewFiler(masters []string, grpcDialOption grpc.DialOption,
 		fileIdDeletionQueue: util.NewUnboundedQueue(),
 		GrpcDialOption:      grpcDialOption,
 		FilerConf:           NewFilerConf(),
+		RemoteStorage:       NewFilerRemoteStorage(),
 	}
 	f.LocalMetaLogBuffer = log_buffer.NewLogBuffer("local", LogFlushInterval, f.logFlushFunc, notifyFn)
 	f.metaLogCollection = collection

+ 14 - 0
weed/filer/filer_on_meta_event.go

@@ -12,6 +12,7 @@ import (
 // onMetadataChangeEvent is triggered after filer processed change events from local or remote filers
 func (f *Filer) onMetadataChangeEvent(event *filer_pb.SubscribeMetadataResponse) {
 	f.maybeReloadFilerConfiguration(event)
+	// remote storage conf/mapping may also be affected by this event
+	f.maybeReloadRemoteStorageConfigurationAndMapping(event)
 	f.onBucketEvents(event)
 }
 
@@ -80,3 +81,16 @@ func (f *Filer) LoadFilerConf() {
 	}
 	f.FilerConf = fc
 }
+
+////////////////////////////////////
+// load and maintain remote storages
+////////////////////////////////////
+
+// LoadRemoteStorageConfAndMapping loads the remote storage configurations and
+// the directory-to-remote mappings at filer startup. A load failure is logged
+// and otherwise ignored so the filer can still start.
+func (f *Filer) LoadRemoteStorageConfAndMapping() {
+	if err := f.RemoteStorage.LoadRemoteStorageConfigurationsAndMapping(f); err != nil {
+		glog.Errorf("read remote conf and mapping: %v", err)
+	}
+}
+
+// maybeReloadRemoteStorageConfigurationAndMapping is called for every metadata
+// change event (see onMetadataChangeEvent); it is intended to reload the remote
+// storage configuration and mapping when the relevant entries change, but is
+// currently a no-op.
+func (f *Filer) maybeReloadRemoteStorageConfigurationAndMapping(event *filer_pb.SubscribeMetadataResponse) {
+	// FIXME add reloading
+}

+ 22 - 4
weed/filer/filer_remote_storage.go

@@ -31,7 +31,7 @@ func NewFilerRemoteStorage() (rs *FilerRemoteStorage) {
 	return rs
 }
 
-func (rs *FilerRemoteStorage) loadRemoteStorageConfigurations(filer *Filer) (err error) {
+func (rs *FilerRemoteStorage) LoadRemoteStorageConfigurationsAndMapping(filer *Filer) (err error) {
 	// execute this on filer
 
 	entries, _, err := filer.ListDirectoryEntries(context.Background(), DirectoryEtcRemote, "", false, math.MaxInt64, "", "", "")
@@ -74,7 +74,21 @@ func (rs *FilerRemoteStorage) mapDirectoryToRemoteStorage(dir util.FullPath, rem
 	rs.rules.Put([]byte(dir+"/"), remoteStorageName)
 }
 
-func (rs *FilerRemoteStorage) FindRemoteStorageClient(p util.FullPath) (client remote_storage.RemoteStorageClient, found bool) {
+// FindMountDirectory returns the mounted directory covering path p and the
+// remote storage location that directory is mapped to. If no mapping matches,
+// both results are zero values.
+// NOTE(review): mount keys are stored with a trailing "/" (see
+// mapDirectoryToRemoteStorage), so mountDir includes that trailing slash.
+func (rs *FilerRemoteStorage) FindMountDirectory(p util.FullPath) (mountDir util.FullPath, remoteLocation remote_storage.RemoteStorageLocation) {
+	var storageLocation string
+	rs.rules.MatchPrefix([]byte(p), func(key []byte, value interface{}) bool {
+		// NOTE(review): returning true presumably continues/stops prefix
+		// matching per the trie's contract — if multiple mounts can nest,
+		// confirm which match wins.
+		mountDir = util.FullPath(string(key))
+		storageLocation = value.(string)
+		return true
+	})
+	if storageLocation == "" {
+		return
+	}
+	remoteLocation = remote_storage.RemoteStorageLocation(storageLocation)
+	return
+}
+
+func (rs *FilerRemoteStorage) FindRemoteStorageClient(p util.FullPath) (client remote_storage.RemoteStorageClient, remoteConf *filer_pb.RemoteConf, found bool) {
 	var storageLocation string
 	rs.rules.MatchPrefix([]byte(p), func(key []byte, value interface{}) bool {
 		storageLocation = value.(string)
@@ -87,8 +101,12 @@ func (rs *FilerRemoteStorage) FindRemoteStorageClient(p util.FullPath) (client r
 
 	storageName, _, _ := remote_storage.RemoteStorageLocation(storageLocation).NameBucketPath()
 
-	remoteConf, ok := rs.storageNameToConf[storageName]
-	if !ok {
+	return rs.GetRemoteStorageClient(storageName)
+}
+
+func (rs *FilerRemoteStorage) GetRemoteStorageClient(storageName string) (client remote_storage.RemoteStorageClient, remoteConf *filer_pb.RemoteConf, found bool) {
+	remoteConf, found = rs.storageNameToConf[storageName]
+	if !found {
 		return
 	}
 

+ 4 - 4
weed/filer/filer_remote_storage_test.go

@@ -16,15 +16,15 @@ func TestFilerRemoteStorage_FindRemoteStorageClient(t *testing.T) {
 
 	rs.mapDirectoryToRemoteStorage("/a/b/c", "s7")
 
-	_, found := rs.FindRemoteStorageClient("/a/b/c/d/e/f")
+	_, _, found := rs.FindRemoteStorageClient("/a/b/c/d/e/f")
 	assert.Equal(t, true, found, "find storage client")
 
-	_, found2 := rs.FindRemoteStorageClient("/a/b")
+	_, _, found2 := rs.FindRemoteStorageClient("/a/b")
 	assert.Equal(t, false, found2, "should not find storage client")
 
-	_, found3 := rs.FindRemoteStorageClient("/a/b/c")
+	_, _, found3 := rs.FindRemoteStorageClient("/a/b/c")
 	assert.Equal(t, false, found3, "should not find storage client")
 
-	_, found4 := rs.FindRemoteStorageClient("/a/b/cc")
+	_, _, found4 := rs.FindRemoteStorageClient("/a/b/cc")
 	assert.Equal(t, false, found4, "should not find storage client")
 }

+ 27 - 0
weed/filer/read_remote.go

@@ -0,0 +1,27 @@
+package filer
+
+import (
+	"fmt"
+	"io"
+)
+
+// IsRemoteOnly reports whether the entry's content exists only in remote
+// storage: it has no local chunks, but carries a remote entry with a
+// positive size.
+func (entry *Entry) IsRemoteOnly() bool {
+	if len(entry.Chunks) > 0 {
+		return false
+	}
+	return entry.Remote != nil && entry.Remote.Size > 0
+}
+
+// ReadRemote reads [offset, offset+size) of a remote-only entry's content from
+// its mapped remote storage and streams it to w. Returns an error when the
+// entry's storage is not configured or the remote read fails.
+func (f *Filer) ReadRemote(w io.Writer, entry *Entry, offset int64, size int64) error {
+	// BUG FIX: was `remoteEntry.Remote.StorageName` — `remoteEntry` is undefined.
+	client, _, found := f.RemoteStorage.GetRemoteStorageClient(entry.Remote.StorageName)
+	if !found {
+		return fmt.Errorf("remote storage %v not found", entry.Remote.StorageName)
+	}
+
+	mountDir, remoteLocation := f.RemoteStorage.FindMountDirectory(entry.FullPath)
+	_, bucket, path := remoteLocation.NameBucketPath()
+
+	// translate the local full path into the object path under the remote bucket
+	remoteFullPath := path + string(entry.FullPath[len(mountDir):])
+
+	// propagate the remote read error instead of dropping it
+	return client.ReadFile(bucket, remoteFullPath[1:], offset, size, func(w io.Writer) error {
+		// FIXME copy the remote content into the writer
+		return nil
+	})
+}

+ 4 - 2
weed/remote_storage/remote_storage.go

@@ -3,6 +3,7 @@ package remote_storage
 import (
 	"fmt"
 	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+	"io"
 	"strings"
 	"sync"
 )
@@ -14,10 +15,10 @@ func (remote RemoteStorageLocation) NameBucketPath() (storageName, bucket, remot
 		remote = remote[:len(remote)-1]
 	}
 	parts := strings.SplitN(string(remote), "/", 3)
-	if len(parts)>=1 {
+	if len(parts) >= 1 {
 		storageName = parts[0]
 	}
-	if len(parts)>=2 {
+	if len(parts) >= 2 {
 		bucket = parts[1]
 	}
 	remotePath = string(remote[len(storageName)+1+len(bucket):])
@@ -31,6 +32,7 @@ type VisitFunc func(dir string, name string, isDirectory bool, remoteEntry *file
 
 type RemoteStorageClient interface {
 	Traverse(remote RemoteStorageLocation, visitFn VisitFunc) error
+	// ReadFile reads [offset, offset+size) of the object bucket/key.
+	// NOTE(review): writeFn presumably receives a writer to stream the
+	// content into — confirm against the concrete implementations.
+	ReadFile(bucket, key string, offset int64, size int64, writeFn func(w io.Writer) error) error
 }
 
 type RemoteStorageClientMaker interface {

+ 2 - 0
weed/server/filer_server.go

@@ -149,6 +149,8 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
 
 	fs.filer.LoadFilerConf()
 
+	fs.filer.LoadRemoteStorageConfAndMapping()
+
 	grace.OnInterrupt(func() {
 		fs.filer.Shutdown()
 	})

+ 11 - 4
weed/server/filer_server_handlers_read.go

@@ -101,7 +101,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
 
 	//Seaweed custom header are not visible to Vue or javascript
 	seaweedHeaders := []string{}
-	for header, _ := range w.Header() {
+	for header := range w.Header() {
 		if strings.HasPrefix(header, "Seaweed-") {
 			seaweedHeaders = append(seaweedHeaders, header)
 		}
@@ -163,9 +163,16 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
 			}
 			return err
 		}
-		err = filer.StreamContent(fs.filer.MasterClient, writer, entry.Chunks, offset, size)
-		if err != nil {
-			glog.Errorf("failed to stream content %s: %v", r.URL, err)
+		if entry.IsRemoteOnly() {
+			err = fs.filer.ReadRemote(writer, entry, offset, size)
+			if err != nil {
+				glog.Errorf("failed to read remote %s: %v", r.URL, err)
+			}
+		} else {
+			err = filer.StreamContent(fs.filer.MasterClient, writer, entry.Chunks, offset, size)
+			if err != nil {
+				glog.Errorf("failed to stream content %s: %v", r.URL, err)
+			}
 		}
 		return err
 	})