filer_remote_storage.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. package filer
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/pb"
  6. "github.com/chrislusf/seaweedfs/weed/remote_storage"
  7. "github.com/chrislusf/seaweedfs/weed/util"
  8. "github.com/golang/protobuf/proto"
  9. "google.golang.org/grpc"
  10. "math"
  11. "strings"
  12. "github.com/chrislusf/seaweedfs/weed/glog"
  13. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  14. "github.com/viant/ptrie"
  15. )
  16. const REMOTE_STORAGE_CONF_SUFFIX = ".conf"
  17. const REMOTE_STORAGE_MOUNT_FILE = "mount.mapping"
// FilerRemoteStorage keeps the remote storage configurations and the
// directory-to-remote-location mount rules that the lookups in this file
// operate on.
type FilerRemoteStorage struct {
	// rules maps a mount directory key (the directory path followed by "/")
	// to its *filer_pb.RemoteStorageLocation.
	rules ptrie.Trie
	// storageNameToConf indexes the loaded configurations by RemoteConf.Name.
	storageNameToConf map[string]*filer_pb.RemoteConf
}
  22. func NewFilerRemoteStorage() (rs *FilerRemoteStorage) {
  23. rs = &FilerRemoteStorage{
  24. rules: ptrie.New(),
  25. storageNameToConf: make(map[string]*filer_pb.RemoteConf),
  26. }
  27. return rs
  28. }
  29. func (rs *FilerRemoteStorage) LoadRemoteStorageConfigurationsAndMapping(filer *Filer) (err error) {
  30. // execute this on filer
  31. entries, _, err := filer.ListDirectoryEntries(context.Background(), DirectoryEtcRemote, "", false, math.MaxInt64, "", "", "")
  32. if err != nil {
  33. if err == filer_pb.ErrNotFound {
  34. return nil
  35. }
  36. glog.Errorf("read remote storage %s: %v", DirectoryEtcRemote, err)
  37. return
  38. }
  39. for _, entry := range entries {
  40. if entry.Name() == REMOTE_STORAGE_MOUNT_FILE {
  41. if err := rs.loadRemoteStorageMountMapping(entry.Content); err != nil {
  42. return err
  43. }
  44. continue
  45. }
  46. if !strings.HasSuffix(entry.Name(), REMOTE_STORAGE_CONF_SUFFIX) {
  47. return nil
  48. }
  49. conf := &filer_pb.RemoteConf{}
  50. if err := proto.Unmarshal(entry.Content, conf); err != nil {
  51. return fmt.Errorf("unmarshal %s/%s: %v", DirectoryEtcRemote, entry.Name(), err)
  52. }
  53. rs.storageNameToConf[conf.Name] = conf
  54. }
  55. return nil
  56. }
  57. func (rs *FilerRemoteStorage) loadRemoteStorageMountMapping(data []byte) (err error) {
  58. mappings := &filer_pb.RemoteStorageMapping{}
  59. if err := proto.Unmarshal(data, mappings); err != nil {
  60. return fmt.Errorf("unmarshal %s/%s: %v", DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE, err)
  61. }
  62. for dir, storageLocation := range mappings.Mappings {
  63. rs.mapDirectoryToRemoteStorage(util.FullPath(dir), storageLocation)
  64. }
  65. return nil
  66. }
  67. func (rs *FilerRemoteStorage) mapDirectoryToRemoteStorage(dir util.FullPath, loc *filer_pb.RemoteStorageLocation) {
  68. rs.rules.Put([]byte(dir+"/"), loc)
  69. }
  70. func (rs *FilerRemoteStorage) FindMountDirectory(p util.FullPath) (mountDir util.FullPath, remoteLocation *filer_pb.RemoteStorageLocation) {
  71. rs.rules.MatchPrefix([]byte(p), func(key []byte, value interface{}) bool {
  72. mountDir = util.FullPath(string(key[:len(key)-1]))
  73. remoteLocation = value.(*filer_pb.RemoteStorageLocation)
  74. return true
  75. })
  76. return
  77. }
  78. func (rs *FilerRemoteStorage) FindRemoteStorageClient(p util.FullPath) (client remote_storage.RemoteStorageClient, remoteConf *filer_pb.RemoteConf, found bool) {
  79. var storageLocation *filer_pb.RemoteStorageLocation
  80. rs.rules.MatchPrefix([]byte(p), func(key []byte, value interface{}) bool {
  81. storageLocation = value.(*filer_pb.RemoteStorageLocation)
  82. return true
  83. })
  84. if storageLocation == nil {
  85. found = false
  86. return
  87. }
  88. return rs.GetRemoteStorageClient(storageLocation.Name)
  89. }
  90. func (rs *FilerRemoteStorage) GetRemoteStorageClient(storageName string) (client remote_storage.RemoteStorageClient, remoteConf *filer_pb.RemoteConf, found bool) {
  91. remoteConf, found = rs.storageNameToConf[storageName]
  92. if !found {
  93. return
  94. }
  95. var err error
  96. if client, err = remote_storage.GetRemoteStorage(remoteConf); err == nil {
  97. found = true
  98. return
  99. }
  100. return
  101. }
  102. func UnmarshalRemoteStorageMappings(oldContent []byte) (mappings *filer_pb.RemoteStorageMapping, err error) {
  103. mappings = &filer_pb.RemoteStorageMapping{
  104. Mappings: make(map[string]*filer_pb.RemoteStorageLocation),
  105. }
  106. if len(oldContent) > 0 {
  107. if err = proto.Unmarshal(oldContent, mappings); err != nil {
  108. glog.Warningf("unmarshal existing mappings: %v", err)
  109. }
  110. }
  111. return
  112. }
  113. func AddRemoteStorageMapping(oldContent []byte, dir string, storageLocation *filer_pb.RemoteStorageLocation) (newContent []byte, err error) {
  114. mappings, unmarshalErr := UnmarshalRemoteStorageMappings(oldContent)
  115. if unmarshalErr != nil {
  116. // skip
  117. }
  118. // set the new mapping
  119. mappings.Mappings[dir] = storageLocation
  120. if newContent, err = proto.Marshal(mappings); err != nil {
  121. return oldContent, fmt.Errorf("marshal mappings: %v", err)
  122. }
  123. return
  124. }
  125. func ReadMountMappings(grpcDialOption grpc.DialOption, filerAddress string) (mappings *filer_pb.RemoteStorageMapping, readErr error) {
  126. var oldContent []byte
  127. if readErr = pb.WithFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  128. oldContent, readErr = ReadInsideFiler(client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE)
  129. return readErr
  130. }); readErr != nil {
  131. return nil, readErr
  132. }
  133. mappings, readErr = UnmarshalRemoteStorageMappings(oldContent)
  134. if readErr != nil {
  135. return nil, fmt.Errorf("unmarshal mappings: %v", readErr)
  136. }
  137. return
  138. }
  139. func ReadRemoteStorageConf(grpcDialOption grpc.DialOption, filerAddress string, storageName string) (conf *filer_pb.RemoteConf, readErr error) {
  140. var oldContent []byte
  141. if readErr = pb.WithFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  142. oldContent, readErr = ReadInsideFiler(client, DirectoryEtcRemote, storageName+REMOTE_STORAGE_CONF_SUFFIX)
  143. return readErr
  144. }); readErr != nil {
  145. return nil, readErr
  146. }
  147. // unmarshal storage configuration
  148. conf = &filer_pb.RemoteConf{}
  149. if unMarshalErr := proto.Unmarshal(oldContent, conf); unMarshalErr != nil {
  150. readErr = fmt.Errorf("unmarshal %s/%s: %v", DirectoryEtcRemote, storageName+REMOTE_STORAGE_CONF_SUFFIX, unMarshalErr)
  151. return
  152. }
  153. return
  154. }