b2_sink.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. package B2Sink
  2. import (
  3. "context"
  4. "strings"
  5. "github.com/chrislusf/seaweedfs/weed/filer2"
  6. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  7. "github.com/chrislusf/seaweedfs/weed/replication/sink"
  8. "github.com/chrislusf/seaweedfs/weed/replication/source"
  9. "github.com/chrislusf/seaweedfs/weed/util"
  10. "github.com/kurin/blazer/b2"
  11. )
  12. type B2Sink struct {
  13. client *b2.Client
  14. bucket string
  15. dir string
  16. filerSource *source.FilerSource
  17. }
  18. func init() {
  19. sink.Sinks = append(sink.Sinks, &B2Sink{})
  20. }
  21. func (g *B2Sink) GetName() string {
  22. return "backblaze"
  23. }
  24. func (g *B2Sink) GetSinkToDirectory() string {
  25. return g.dir
  26. }
  27. func (g *B2Sink) Initialize(configuration util.Configuration, prefix string) error {
  28. return g.initialize(
  29. configuration.GetString(prefix+"b2_account_id"),
  30. configuration.GetString(prefix+"b2_master_application_key"),
  31. configuration.GetString(prefix+"bucket"),
  32. configuration.GetString(prefix+"directory"),
  33. )
  34. }
  35. func (g *B2Sink) SetSourceFiler(s *source.FilerSource) {
  36. g.filerSource = s
  37. }
  38. func (g *B2Sink) initialize(accountId, accountKey, bucket, dir string) error {
  39. ctx := context.Background()
  40. client, err := b2.NewClient(ctx, accountId, accountKey)
  41. if err != nil {
  42. return err
  43. }
  44. g.client = client
  45. g.bucket = bucket
  46. g.dir = dir
  47. return nil
  48. }
  49. func (g *B2Sink) DeleteEntry(ctx context.Context, key string, isDirectory, deleteIncludeChunks bool) error {
  50. key = cleanKey(key)
  51. if isDirectory {
  52. key = key + "/"
  53. }
  54. bucket, err := g.client.Bucket(ctx, g.bucket)
  55. if err != nil {
  56. return err
  57. }
  58. targetObject := bucket.Object(key)
  59. return targetObject.Delete(ctx)
  60. }
  61. func (g *B2Sink) CreateEntry(ctx context.Context, key string, entry *filer_pb.Entry) error {
  62. key = cleanKey(key)
  63. if entry.IsDirectory {
  64. return nil
  65. }
  66. totalSize := filer2.TotalSize(entry.Chunks)
  67. chunkViews := filer2.ViewFromChunks(entry.Chunks, 0, int(totalSize))
  68. bucket, err := g.client.Bucket(ctx, g.bucket)
  69. if err != nil {
  70. return err
  71. }
  72. targetObject := bucket.Object(key)
  73. writer := targetObject.NewWriter(ctx)
  74. for _, chunk := range chunkViews {
  75. fileUrl, err := g.filerSource.LookupFileId(ctx, chunk.FileId)
  76. if err != nil {
  77. return err
  78. }
  79. var writeErr error
  80. _, readErr := util.ReadUrlAsStream(fileUrl, chunk.Offset, int(chunk.Size), func(data []byte) {
  81. _, err := writer.Write(data)
  82. if err != nil {
  83. writeErr = err
  84. }
  85. })
  86. if readErr != nil {
  87. return readErr
  88. }
  89. if writeErr != nil {
  90. return writeErr
  91. }
  92. }
  93. return writer.Close()
  94. }
  95. func (g *B2Sink) UpdateEntry(ctx context.Context, key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) {
  96. key = cleanKey(key)
  97. // TODO improve efficiency
  98. return false, nil
  99. }
  100. func cleanKey(key string) string {
  101. if strings.HasPrefix(key, "/") {
  102. key = key[1:]
  103. }
  104. return key
  105. }