azure_storage_client.go

package azure

import (
	"context"
	"fmt"
	"io"
	"net/url"
	"os"
	"reflect"

	"github.com/Azure/azure-storage-blob-go/azblob"
	"github.com/chrislusf/seaweedfs/weed/filer"
	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
	"github.com/chrislusf/seaweedfs/weed/pb/remote_pb"
	"github.com/chrislusf/seaweedfs/weed/remote_storage"
	"github.com/chrislusf/seaweedfs/weed/util"
)

func init() {
	remote_storage.RemoteStorageClientMakers["azure"] = new(azureRemoteStorageMaker)
}
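
// azureRemoteStorageMaker builds Azure Blob Storage backed remote storage
// clients; it is registered under the "azure" key in init above.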
type azureRemoteStorageMaker struct{}

func (s azureRemoteStorageMaker) HasBucket() bool {
	return true
}
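
// Make creates a client from the account name and key in conf, falling back to
// the AZURE_STORAGE_ACCOUNT and AZURE_STORAGE_ACCESS_KEY environment variables.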
func (s azureRemoteStorageMaker) Make(conf *remote_pb.RemoteConf) (remote_storage.RemoteStorageClient, error) {

	client := &azureRemoteStorageClient{
		conf: conf,
	}

	accountName, accountKey := conf.AzureAccountName, conf.AzureAccountKey
	if len(accountName) == 0 || len(accountKey) == 0 {
		accountName, accountKey = os.Getenv("AZURE_STORAGE_ACCOUNT"), os.Getenv("AZURE_STORAGE_ACCESS_KEY")
		if len(accountName) == 0 || len(accountKey) == 0 {
			return nil, fmt.Errorf("either AZURE_STORAGE_ACCOUNT or AZURE_STORAGE_ACCESS_KEY environment variable is not set")
		}
	}

	// Use the storage account's name and key to create a credential object.
	credential, err := azblob.NewSharedKeyCredential(accountName, accountKey)
	if err != nil {
		return nil, fmt.Errorf("invalid Azure credential with account name:%s: %v", accountName, err)
	}

	// Create a request pipeline that is used to process HTTP(S) requests and responses.
	p := azblob.NewPipeline(credential, azblob.PipelineOptions{})

	// Create a ServiceURL object that wraps the service URL and the request pipeline.
	u, _ := url.Parse(fmt.Sprintf("https://%s.blob.core.windows.net", accountName))
	client.serviceURL = azblob.NewServiceURL(*u, p)

	return client, nil
}
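
// azureRemoteStorageClient implements remote_storage.RemoteStorageClient on top
// of the Azure Blob Storage service endpoint for the configured account.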
type azureRemoteStorageClient struct {
	conf       *remote_pb.RemoteConf
	serviceURL azblob.ServiceURL
}

var _ = remote_storage.RemoteStorageClient(&azureRemoteStorageClient{})
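
// Traverse lists all blobs under loc.Path in the container loc.Bucket, one
// segment at a time, and invokes visitFn for each blob found.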
func (az *azureRemoteStorageClient) Traverse(loc *remote_pb.RemoteStorageLocation, visitFn remote_storage.VisitFunc) (err error) {

	pathKey := loc.Path[1:]

	containerURL := az.serviceURL.NewContainerURL(loc.Bucket)

	// List the blobs in the container, one segment at a time.
	for marker := (azblob.Marker{}); marker.NotDone(); {
		// Get a result segment starting with the blob indicated by the current marker.
		listBlob, err := containerURL.ListBlobsFlatSegment(context.Background(), marker, azblob.ListBlobsSegmentOptions{
			Prefix: pathKey,
		})
		if err != nil {
			return fmt.Errorf("azure traverse %s%s: %v", loc.Bucket, loc.Path, err)
		}

		// ListBlobsFlatSegment returns the start of the next segment; you MUST use this
		// to get the next segment (after processing the current result segment).
		marker = listBlob.NextMarker

		// Process the blobs returned in this result segment (if the segment is empty, the loop body won't execute).
		for _, blobInfo := range listBlob.Segment.BlobItems {
			key := blobInfo.Name
			key = "/" + key
			dir, name := util.FullPath(key).DirAndName()
			err = visitFn(dir, name, false, &filer_pb.RemoteEntry{
				RemoteMtime: blobInfo.Properties.LastModified.Unix(),
				RemoteSize:  *blobInfo.Properties.ContentLength,
				RemoteETag:  string(blobInfo.Properties.Etag),
				StorageName: az.conf.Name,
			})
			if err != nil {
				return fmt.Errorf("azure processing %s%s: %v", loc.Bucket, loc.Path, err)
			}
		}
	}

	return
}
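
// ReadFile downloads size bytes starting at offset from the blob at loc and
// returns the data in memory.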
func (az *azureRemoteStorageClient) ReadFile(loc *remote_pb.RemoteStorageLocation, offset int64, size int64) (data []byte, err error) {

	key := loc.Path[1:]
	containerURL := az.serviceURL.NewContainerURL(loc.Bucket)
	blobURL := containerURL.NewBlockBlobURL(key)

	downloadResponse, readErr := blobURL.Download(context.Background(), offset, size, azblob.BlobAccessConditions{}, false, azblob.ClientProvidedKeyOptions{})
	if readErr != nil {
		return nil, readErr
	}

	// NOTE: retries are performed automatically if the connection fails.
	bodyStream := downloadResponse.Body(azblob.RetryReaderOptions{MaxRetryRequests: 20})
	defer bodyStream.Close()

	data, err = io.ReadAll(bodyStream)
	if err != nil {
		return nil, fmt.Errorf("failed to download file %s%s: %v", loc.Bucket, loc.Path, err)
	}

	return
}
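
// WriteDirectory is a no-op: blob keys encode the full path, so directories are
// not materialized as separate objects here.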
func (az *azureRemoteStorageClient) WriteDirectory(loc *remote_pb.RemoteStorageLocation, entry *filer_pb.Entry) (err error) {
	return nil
}
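
// RemoveDirectory is likewise a no-op; there is no separate directory object to remove.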
func (az *azureRemoteStorageClient) RemoveDirectory(loc *remote_pb.RemoteStorageLocation) (err error) {
	return nil
}
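
// WriteFile uploads the entry's content to a block blob at loc, copies the
// extended attributes into blob metadata, and reads back the resulting remote entry.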
func (az *azureRemoteStorageClient) WriteFile(loc *remote_pb.RemoteStorageLocation, entry *filer_pb.Entry, reader io.Reader) (remoteEntry *filer_pb.RemoteEntry, err error) {

	key := loc.Path[1:]
	containerURL := az.serviceURL.NewContainerURL(loc.Bucket)
	blobURL := containerURL.NewBlockBlobURL(key)

	readerAt, ok := reader.(io.ReaderAt)
	if !ok {
		return nil, fmt.Errorf("unexpected reader: readerAt expected")
	}

	fileSize := int64(filer.FileSize(entry))

	// uploadReaderAtToBlockBlob is an unexported helper defined elsewhere in this package.
	_, err = uploadReaderAtToBlockBlob(context.Background(), readerAt, fileSize, blobURL, azblob.UploadToBlockBlobOptions{
		BlockSize:   4 * 1024 * 1024,
		Parallelism: 16})
	if err != nil {
		return nil, fmt.Errorf("azure upload to %s%s: %v", loc.Bucket, loc.Path, err)
	}

	metadata := toMetadata(entry.Extended)
	if len(metadata) > 0 {
		_, err = blobURL.SetMetadata(context.Background(), metadata, azblob.BlobAccessConditions{}, azblob.ClientProvidedKeyOptions{})
		if err != nil {
			return nil, fmt.Errorf("azure set metadata on %s%s: %v", loc.Bucket, loc.Path, err)
		}
	}

	// read back the remote entry
	return az.readFileRemoteEntry(loc)
}
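
// readFileRemoteEntry fetches the blob's properties and converts them into a
// filer_pb.RemoteEntry (mtime, size, ETag, storage name).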
func (az *azureRemoteStorageClient) readFileRemoteEntry(loc *remote_pb.RemoteStorageLocation) (*filer_pb.RemoteEntry, error) {

	key := loc.Path[1:]
	containerURL := az.serviceURL.NewContainerURL(loc.Bucket)
	blobURL := containerURL.NewBlockBlobURL(key)

	attr, err := blobURL.GetProperties(context.Background(), azblob.BlobAccessConditions{}, azblob.ClientProvidedKeyOptions{})
	if err != nil {
		return nil, err
	}

	return &filer_pb.RemoteEntry{
		RemoteMtime: attr.LastModified().Unix(),
		RemoteSize:  attr.ContentLength(),
		RemoteETag:  string(attr.ETag()),
		StorageName: az.conf.Name,
	}, nil
}
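
// toMetadata converts the filer's extended attributes into the string map that
// the Azure SDK expects for blob metadata.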
func toMetadata(attributes map[string][]byte) map[string]string {
	metadata := make(map[string]string)
	for k, v := range attributes {
		metadata[k] = string(v)
	}
	return metadata
}
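
// UpdateFileMetadata rewrites the blob metadata only when the entry's extended
// attributes have actually changed.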
func (az *azureRemoteStorageClient) UpdateFileMetadata(loc *remote_pb.RemoteStorageLocation, oldEntry *filer_pb.Entry, newEntry *filer_pb.Entry) (err error) {
	if reflect.DeepEqual(oldEntry.Extended, newEntry.Extended) {
		return nil
	}

	metadata := toMetadata(newEntry.Extended)

	key := loc.Path[1:]
	containerURL := az.serviceURL.NewContainerURL(loc.Bucket)

	_, err = containerURL.NewBlobURL(key).SetMetadata(context.Background(), metadata, azblob.BlobAccessConditions{}, azblob.ClientProvidedKeyOptions{})

	return
}
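
// DeleteFile removes the blob at loc, including its snapshots.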
func (az *azureRemoteStorageClient) DeleteFile(loc *remote_pb.RemoteStorageLocation) (err error) {
	key := loc.Path[1:]
	containerURL := az.serviceURL.NewContainerURL(loc.Bucket)
	if _, err = containerURL.NewBlobURL(key).Delete(context.Background(),
		azblob.DeleteSnapshotsOptionInclude, azblob.BlobAccessConditions{}); err != nil {
		return fmt.Errorf("azure delete %s%s: %v", loc.Bucket, loc.Path, err)
	}
	return
}
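
// ListBuckets enumerates the account's containers, segment by segment, and
// reports each one as a bucket.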
func (az *azureRemoteStorageClient) ListBuckets() (buckets []*remote_storage.Bucket, err error) {
	ctx := context.Background()
	for containerMarker := (azblob.Marker{}); containerMarker.NotDone(); {
		listContainer, err := az.serviceURL.ListContainersSegment(ctx, containerMarker, azblob.ListContainersSegmentOptions{})
		if err != nil {
			return buckets, err
		}
		for _, v := range listContainer.ContainerItems {
			buckets = append(buckets, &remote_storage.Bucket{
				Name:      v.Name,
				CreatedAt: v.Properties.LastModified,
			})
		}
		containerMarker = listContainer.NextMarker
	}
	return
}
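
// CreateBucket creates a private (no public access) container with the given name.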
func (az *azureRemoteStorageClient) CreateBucket(name string) (err error) {
	containerURL := az.serviceURL.NewContainerURL(name)
	if _, err = containerURL.Create(context.Background(), azblob.Metadata{}, azblob.PublicAccessNone); err != nil {
		return fmt.Errorf("create bucket %s: %v", name, err)
	}
	return
}
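
// DeleteBucket deletes the container with the given name.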
func (az *azureRemoteStorageClient) DeleteBucket(name string) (err error) {
	containerURL := az.serviceURL.NewContainerURL(name)
	if _, err = containerURL.Delete(context.Background(), azblob.ContainerAccessConditions{}); err != nil {
		return fmt.Errorf("delete bucket %s: %v", name, err)
	}
	return
}
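
Usage sketch (not part of the file above): a client is normally obtained through the maker registered in init. The snippet below is a minimal, illustrative example only; the RemoteConf field values are placeholders, the import path of this package is assumed from the repository layout, and it assumes the registered maker exposes the Make method and the returned RemoteStorageClient interface exposes the methods implemented in this file.

package main

import (
	"log"

	"github.com/chrislusf/seaweedfs/weed/pb/remote_pb"
	"github.com/chrislusf/seaweedfs/weed/remote_storage"
	_ "github.com/chrislusf/seaweedfs/weed/remote_storage/azure" // assumed path; registers the "azure" maker
)

func main() {
	conf := &remote_pb.RemoteConf{
		Name:             "my-azure",           // placeholder storage name
		AzureAccountName: "myaccount",          // placeholder; falls back to env vars when empty
		AzureAccountKey:  "base64-account-key", // placeholder
	}

	client, err := remote_storage.RemoteStorageClientMakers["azure"].Make(conf)
	if err != nil {
		log.Fatal(err)
	}

	buckets, err := client.ListBuckets()
	if err != nil {
		log.Fatal(err)
	}
	for _, b := range buckets {
		log.Println(b.Name, b.CreatedAt)
	}
}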