azure_storage_client.go 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254
  1. package azure
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
  6. "io"
  7. "net/url"
  8. "os"
  9. "reflect"
  10. "strings"
  11. "github.com/Azure/azure-storage-blob-go/azblob"
  12. "github.com/seaweedfs/seaweedfs/weed/filer"
  13. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  14. "github.com/seaweedfs/seaweedfs/weed/pb/remote_pb"
  15. "github.com/seaweedfs/seaweedfs/weed/remote_storage"
  16. "github.com/seaweedfs/seaweedfs/weed/util"
  17. )
// init registers the Azure client maker under the "azure" remote storage
// type so it can be instantiated by name from remote storage configuration.
func init() {
	remote_storage.RemoteStorageClientMakers["azure"] = new(azureRemoteStorageMaker)
}
// azureRemoteStorageMaker builds remote storage clients backed by
// Azure Blob Storage.
type azureRemoteStorageMaker struct{}

// HasBucket reports that this storage type organizes objects into
// buckets (Azure containers).
func (s azureRemoteStorageMaker) HasBucket() bool {
	return true
}
  25. func (s azureRemoteStorageMaker) Make(conf *remote_pb.RemoteConf) (remote_storage.RemoteStorageClient, error) {
  26. client := &azureRemoteStorageClient{
  27. conf: conf,
  28. }
  29. accountName, accountKey := conf.AzureAccountName, conf.AzureAccountKey
  30. if len(accountName) == 0 || len(accountKey) == 0 {
  31. accountName, accountKey = os.Getenv("AZURE_STORAGE_ACCOUNT"), os.Getenv("AZURE_STORAGE_ACCESS_KEY")
  32. if len(accountName) == 0 || len(accountKey) == 0 {
  33. return nil, fmt.Errorf("either AZURE_STORAGE_ACCOUNT or AZURE_STORAGE_ACCESS_KEY environment variable is not set")
  34. }
  35. }
  36. // Use your Storage account's name and key to create a credential object.
  37. credential, err := azblob.NewSharedKeyCredential(accountName, accountKey)
  38. if err != nil {
  39. return nil, fmt.Errorf("invalid Azure credential with account name:%s: %v", accountName, err)
  40. }
  41. // Create a request pipeline that is used to process HTTP(S) requests and responses.
  42. p := azblob.NewPipeline(credential, azblob.PipelineOptions{})
  43. // Create an ServiceURL object that wraps the service URL and a request pipeline.
  44. u, _ := url.Parse(fmt.Sprintf("https://%s.blob.core.windows.net", accountName))
  45. client.serviceURL = azblob.NewServiceURL(*u, p)
  46. return client, nil
  47. }
  48. type azureRemoteStorageClient struct {
  49. conf *remote_pb.RemoteConf
  50. serviceURL azblob.ServiceURL
  51. }
  52. var _ = remote_storage.RemoteStorageClient(&azureRemoteStorageClient{})
// Traverse lists every blob whose name starts with loc.Path inside the
// container loc.Bucket, calling visitFn once per blob. Listing is paginated
// with Azure continuation markers; the first error from the service or from
// visitFn aborts the walk.
func (az *azureRemoteStorageClient) Traverse(loc *remote_pb.RemoteStorageLocation, visitFn remote_storage.VisitFunc) (err error) {

	pathKey := loc.Path[1:] // drop the leading "/" to form the Azure blob name prefix
	containerURL := az.serviceURL.NewContainerURL(loc.Bucket)

	// List the container that we have created above
	for marker := (azblob.Marker{}); marker.NotDone(); {
		// Get a result segment starting with the blob indicated by the current Marker.
		listBlob, err := containerURL.ListBlobsFlatSegment(context.Background(), marker, azblob.ListBlobsSegmentOptions{
			Prefix: pathKey,
		})
		if err != nil {
			return fmt.Errorf("azure traverse %s%s: %v", loc.Bucket, loc.Path, err)
		}

		// ListBlobs returns the start of the next segment; you MUST use this to get
		// the next segment (after processing the current result segment).
		marker = listBlob.NextMarker

		// Process the blobs returned in this result segment (if the segment is empty, the loop body won't execute)
		for _, blobInfo := range listBlob.Segment.BlobItems {
			key := blobInfo.Name
			key = "/" + key // restore the leading "/" so DirAndName splits an absolute path
			dir, name := util.FullPath(key).DirAndName()
			err = visitFn(dir, name, false, &filer_pb.RemoteEntry{
				RemoteMtime: blobInfo.Properties.LastModified.Unix(),
				RemoteSize:  *blobInfo.Properties.ContentLength,
				RemoteETag:  string(blobInfo.Properties.Etag),
				StorageName: az.conf.Name,
			})
			if err != nil {
				return fmt.Errorf("azure processing %s%s: %v", loc.Bucket, loc.Path, err)
			}
		}
	}

	return
}
  86. func (az *azureRemoteStorageClient) ReadFile(loc *remote_pb.RemoteStorageLocation, offset int64, size int64) (data []byte, err error) {
  87. key := loc.Path[1:]
  88. containerURL := az.serviceURL.NewContainerURL(loc.Bucket)
  89. blobURL := containerURL.NewBlockBlobURL(key)
  90. downloadResponse, readErr := blobURL.Download(context.Background(), offset, size, azblob.BlobAccessConditions{}, false, azblob.ClientProvidedKeyOptions{})
  91. if readErr != nil {
  92. return nil, readErr
  93. }
  94. // NOTE: automatically retries are performed if the connection fails
  95. bodyStream := downloadResponse.Body(azblob.RetryReaderOptions{MaxRetryRequests: 20})
  96. defer bodyStream.Close()
  97. data, err = io.ReadAll(bodyStream)
  98. if err != nil {
  99. return nil, fmt.Errorf("failed to download file %s%s: %v", loc.Bucket, loc.Path, err)
  100. }
  101. return
  102. }
// WriteDirectory is a no-op: Azure Blob Storage has no true directories,
// so there is nothing to create remotely.
func (az *azureRemoteStorageClient) WriteDirectory(loc *remote_pb.RemoteStorageLocation, entry *filer_pb.Entry) (err error) {
	return nil
}
// RemoveDirectory is a no-op: Azure Blob Storage has no true directories,
// so there is nothing to delete remotely.
func (az *azureRemoteStorageClient) RemoveDirectory(loc *remote_pb.RemoteStorageLocation) (err error) {
	return nil
}
// WriteFile uploads the content of reader to the blob at loc as a block blob,
// then reads the blob's properties back so the caller receives the
// authoritative remote entry (size, mtime, etag as stored by Azure).
// The reader must also implement io.ReaderAt because blocks are uploaded
// in parallel.
func (az *azureRemoteStorageClient) WriteFile(loc *remote_pb.RemoteStorageLocation, entry *filer_pb.Entry, reader io.Reader) (remoteEntry *filer_pb.RemoteEntry, err error) {

	key := loc.Path[1:] // blob name: loc.Path without the leading "/"
	containerURL := az.serviceURL.NewContainerURL(loc.Bucket)
	blobURL := containerURL.NewBlockBlobURL(key)

	readerAt, ok := reader.(io.ReaderAt)
	if !ok {
		return nil, fmt.Errorf("unexpected reader: readerAt expected")
	}
	fileSize := int64(filer.FileSize(entry))

	// 4 MiB blocks, up to 16 concurrent block uploads; extended attributes
	// with the S3 user-metadata prefix become Azure blob metadata.
	_, err = uploadReaderAtToBlockBlob(context.Background(), readerAt, fileSize, blobURL, azblob.UploadToBlockBlobOptions{
		BlockSize:       4 * 1024 * 1024,
		BlobHTTPHeaders: azblob.BlobHTTPHeaders{ContentType: entry.Attributes.Mime},
		Metadata:        toMetadata(entry.Extended),
		Parallelism:     16,
	})
	if err != nil {
		return nil, fmt.Errorf("azure upload to %s%s: %v", loc.Bucket, loc.Path, err)
	}

	// read back the remote entry
	return az.readFileRemoteEntry(loc)
}
  130. func (az *azureRemoteStorageClient) readFileRemoteEntry(loc *remote_pb.RemoteStorageLocation) (*filer_pb.RemoteEntry, error) {
  131. key := loc.Path[1:]
  132. containerURL := az.serviceURL.NewContainerURL(loc.Bucket)
  133. blobURL := containerURL.NewBlockBlobURL(key)
  134. attr, err := blobURL.GetProperties(context.Background(), azblob.BlobAccessConditions{}, azblob.ClientProvidedKeyOptions{})
  135. if err != nil {
  136. return nil, err
  137. }
  138. return &filer_pb.RemoteEntry{
  139. RemoteMtime: attr.LastModified().Unix(),
  140. RemoteSize: attr.ContentLength(),
  141. RemoteETag: string(attr.ETag()),
  142. StorageName: az.conf.Name,
  143. }, nil
  144. }
  145. func toMetadata(attributes map[string][]byte) map[string]string {
  146. metadata := make(map[string]string)
  147. for k, v := range attributes {
  148. if strings.HasPrefix(k, s3_constants.AmzUserMetaPrefix) {
  149. metadata[k[len(s3_constants.AmzUserMetaPrefix):]] = string(v)
  150. }
  151. }
  152. parsed_metadata := make(map[string]string)
  153. for k, v := range metadata {
  154. parsed_metadata[strings.Replace(k, "-", "_", -1)] = v
  155. }
  156. return parsed_metadata
  157. }
  158. func (az *azureRemoteStorageClient) UpdateFileMetadata(loc *remote_pb.RemoteStorageLocation, oldEntry *filer_pb.Entry, newEntry *filer_pb.Entry) (err error) {
  159. if reflect.DeepEqual(oldEntry.Extended, newEntry.Extended) {
  160. return nil
  161. }
  162. metadata := toMetadata(newEntry.Extended)
  163. key := loc.Path[1:]
  164. containerURL := az.serviceURL.NewContainerURL(loc.Bucket)
  165. _, err = containerURL.NewBlobURL(key).SetMetadata(context.Background(), metadata, azblob.BlobAccessConditions{}, azblob.ClientProvidedKeyOptions{})
  166. return
  167. }
  168. func (az *azureRemoteStorageClient) DeleteFile(loc *remote_pb.RemoteStorageLocation) (err error) {
  169. key := loc.Path[1:]
  170. containerURL := az.serviceURL.NewContainerURL(loc.Bucket)
  171. if _, err = containerURL.NewBlobURL(key).Delete(context.Background(),
  172. azblob.DeleteSnapshotsOptionInclude, azblob.BlobAccessConditions{}); err != nil {
  173. return fmt.Errorf("azure delete %s%s: %v", loc.Bucket, loc.Path, err)
  174. }
  175. return
  176. }
  177. func (az *azureRemoteStorageClient) ListBuckets() (buckets []*remote_storage.Bucket, err error) {
  178. ctx := context.Background()
  179. for containerMarker := (azblob.Marker{}); containerMarker.NotDone(); {
  180. listContainer, err := az.serviceURL.ListContainersSegment(ctx, containerMarker, azblob.ListContainersSegmentOptions{})
  181. if err == nil {
  182. for _, v := range listContainer.ContainerItems {
  183. buckets = append(buckets, &remote_storage.Bucket{
  184. Name: v.Name,
  185. CreatedAt: v.Properties.LastModified,
  186. })
  187. }
  188. } else {
  189. return buckets, err
  190. }
  191. containerMarker = listContainer.NextMarker
  192. }
  193. return
  194. }
  195. func (az *azureRemoteStorageClient) CreateBucket(name string) (err error) {
  196. containerURL := az.serviceURL.NewContainerURL(name)
  197. if _, err = containerURL.Create(context.Background(), azblob.Metadata{}, azblob.PublicAccessNone); err != nil {
  198. return fmt.Errorf("create bucket %s: %v", name, err)
  199. }
  200. return
  201. }
  202. func (az *azureRemoteStorageClient) DeleteBucket(name string) (err error) {
  203. containerURL := az.serviceURL.NewContainerURL(name)
  204. if _, err = containerURL.Delete(context.Background(), azblob.ContainerAccessConditions{}); err != nil {
  205. return fmt.Errorf("delete bucket %s: %v", name, err)
  206. }
  207. return
  208. }