rclone_backend.go 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303
  1. //go:build rclone
  2. // +build rclone
  3. package rclone_backend
  4. import (
  5. "bytes"
  6. "context"
  7. "fmt"
  8. "github.com/rclone/rclone/fs/config/configfile"
  9. "github.com/seaweedfs/seaweedfs/weed/util"
  10. "io"
  11. "os"
  12. "text/template"
  13. "time"
  14. "github.com/google/uuid"
  15. _ "github.com/rclone/rclone/backend/all"
  16. "github.com/rclone/rclone/fs"
  17. "github.com/rclone/rclone/fs/accounting"
  18. "github.com/rclone/rclone/fs/object"
  19. "github.com/seaweedfs/seaweedfs/weed/glog"
  20. "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
  21. "github.com/seaweedfs/seaweedfs/weed/storage/backend"
  22. )
  23. func init() {
  24. backend.BackendStorageFactories["rclone"] = &RcloneBackendFactory{}
  25. configfile.Install()
  26. }
  27. type RcloneBackendFactory struct {
  28. }
  29. func (factory *RcloneBackendFactory) StorageType() backend.StorageType {
  30. return "rclone"
  31. }
  32. func (factory *RcloneBackendFactory) BuildStorage(configuration backend.StringProperties, configPrefix string, id string) (backend.BackendStorage, error) {
  33. return newRcloneBackendStorage(configuration, configPrefix, id)
  34. }
  35. type RcloneBackendStorage struct {
  36. id string
  37. remoteName string
  38. keyTemplate *template.Template
  39. keyTemplateText string
  40. fs fs.Fs
  41. }
  42. func newRcloneBackendStorage(configuration backend.StringProperties, configPrefix string, id string) (s *RcloneBackendStorage, err error) {
  43. s = &RcloneBackendStorage{}
  44. s.id = id
  45. s.remoteName = configuration.GetString(configPrefix + "remote_name")
  46. s.keyTemplateText = configuration.GetString(configPrefix + "key_template")
  47. s.keyTemplate, err = template.New("keyTemplate").Parse(s.keyTemplateText)
  48. if err != nil {
  49. return
  50. }
  51. ctx := context.TODO()
  52. accounting.Start(ctx)
  53. fsPath := fmt.Sprintf("%s:", s.remoteName)
  54. s.fs, err = fs.NewFs(ctx, fsPath)
  55. if err != nil {
  56. glog.Errorf("failed to instantiate Rclone filesystem: %s", err)
  57. return
  58. }
  59. glog.V(0).Infof("created backend storage rclone.%s for remote name %s", s.id, s.remoteName)
  60. return
  61. }
  62. func (s *RcloneBackendStorage) ToProperties() map[string]string {
  63. m := make(map[string]string)
  64. m["remote_name"] = s.remoteName
  65. if len(s.keyTemplateText) > 0 {
  66. m["key_template"] = s.keyTemplateText
  67. }
  68. return m
  69. }
  70. func formatKey(key string, storage RcloneBackendStorage) (fKey string, err error) {
  71. var b bytes.Buffer
  72. if len(storage.keyTemplateText) == 0 {
  73. fKey = key
  74. } else {
  75. err = storage.keyTemplate.Execute(&b, key)
  76. if err == nil {
  77. fKey = b.String()
  78. }
  79. }
  80. return
  81. }
  82. func (s *RcloneBackendStorage) NewStorageFile(key string, tierInfo *volume_server_pb.VolumeInfo) backend.BackendStorageFile {
  83. f := &RcloneBackendStorageFile{
  84. backendStorage: s,
  85. key: key,
  86. tierInfo: tierInfo,
  87. }
  88. return f
  89. }
  90. func (s *RcloneBackendStorage) CopyFile(f *os.File, fn func(progressed int64, percentage float32) error) (key string, size int64, err error) {
  91. randomUuid, err := uuid.NewRandom()
  92. if err != nil {
  93. return key, 0, err
  94. }
  95. key = randomUuid.String()
  96. key, err = formatKey(key, *s)
  97. if err != nil {
  98. return key, 0, err
  99. }
  100. glog.V(1).Infof("copy dat file of %s to remote rclone.%s as %s", f.Name(), s.id, key)
  101. util.Retry("upload via Rclone", func() error {
  102. size, err = uploadViaRclone(s.fs, f.Name(), key, fn)
  103. return err
  104. })
  105. return
  106. }
  107. func uploadViaRclone(rfs fs.Fs, filename string, key string, fn func(progressed int64, percentage float32) error) (fileSize int64, err error) {
  108. ctx := context.TODO()
  109. file, err := os.Open(filename)
  110. defer func(file *os.File) {
  111. err := file.Close()
  112. if err != nil {
  113. return
  114. }
  115. }(file)
  116. if err != nil {
  117. return 0, err
  118. }
  119. stat, err := file.Stat()
  120. if err != nil {
  121. return 0, err
  122. }
  123. info := object.NewStaticObjectInfo(key, stat.ModTime(), stat.Size(), true, nil, rfs)
  124. tr := accounting.NewStats(ctx).NewTransfer(info, rfs)
  125. defer tr.Done(ctx, err)
  126. acc := tr.Account(ctx, file)
  127. pr := ProgressReader{acc: acc, tr: tr, fn: fn}
  128. obj, err := rfs.Put(ctx, &pr, info)
  129. if err != nil {
  130. return 0, err
  131. }
  132. return obj.Size(), err
  133. }
  134. func (s *RcloneBackendStorage) DownloadFile(filename string, key string, fn func(progressed int64, percentage float32) error) (size int64, err error) {
  135. glog.V(1).Infof("download dat file of %s from remote rclone.%s as %s", filename, s.id, key)
  136. util.Retry("download via Rclone", func() error {
  137. size, err = downloadViaRclone(s.fs, filename, key, fn)
  138. return err
  139. })
  140. return
  141. }
  142. func downloadViaRclone(fs fs.Fs, filename string, key string, fn func(progressed int64, percentage float32) error) (fileSize int64, err error) {
  143. ctx := context.TODO()
  144. obj, err := fs.NewObject(ctx, key)
  145. if err != nil {
  146. return 0, err
  147. }
  148. rc, err := obj.Open(ctx)
  149. defer func(rc io.ReadCloser) {
  150. err := rc.Close()
  151. if err != nil {
  152. return
  153. }
  154. }(rc)
  155. if err != nil {
  156. return 0, err
  157. }
  158. file, err := os.Create(filename)
  159. defer func(file *os.File) {
  160. err := file.Close()
  161. if err != nil {
  162. return
  163. }
  164. }(file)
  165. tr := accounting.NewStats(ctx).NewTransfer(obj, fs)
  166. defer tr.Done(ctx, err)
  167. acc := tr.Account(ctx, rc)
  168. pr := ProgressReader{acc: acc, tr: tr, fn: fn}
  169. written, err := io.Copy(file, &pr)
  170. if err != nil {
  171. return 0, err
  172. }
  173. return written, nil
  174. }
  175. func (s *RcloneBackendStorage) DeleteFile(key string) (err error) {
  176. glog.V(1).Infof("delete dat file %s from remote", key)
  177. util.Retry("delete via Rclone", func() error {
  178. err = deleteViaRclone(s.fs, key)
  179. return err
  180. })
  181. return
  182. }
  183. func deleteViaRclone(fs fs.Fs, key string) (err error) {
  184. ctx := context.TODO()
  185. obj, err := fs.NewObject(ctx, key)
  186. if err != nil {
  187. return err
  188. }
  189. return obj.Remove(ctx)
  190. }
  191. type RcloneBackendStorageFile struct {
  192. backendStorage *RcloneBackendStorage
  193. key string
  194. tierInfo *volume_server_pb.VolumeInfo
  195. }
  196. func (rcloneBackendStorageFile RcloneBackendStorageFile) ReadAt(p []byte, off int64) (n int, err error) {
  197. ctx := context.TODO()
  198. obj, err := rcloneBackendStorageFile.backendStorage.fs.NewObject(ctx, rcloneBackendStorageFile.key)
  199. if err != nil {
  200. return 0, err
  201. }
  202. opt := fs.RangeOption{Start: off, End: off + int64(len(p)) - 1}
  203. rc, err := obj.Open(ctx, &opt)
  204. defer func(rc io.ReadCloser) {
  205. err := rc.Close()
  206. if err != nil {
  207. return
  208. }
  209. }(rc)
  210. if err != nil {
  211. return 0, err
  212. }
  213. return io.ReadFull(rc, p)
  214. }
  215. func (rcloneBackendStorageFile RcloneBackendStorageFile) WriteAt(p []byte, off int64) (n int, err error) {
  216. panic("not implemented")
  217. }
  218. func (rcloneBackendStorageFile RcloneBackendStorageFile) Truncate(off int64) error {
  219. panic("not implemented")
  220. }
  221. func (rcloneBackendStorageFile RcloneBackendStorageFile) Close() error {
  222. return nil
  223. }
  224. func (rcloneBackendStorageFile RcloneBackendStorageFile) GetStat() (datSize int64, modTime time.Time, err error) {
  225. files := rcloneBackendStorageFile.tierInfo.GetFiles()
  226. if len(files) == 0 {
  227. err = fmt.Errorf("remote file info not found")
  228. return
  229. }
  230. datSize = int64(files[0].FileSize)
  231. modTime = time.Unix(int64(files[0].ModifiedTime), 0)
  232. return
  233. }
  234. func (rcloneBackendStorageFile RcloneBackendStorageFile) Name() string {
  235. return rcloneBackendStorageFile.key
  236. }
  237. func (rcloneBackendStorageFile RcloneBackendStorageFile) Sync() error {
  238. return nil
  239. }