gcs_storage_client.go 6.5 KB


  1. package gcs
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "os"
  7. "reflect"
  8. "strings"
  9. "cloud.google.com/go/storage"
  10. "github.com/seaweedfs/seaweedfs/weed/glog"
  11. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  12. "github.com/seaweedfs/seaweedfs/weed/pb/remote_pb"
  13. "github.com/seaweedfs/seaweedfs/weed/remote_storage"
  14. "github.com/seaweedfs/seaweedfs/weed/util"
  15. "google.golang.org/api/iterator"
  16. "google.golang.org/api/option"
  17. )
  18. func init() {
  19. remote_storage.RemoteStorageClientMakers["gcs"] = new(gcsRemoteStorageMaker)
  20. }
  // gcsRemoteStorageMaker is the stateless factory registered for the "gcs"
  // remote storage type.
  type gcsRemoteStorageMaker struct{}

  // HasBucket always reports true.
  func (gcsRemoteStorageMaker) HasBucket() bool {
  	return true
  }
  25. func (s gcsRemoteStorageMaker) Make(conf *remote_pb.RemoteConf) (remote_storage.RemoteStorageClient, error) {
  26. client := &gcsRemoteStorageClient{
  27. conf: conf,
  28. }
  29. googleApplicationCredentials := conf.GcsGoogleApplicationCredentials
  30. if googleApplicationCredentials == "" {
  31. found := false
  32. googleApplicationCredentials, found = os.LookupEnv("GOOGLE_APPLICATION_CREDENTIALS")
  33. if !found {
  34. return nil, fmt.Errorf("need to specific GOOGLE_APPLICATION_CREDENTIALS env variable")
  35. }
  36. }
  37. projectID := conf.GcsProjectId
  38. if projectID == "" {
  39. found := false
  40. projectID, found = os.LookupEnv("GOOGLE_CLOUD_PROJECT")
  41. if !found {
  42. glog.Warningf("need to specific GOOGLE_CLOUD_PROJECT env variable")
  43. }
  44. }
  45. googleApplicationCredentials = util.ResolvePath(googleApplicationCredentials)
  46. c, err := storage.NewClient(context.Background(), option.WithCredentialsFile(googleApplicationCredentials))
  47. if err != nil {
  48. return nil, fmt.Errorf("failed to create client: %v", err)
  49. }
  50. client.client = c
  51. client.projectID = projectID
  52. return client, nil
  53. }
  54. type gcsRemoteStorageClient struct {
  55. conf *remote_pb.RemoteConf
  56. client *storage.Client
  57. projectID string
  58. }
  59. var _ = remote_storage.RemoteStorageClient(&gcsRemoteStorageClient{})
  60. func (gcs *gcsRemoteStorageClient) Traverse(loc *remote_pb.RemoteStorageLocation, visitFn remote_storage.VisitFunc) (err error) {
  61. pathKey := loc.Path[1:]
  62. objectIterator := gcs.client.Bucket(loc.Bucket).Objects(context.Background(), &storage.Query{
  63. Delimiter: "",
  64. Prefix: pathKey,
  65. Versions: false,
  66. })
  67. var objectAttr *storage.ObjectAttrs
  68. for err == nil {
  69. objectAttr, err = objectIterator.Next()
  70. if err != nil {
  71. if err == iterator.Done {
  72. return nil
  73. }
  74. return err
  75. }
  76. key := objectAttr.Name
  77. key = "/" + key
  78. dir, name := util.FullPath(key).DirAndName()
  79. err = visitFn(dir, name, false, &filer_pb.RemoteEntry{
  80. RemoteMtime: objectAttr.Updated.Unix(),
  81. RemoteSize: objectAttr.Size,
  82. RemoteETag: objectAttr.Etag,
  83. StorageName: gcs.conf.Name,
  84. })
  85. }
  86. return
  87. }
  88. func (gcs *gcsRemoteStorageClient) ReadFile(loc *remote_pb.RemoteStorageLocation, offset int64, size int64) (data []byte, err error) {
  89. key := loc.Path[1:]
  90. rangeReader, readErr := gcs.client.Bucket(loc.Bucket).Object(key).NewRangeReader(context.Background(), offset, size)
  91. if readErr != nil {
  92. return nil, readErr
  93. }
  94. data, err = io.ReadAll(rangeReader)
  95. if err != nil {
  96. return nil, fmt.Errorf("failed to download file %s%s: %v", loc.Bucket, loc.Path, err)
  97. }
  98. return
  99. }
  100. func (gcs *gcsRemoteStorageClient) WriteDirectory(loc *remote_pb.RemoteStorageLocation, entry *filer_pb.Entry) (err error) {
  101. return nil
  102. }
  103. func (gcs *gcsRemoteStorageClient) RemoveDirectory(loc *remote_pb.RemoteStorageLocation) (err error) {
  104. return nil
  105. }
  106. func (gcs *gcsRemoteStorageClient) WriteFile(loc *remote_pb.RemoteStorageLocation, entry *filer_pb.Entry, reader io.Reader) (remoteEntry *filer_pb.RemoteEntry, err error) {
  107. key := loc.Path[1:]
  108. metadata := toMetadata(entry.Extended)
  109. wc := gcs.client.Bucket(loc.Bucket).Object(key).NewWriter(context.Background())
  110. wc.Metadata = metadata
  111. if _, err = io.Copy(wc, reader); err != nil {
  112. return nil, fmt.Errorf("upload to gcs %s/%s%s: %v", loc.Name, loc.Bucket, loc.Path, err)
  113. }
  114. if err = wc.Close(); err != nil {
  115. return nil, fmt.Errorf("close gcs %s/%s%s: %v", loc.Name, loc.Bucket, loc.Path, err)
  116. }
  117. // read back the remote entry
  118. return gcs.readFileRemoteEntry(loc)
  119. }
  120. func (gcs *gcsRemoteStorageClient) readFileRemoteEntry(loc *remote_pb.RemoteStorageLocation) (*filer_pb.RemoteEntry, error) {
  121. key := loc.Path[1:]
  122. attr, err := gcs.client.Bucket(loc.Bucket).Object(key).Attrs(context.Background())
  123. if err != nil {
  124. return nil, err
  125. }
  126. return &filer_pb.RemoteEntry{
  127. RemoteMtime: attr.Updated.Unix(),
  128. RemoteSize: attr.Size,
  129. RemoteETag: attr.Etag,
  130. StorageName: gcs.conf.Name,
  131. }, nil
  132. }
  // toMetadata converts extended attributes into a string-valued metadata map.
  // Keys starting with "X-" are left out; every other value is converted from
  // bytes to string. The result is never nil, even for a nil or empty input.
  func toMetadata(attributes map[string][]byte) map[string]string {
  	result := map[string]string{}
  	for name, value := range attributes {
  		if !strings.HasPrefix(name, "X-") {
  			result[name] = string(value)
  		}
  	}
  	return result
  }
  143. func (gcs *gcsRemoteStorageClient) UpdateFileMetadata(loc *remote_pb.RemoteStorageLocation, oldEntry *filer_pb.Entry, newEntry *filer_pb.Entry) (err error) {
  144. if reflect.DeepEqual(oldEntry.Extended, newEntry.Extended) {
  145. return nil
  146. }
  147. metadata := toMetadata(newEntry.Extended)
  148. key := loc.Path[1:]
  149. if len(metadata) > 0 {
  150. _, err = gcs.client.Bucket(loc.Bucket).Object(key).Update(context.Background(), storage.ObjectAttrsToUpdate{
  151. Metadata: metadata,
  152. })
  153. } else {
  154. // no way to delete the metadata yet
  155. }
  156. return
  157. }
  158. func (gcs *gcsRemoteStorageClient) DeleteFile(loc *remote_pb.RemoteStorageLocation) (err error) {
  159. key := loc.Path[1:]
  160. if err = gcs.client.Bucket(loc.Bucket).Object(key).Delete(context.Background()); err != nil {
  161. return fmt.Errorf("gcs delete %s%s: %v", loc.Bucket, key, err)
  162. }
  163. return
  164. }
  165. func (gcs *gcsRemoteStorageClient) ListBuckets() (buckets []*remote_storage.Bucket, err error) {
  166. if gcs.projectID == "" {
  167. return nil, fmt.Errorf("gcs project id or GOOGLE_CLOUD_PROJECT env variable not set")
  168. }
  169. iter := gcs.client.Buckets(context.Background(), gcs.projectID)
  170. for {
  171. b, err := iter.Next()
  172. if err == iterator.Done {
  173. break
  174. }
  175. if err != nil {
  176. return buckets, err
  177. }
  178. buckets = append(buckets, &remote_storage.Bucket{
  179. Name: b.Name,
  180. CreatedAt: b.Created,
  181. })
  182. }
  183. return
  184. }
  185. func (gcs *gcsRemoteStorageClient) CreateBucket(name string) (err error) {
  186. if gcs.projectID == "" {
  187. return fmt.Errorf("gcs project id or GOOGLE_CLOUD_PROJECT env variable not set")
  188. }
  189. err = gcs.client.Bucket(name).Create(context.Background(), gcs.projectID, &storage.BucketAttrs{})
  190. if err != nil {
  191. return fmt.Errorf("create bucket %s: %v", name, err)
  192. }
  193. return
  194. }
  195. func (gcs *gcsRemoteStorageClient) DeleteBucket(name string) (err error) {
  196. err = gcs.client.Bucket(name).Delete(context.Background())
  197. if err != nil {
  198. return fmt.Errorf("delete bucket %s: %v", name, err)
  199. }
  200. return
  201. }