remote_storage.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. package remote_storage
  2. import (
  3. "fmt"
  4. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  5. "github.com/seaweedfs/seaweedfs/weed/pb/remote_pb"
  6. "google.golang.org/protobuf/proto"
  7. "io"
  8. "sort"
  9. "strings"
  10. "sync"
  11. "time"
  12. )
// slash is the separator between the components of a remote location string
// (storage name, optional bucket, and path) handled by the parsers in this file.
const slash = "/"
  14. func ParseLocationName(remote string) (locationName string) {
  15. remote = strings.TrimSuffix(remote, slash)
  16. parts := strings.SplitN(remote, slash, 2)
  17. if len(parts) >= 1 {
  18. return parts[0]
  19. }
  20. return
  21. }
  22. func parseBucketLocation(remote string) (loc *remote_pb.RemoteStorageLocation) {
  23. loc = &remote_pb.RemoteStorageLocation{}
  24. remote = strings.TrimSuffix(remote, slash)
  25. parts := strings.SplitN(remote, slash, 3)
  26. if len(parts) >= 1 {
  27. loc.Name = parts[0]
  28. }
  29. if len(parts) >= 2 {
  30. loc.Bucket = parts[1]
  31. }
  32. loc.Path = remote[len(loc.Name)+1+len(loc.Bucket):]
  33. if loc.Path == "" {
  34. loc.Path = slash
  35. }
  36. return
  37. }
  38. func parseNoBucketLocation(remote string) (loc *remote_pb.RemoteStorageLocation) {
  39. loc = &remote_pb.RemoteStorageLocation{}
  40. remote = strings.TrimSuffix(remote, slash)
  41. parts := strings.SplitN(remote, slash, 2)
  42. if len(parts) >= 1 {
  43. loc.Name = parts[0]
  44. }
  45. loc.Path = remote[len(loc.Name):]
  46. if loc.Path == "" {
  47. loc.Path = slash
  48. }
  49. return
  50. }
  51. func FormatLocation(loc *remote_pb.RemoteStorageLocation) string {
  52. if loc.Bucket == "" {
  53. return fmt.Sprintf("%s%s", loc.Name, loc.Path)
  54. }
  55. return fmt.Sprintf("%s/%s%s", loc.Name, loc.Bucket, loc.Path)
  56. }
// VisitFunc is the callback type accepted by RemoteStorageClient.Traverse.
// It is given a directory, an entry name, a flag telling whether the entry is
// a directory, and the entry's remote metadata, and may return an error.
// NOTE(review): how a non-nil error affects the traversal is decided by the
// Traverse implementation, which is not visible in this file.
type VisitFunc func(dir string, name string, isDirectory bool, remoteEntry *filer_pb.RemoteEntry) error
// Bucket is the element type returned by RemoteStorageClient.ListBuckets.
type Bucket struct {
	// Name is the bucket name.
	Name string
	// CreatedAt is the bucket's creation time.
	CreatedAt time.Time
}
// RemoteStorageClient is the set of operations a remote storage backend
// provides. Values are produced by RemoteStorageClientMaker.Make and cached
// per configuration name by GetRemoteStorage.
//
// NOTE(review): the per-method notes below are inferred from the method
// signatures only; confirm details against the concrete implementations.
type RemoteStorageClient interface {
	// Traverse takes a location and a VisitFunc callback.
	Traverse(loc *remote_pb.RemoteStorageLocation, visitFn VisitFunc) error
	// ReadFile returns data for loc; presumably size bytes starting at offset.
	ReadFile(loc *remote_pb.RemoteStorageLocation, offset int64, size int64) (data []byte, err error)
	// WriteDirectory takes a location and the filer entry describing it.
	WriteDirectory(loc *remote_pb.RemoteStorageLocation, entry *filer_pb.Entry) (err error)
	// RemoveDirectory takes the location of the directory to remove.
	RemoveDirectory(loc *remote_pb.RemoteStorageLocation) (err error)
	// WriteFile takes a location, a filer entry and a reader, and returns the
	// resulting remote entry metadata.
	WriteFile(loc *remote_pb.RemoteStorageLocation, entry *filer_pb.Entry, reader io.Reader) (remoteEntry *filer_pb.RemoteEntry, err error)
	// UpdateFileMetadata takes a location plus the old and new filer entries.
	UpdateFileMetadata(loc *remote_pb.RemoteStorageLocation, oldEntry *filer_pb.Entry, newEntry *filer_pb.Entry) (err error)
	// DeleteFile takes the location of the file to delete.
	DeleteFile(loc *remote_pb.RemoteStorageLocation) (err error)
	// ListBuckets returns the buckets known to the backend.
	ListBuckets() ([]*Bucket, error)
	// CreateBucket takes the name of the bucket to create.
	CreateBucket(name string) (err error)
	// DeleteBucket takes the name of the bucket to delete.
	DeleteBucket(name string) (err error)
}
// RemoteStorageClientMaker is a factory for one remote storage type. Makers
// are looked up by type name in RemoteStorageClientMakers.
type RemoteStorageClientMaker interface {
	// Make builds a client from the given remote configuration.
	Make(remoteConf *remote_pb.RemoteConf) (RemoteStorageClient, error)
	// HasBucket reports whether locations of this storage type carry a bucket
	// component; ParseRemoteLocation uses it to pick the location parser.
	HasBucket() bool
}
// CachedRemoteStorageClient pairs a client with the configuration it was
// built from. GetRemoteStorage stores these and compares the embedded
// RemoteConf against the requested one to decide whether the client can be
// reused.
type CachedRemoteStorageClient struct {
	*remote_pb.RemoteConf
	RemoteStorageClient
}
var (
	// RemoteStorageClientMakers maps a remote storage type name to its maker.
	// It is read in this file without holding a lock.
	// NOTE(review): presumably populated during package initialization by the
	// individual backends — confirm against the registering code.
	RemoteStorageClientMakers = make(map[string]RemoteStorageClientMaker)
	// remoteStorageClients caches clients keyed by remote configuration name.
	remoteStorageClients = make(map[string]CachedRemoteStorageClient)
	// remoteStorageClientsLock is held by GetRemoteStorage while it reads and
	// updates remoteStorageClients.
	remoteStorageClientsLock sync.Mutex
)
  87. func GetAllRemoteStorageNames() string {
  88. var storageNames []string
  89. for k := range RemoteStorageClientMakers {
  90. storageNames = append(storageNames, k)
  91. }
  92. sort.Strings(storageNames)
  93. return strings.Join(storageNames, "|")
  94. }
  95. func GetRemoteStorageNamesHasBucket() string {
  96. var storageNames []string
  97. for k, m := range RemoteStorageClientMakers {
  98. if m.HasBucket() {
  99. storageNames = append(storageNames, k)
  100. }
  101. }
  102. sort.Strings(storageNames)
  103. return strings.Join(storageNames, "|")
  104. }
  105. func ParseRemoteLocation(remoteConfType string, remote string) (remoteStorageLocation *remote_pb.RemoteStorageLocation, err error) {
  106. maker, found := RemoteStorageClientMakers[remoteConfType]
  107. if !found {
  108. return nil, fmt.Errorf("remote storage type %s not found", remoteConfType)
  109. }
  110. if !maker.HasBucket() {
  111. return parseNoBucketLocation(remote), nil
  112. }
  113. return parseBucketLocation(remote), nil
  114. }
  115. func makeRemoteStorageClient(remoteConf *remote_pb.RemoteConf) (RemoteStorageClient, error) {
  116. maker, found := RemoteStorageClientMakers[remoteConf.Type]
  117. if !found {
  118. return nil, fmt.Errorf("remote storage type %s not found", remoteConf.Type)
  119. }
  120. return maker.Make(remoteConf)
  121. }
  122. func GetRemoteStorage(remoteConf *remote_pb.RemoteConf) (RemoteStorageClient, error) {
  123. remoteStorageClientsLock.Lock()
  124. defer remoteStorageClientsLock.Unlock()
  125. existingRemoteStorageClient, found := remoteStorageClients[remoteConf.Name]
  126. if found && proto.Equal(existingRemoteStorageClient.RemoteConf, remoteConf) {
  127. return existingRemoteStorageClient.RemoteStorageClient, nil
  128. }
  129. newRemoteStorageClient, err := makeRemoteStorageClient(remoteConf)
  130. if err != nil {
  131. return nil, fmt.Errorf("make remote storage client %s: %v", remoteConf.Name, err)
  132. }
  133. remoteStorageClients[remoteConf.Name] = CachedRemoteStorageClient{
  134. RemoteConf: remoteConf,
  135. RemoteStorageClient: newRemoteStorageClient,
  136. }
  137. return newRemoteStorageClient, nil
  138. }