gcs_storage_client.go 6.5 KB


  1. package gcs
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "os"
  7. "reflect"
  8. "cloud.google.com/go/storage"
  9. "github.com/chrislusf/seaweedfs/weed/glog"
  10. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  11. "github.com/chrislusf/seaweedfs/weed/pb/remote_pb"
  12. "github.com/chrislusf/seaweedfs/weed/remote_storage"
  13. "github.com/chrislusf/seaweedfs/weed/util"
  14. "google.golang.org/api/iterator"
  15. "google.golang.org/api/option"
  16. )
  17. func init() {
  18. remote_storage.RemoteStorageClientMakers["gcs"] = new(gcsRemoteStorageMaker)
  19. }
  // gcsRemoteStorageMaker is the stateless factory registered for the "gcs"
  // remote storage type; its Make method builds gcsRemoteStorageClient values.
  type gcsRemoteStorageMaker struct{}
  21. func (s gcsRemoteStorageMaker) HasBucket() bool {
  22. return true
  23. }
  24. func (s gcsRemoteStorageMaker) Make(conf *remote_pb.RemoteConf) (remote_storage.RemoteStorageClient, error) {
  25. client := &gcsRemoteStorageClient{
  26. conf: conf,
  27. }
  28. googleApplicationCredentials := conf.GcsGoogleApplicationCredentials
  29. if googleApplicationCredentials == "" {
  30. found := false
  31. googleApplicationCredentials, found = os.LookupEnv("GOOGLE_APPLICATION_CREDENTIALS")
  32. if !found {
  33. return nil, fmt.Errorf("need to specific GOOGLE_APPLICATION_CREDENTIALS env variable")
  34. }
  35. }
  36. projectID := conf.GcsProjectId
  37. if projectID == "" {
  38. found := false
  39. projectID, found = os.LookupEnv("GOOGLE_CLOUD_PROJECT")
  40. if !found {
  41. glog.Warningf("need to specific GOOGLE_CLOUD_PROJECT env variable")
  42. }
  43. }
  44. googleApplicationCredentials = util.ResolvePath(googleApplicationCredentials)
  45. c, err := storage.NewClient(context.Background(), option.WithCredentialsFile(googleApplicationCredentials))
  46. if err != nil {
  47. return nil, fmt.Errorf("failed to create client: %v", err)
  48. }
  49. client.client = c
  50. client.projectID = projectID
  51. return client, nil
  52. }
  53. type gcsRemoteStorageClient struct {
  54. conf *remote_pb.RemoteConf
  55. client *storage.Client
  56. projectID string
  57. }
  58. var _ = remote_storage.RemoteStorageClient(&gcsRemoteStorageClient{})
  59. func (gcs *gcsRemoteStorageClient) Traverse(loc *remote_pb.RemoteStorageLocation, visitFn remote_storage.VisitFunc) (err error) {
  60. pathKey := loc.Path[1:]
  61. objectIterator := gcs.client.Bucket(loc.Bucket).Objects(context.Background(), &storage.Query{
  62. Delimiter: "",
  63. Prefix: pathKey,
  64. Versions: false,
  65. })
  66. var objectAttr *storage.ObjectAttrs
  67. for err == nil {
  68. objectAttr, err = objectIterator.Next()
  69. if err != nil {
  70. if err == iterator.Done {
  71. return nil
  72. }
  73. return err
  74. }
  75. key := objectAttr.Name
  76. key = "/" + key
  77. dir, name := util.FullPath(key).DirAndName()
  78. err = visitFn(dir, name, false, &filer_pb.RemoteEntry{
  79. RemoteMtime: objectAttr.Updated.Unix(),
  80. RemoteSize: objectAttr.Size,
  81. RemoteETag: objectAttr.Etag,
  82. StorageName: gcs.conf.Name,
  83. })
  84. }
  85. return
  86. }
  87. func (gcs *gcsRemoteStorageClient) ReadFile(loc *remote_pb.RemoteStorageLocation, offset int64, size int64) (data []byte, err error) {
  88. key := loc.Path[1:]
  89. rangeReader, readErr := gcs.client.Bucket(loc.Bucket).Object(key).NewRangeReader(context.Background(), offset, size)
  90. if readErr != nil {
  91. return nil, readErr
  92. }
  93. data, err = io.ReadAll(rangeReader)
  94. if err != nil {
  95. return nil, fmt.Errorf("failed to download file %s%s: %v", loc.Bucket, loc.Path, err)
  96. }
  97. return
  98. }
  99. func (gcs *gcsRemoteStorageClient) WriteDirectory(loc *remote_pb.RemoteStorageLocation, entry *filer_pb.Entry) (err error) {
  100. return nil
  101. }
  102. func (gcs *gcsRemoteStorageClient) RemoveDirectory(loc *remote_pb.RemoteStorageLocation) (err error) {
  103. return nil
  104. }
  105. func (gcs *gcsRemoteStorageClient) WriteFile(loc *remote_pb.RemoteStorageLocation, entry *filer_pb.Entry, reader io.Reader) (remoteEntry *filer_pb.RemoteEntry, err error) {
  106. key := loc.Path[1:]
  107. metadata := toMetadata(entry.Extended)
  108. wc := gcs.client.Bucket(loc.Bucket).Object(key).NewWriter(context.Background())
  109. wc.Metadata = metadata
  110. if _, err = io.Copy(wc, reader); err != nil {
  111. return nil, fmt.Errorf("upload to gcs %s/%s%s: %v", loc.Name, loc.Bucket, loc.Path, err)
  112. }
  113. if err = wc.Close(); err != nil {
  114. return nil, fmt.Errorf("close gcs %s/%s%s: %v", loc.Name, loc.Bucket, loc.Path, err)
  115. }
  116. // read back the remote entry
  117. return gcs.readFileRemoteEntry(loc)
  118. }
  119. func (gcs *gcsRemoteStorageClient) readFileRemoteEntry(loc *remote_pb.RemoteStorageLocation) (*filer_pb.RemoteEntry, error) {
  120. key := loc.Path[1:]
  121. attr, err := gcs.client.Bucket(loc.Bucket).Object(key).Attrs(context.Background())
  122. if err != nil {
  123. return nil, err
  124. }
  125. return &filer_pb.RemoteEntry{
  126. RemoteMtime: attr.Updated.Unix(),
  127. RemoteSize: attr.Size,
  128. RemoteETag: attr.Etag,
  129. StorageName: gcs.conf.Name,
  130. }, nil
  131. }
  // toMetadata converts extended attributes (byte-slice values) into the
  // string-valued map used for GCS object metadata.
  //
  // Every key is kept and each value is converted with string(v). The result is
  // always a non-nil map, even for nil or empty input, and is pre-sized to the
  // number of attributes to avoid rehashing while it is filled.
  func toMetadata(attributes map[string][]byte) map[string]string {
  	metadata := make(map[string]string, len(attributes))
  	for k, v := range attributes {
  		metadata[k] = string(v)
  	}
  	return metadata
  }
  139. func (gcs *gcsRemoteStorageClient) UpdateFileMetadata(loc *remote_pb.RemoteStorageLocation, oldEntry *filer_pb.Entry, newEntry *filer_pb.Entry) (err error) {
  140. if reflect.DeepEqual(oldEntry.Extended, newEntry.Extended) {
  141. return nil
  142. }
  143. metadata := toMetadata(newEntry.Extended)
  144. key := loc.Path[1:]
  145. if len(metadata) > 0 {
  146. _, err = gcs.client.Bucket(loc.Bucket).Object(key).Update(context.Background(), storage.ObjectAttrsToUpdate{
  147. Metadata: metadata,
  148. })
  149. } else {
  150. // no way to delete the metadata yet
  151. }
  152. return
  153. }
  154. func (gcs *gcsRemoteStorageClient) DeleteFile(loc *remote_pb.RemoteStorageLocation) (err error) {
  155. key := loc.Path[1:]
  156. if err = gcs.client.Bucket(loc.Bucket).Object(key).Delete(context.Background()); err != nil {
  157. return fmt.Errorf("gcs delete %s%s: %v", loc.Bucket, key, err)
  158. }
  159. return
  160. }
  161. func (gcs *gcsRemoteStorageClient) ListBuckets() (buckets []*remote_storage.Bucket, err error) {
  162. if gcs.projectID == "" {
  163. return nil, fmt.Errorf("gcs project id or GOOGLE_CLOUD_PROJECT env variable not set")
  164. }
  165. iter := gcs.client.Buckets(context.Background(), gcs.projectID)
  166. for {
  167. b, err := iter.Next()
  168. if err == iterator.Done {
  169. break
  170. }
  171. if err != nil {
  172. return buckets, err
  173. }
  174. buckets = append(buckets, &remote_storage.Bucket{
  175. Name: b.Name,
  176. CreatedAt: b.Created,
  177. })
  178. }
  179. return
  180. }
  181. func (gcs *gcsRemoteStorageClient) CreateBucket(name string) (err error) {
  182. if gcs.projectID == "" {
  183. return fmt.Errorf("gcs project id or GOOGLE_CLOUD_PROJECT env variable not set")
  184. }
  185. err = gcs.client.Bucket(name).Create(context.Background(), gcs.projectID, &storage.BucketAttrs{})
  186. if err != nil {
  187. return fmt.Errorf("create bucket %s: %v", name, err)
  188. }
  189. return
  190. }
  191. func (gcs *gcsRemoteStorageClient) DeleteBucket(name string) (err error) {
  192. err = gcs.client.Bucket(name).Delete(context.Background())
  193. if err != nil {
  194. return fmt.Errorf("delete bucket %s: %v", name, err)
  195. }
  196. return
  197. }