rclone_backend.go 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300
  1. package rclone_backend
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "github.com/rclone/rclone/fs/config/configfile"
  7. "github.com/seaweedfs/seaweedfs/weed/util"
  8. "io"
  9. "os"
  10. "text/template"
  11. "time"
  12. "github.com/google/uuid"
  13. _ "github.com/rclone/rclone/backend/all"
  14. "github.com/rclone/rclone/fs"
  15. "github.com/rclone/rclone/fs/accounting"
  16. "github.com/rclone/rclone/fs/object"
  17. "github.com/seaweedfs/seaweedfs/weed/glog"
  18. "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
  19. "github.com/seaweedfs/seaweedfs/weed/storage/backend"
  20. )
  21. func init() {
  22. backend.BackendStorageFactories["rclone"] = &RcloneBackendFactory{}
  23. configfile.Install()
  24. }
  25. type RcloneBackendFactory struct {
  26. }
  27. func (factory *RcloneBackendFactory) StorageType() backend.StorageType {
  28. return "rclone"
  29. }
  30. func (factory *RcloneBackendFactory) BuildStorage(configuration backend.StringProperties, configPrefix string, id string) (backend.BackendStorage, error) {
  31. return newRcloneBackendStorage(configuration, configPrefix, id)
  32. }
  33. type RcloneBackendStorage struct {
  34. id string
  35. remoteName string
  36. keyTemplate *template.Template
  37. keyTemplateText string
  38. fs fs.Fs
  39. }
  40. func newRcloneBackendStorage(configuration backend.StringProperties, configPrefix string, id string) (s *RcloneBackendStorage, err error) {
  41. s = &RcloneBackendStorage{}
  42. s.id = id
  43. s.remoteName = configuration.GetString(configPrefix + "remote_name")
  44. s.keyTemplateText = configuration.GetString(configPrefix + "key_template")
  45. s.keyTemplate, err = template.New("keyTemplate").Parse(s.keyTemplateText)
  46. if err != nil {
  47. return
  48. }
  49. ctx := context.TODO()
  50. accounting.Start(ctx)
  51. fsPath := fmt.Sprintf("%s:", s.remoteName)
  52. s.fs, err = fs.NewFs(ctx, fsPath)
  53. if err != nil {
  54. glog.Errorf("failed to instantiate Rclone filesystem: %s", err)
  55. return
  56. }
  57. glog.V(0).Infof("created backend storage rclone.%s for remote name %s", s.id, s.remoteName)
  58. return
  59. }
  60. func (s *RcloneBackendStorage) ToProperties() map[string]string {
  61. m := make(map[string]string)
  62. m["remote_name"] = s.remoteName
  63. if len(s.keyTemplateText) > 0 {
  64. m["key_template"] = s.keyTemplateText
  65. }
  66. return m
  67. }
  68. func formatKey(key string, storage RcloneBackendStorage) (fKey string, err error) {
  69. var b bytes.Buffer
  70. if len(storage.keyTemplateText) == 0 {
  71. fKey = key
  72. } else {
  73. err = storage.keyTemplate.Execute(&b, key)
  74. if err == nil {
  75. fKey = b.String()
  76. }
  77. }
  78. return
  79. }
  80. func (s *RcloneBackendStorage) NewStorageFile(key string, tierInfo *volume_server_pb.VolumeInfo) backend.BackendStorageFile {
  81. f := &RcloneBackendStorageFile{
  82. backendStorage: s,
  83. key: key,
  84. tierInfo: tierInfo,
  85. }
  86. return f
  87. }
  88. func (s *RcloneBackendStorage) CopyFile(f *os.File, fn func(progressed int64, percentage float32) error) (key string, size int64, err error) {
  89. randomUuid, err := uuid.NewRandom()
  90. if err != nil {
  91. return key, 0, err
  92. }
  93. key = randomUuid.String()
  94. key, err = formatKey(key, *s)
  95. if err != nil {
  96. return key, 0, err
  97. }
  98. glog.V(1).Infof("copy dat file of %s to remote rclone.%s as %s", f.Name(), s.id, key)
  99. util.Retry("upload via Rclone", func() error {
  100. size, err = uploadViaRclone(s.fs, f.Name(), key, fn)
  101. return err
  102. })
  103. return
  104. }
  105. func uploadViaRclone(rfs fs.Fs, filename string, key string, fn func(progressed int64, percentage float32) error) (fileSize int64, err error) {
  106. ctx := context.TODO()
  107. file, err := os.Open(filename)
  108. defer func(file *os.File) {
  109. err := file.Close()
  110. if err != nil {
  111. return
  112. }
  113. }(file)
  114. if err != nil {
  115. return 0, err
  116. }
  117. stat, err := file.Stat()
  118. if err != nil {
  119. return 0, err
  120. }
  121. info := object.NewStaticObjectInfo(key, stat.ModTime(), stat.Size(), true, nil, rfs)
  122. tr := accounting.NewStats(ctx).NewTransfer(info)
  123. defer tr.Done(ctx, err)
  124. acc := tr.Account(ctx, file)
  125. pr := ProgressReader{acc: acc, tr: tr, fn: fn}
  126. obj, err := rfs.Put(ctx, &pr, info)
  127. if err != nil {
  128. return 0, err
  129. }
  130. return obj.Size(), err
  131. }
  132. func (s *RcloneBackendStorage) DownloadFile(filename string, key string, fn func(progressed int64, percentage float32) error) (size int64, err error) {
  133. glog.V(1).Infof("download dat file of %s from remote rclone.%s as %s", filename, s.id, key)
  134. util.Retry("download via Rclone", func() error {
  135. size, err = downloadViaRclone(s.fs, filename, key, fn)
  136. return err
  137. })
  138. return
  139. }
  140. func downloadViaRclone(fs fs.Fs, filename string, key string, fn func(progressed int64, percentage float32) error) (fileSize int64, err error) {
  141. ctx := context.TODO()
  142. obj, err := fs.NewObject(ctx, key)
  143. if err != nil {
  144. return 0, err
  145. }
  146. rc, err := obj.Open(ctx)
  147. defer func(rc io.ReadCloser) {
  148. err := rc.Close()
  149. if err != nil {
  150. return
  151. }
  152. }(rc)
  153. if err != nil {
  154. return 0, err
  155. }
  156. file, err := os.Create(filename)
  157. defer func(file *os.File) {
  158. err := file.Close()
  159. if err != nil {
  160. return
  161. }
  162. }(file)
  163. tr := accounting.NewStats(ctx).NewTransfer(obj)
  164. defer tr.Done(ctx, err)
  165. acc := tr.Account(ctx, rc)
  166. pr := ProgressReader{acc: acc, tr: tr, fn: fn}
  167. written, err := io.Copy(file, &pr)
  168. if err != nil {
  169. return 0, err
  170. }
  171. return written, nil
  172. }
  173. func (s *RcloneBackendStorage) DeleteFile(key string) (err error) {
  174. glog.V(1).Infof("delete dat file %s from remote", key)
  175. util.Retry("delete via Rclone", func() error {
  176. err = deleteViaRclone(s.fs, key)
  177. return err
  178. })
  179. return
  180. }
  181. func deleteViaRclone(fs fs.Fs, key string) (err error) {
  182. ctx := context.TODO()
  183. obj, err := fs.NewObject(ctx, key)
  184. if err != nil {
  185. return err
  186. }
  187. return obj.Remove(ctx)
  188. }
  189. type RcloneBackendStorageFile struct {
  190. backendStorage *RcloneBackendStorage
  191. key string
  192. tierInfo *volume_server_pb.VolumeInfo
  193. }
  194. func (rcloneBackendStorageFile RcloneBackendStorageFile) ReadAt(p []byte, off int64) (n int, err error) {
  195. ctx := context.TODO()
  196. obj, err := rcloneBackendStorageFile.backendStorage.fs.NewObject(ctx, rcloneBackendStorageFile.key)
  197. if err != nil {
  198. return 0, err
  199. }
  200. opt := fs.RangeOption{Start: off, End: off + int64(len(p)) - 1}
  201. rc, err := obj.Open(ctx, &opt)
  202. defer func(rc io.ReadCloser) {
  203. err := rc.Close()
  204. if err != nil {
  205. return
  206. }
  207. }(rc)
  208. if err != nil {
  209. return 0, err
  210. }
  211. return io.ReadFull(rc, p)
  212. }
  213. func (rcloneBackendStorageFile RcloneBackendStorageFile) WriteAt(p []byte, off int64) (n int, err error) {
  214. panic("not implemented")
  215. }
  216. func (rcloneBackendStorageFile RcloneBackendStorageFile) Truncate(off int64) error {
  217. panic("not implemented")
  218. }
  219. func (rcloneBackendStorageFile RcloneBackendStorageFile) Close() error {
  220. return nil
  221. }
  222. func (rcloneBackendStorageFile RcloneBackendStorageFile) GetStat() (datSize int64, modTime time.Time, err error) {
  223. files := rcloneBackendStorageFile.tierInfo.GetFiles()
  224. if len(files) == 0 {
  225. err = fmt.Errorf("remote file info not found")
  226. return
  227. }
  228. datSize = int64(files[0].FileSize)
  229. modTime = time.Unix(int64(files[0].ModifiedTime), 0)
  230. return
  231. }
  232. func (rcloneBackendStorageFile RcloneBackendStorageFile) Name() string {
  233. return rcloneBackendStorageFile.key
  234. }
  235. func (rcloneBackendStorageFile RcloneBackendStorageFile) Sync() error {
  236. return nil
  237. }