replication_util.go 1.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243
  1. package repl_util
  2. import (
  3. "github.com/seaweedfs/seaweedfs/weed/filer"
  4. "github.com/seaweedfs/seaweedfs/weed/glog"
  5. "github.com/seaweedfs/seaweedfs/weed/replication/source"
  6. "github.com/seaweedfs/seaweedfs/weed/util"
  7. )
  8. func CopyFromChunkViews(chunkViews *filer.IntervalList[*filer.ChunkView], filerSource *source.FilerSource, writeFunc func(data []byte) error) error {
  9. for x := chunkViews.Front(); x != nil; x = x.Next {
  10. chunk := x.Value
  11. fileUrls, err := filerSource.LookupFileId(chunk.FileId)
  12. if err != nil {
  13. return err
  14. }
  15. var writeErr error
  16. var shouldRetry bool
  17. for _, fileUrl := range fileUrls {
  18. shouldRetry, err = util.ReadUrlAsStream(fileUrl, chunk.CipherKey, chunk.IsGzipped, chunk.IsFullChunk(), chunk.OffsetInChunk, int(chunk.ViewSize), func(data []byte) {
  19. writeErr = writeFunc(data)
  20. })
  21. if err != nil {
  22. glog.V(1).Infof("read from %s: %v", fileUrl, err)
  23. } else if writeErr != nil {
  24. glog.V(1).Infof("copy from %s: %v", fileUrl, writeErr)
  25. } else {
  26. break
  27. }
  28. }
  29. if shouldRetry && err != nil {
  30. return err
  31. }
  32. if writeErr != nil {
  33. return writeErr
  34. }
  35. }
  36. return nil
  37. }