replication_util.go 1.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142
  1. package repl_util
  2. import (
  3. "github.com/seaweedfs/seaweedfs/weed/filer"
  4. "github.com/seaweedfs/seaweedfs/weed/glog"
  5. "github.com/seaweedfs/seaweedfs/weed/replication/source"
  6. "github.com/seaweedfs/seaweedfs/weed/util"
  7. )
  8. func CopyFromChunkViews(chunkViews []*filer.ChunkView, filerSource *source.FilerSource, writeFunc func(data []byte) error) error {
  9. for _, chunk := range chunkViews {
  10. fileUrls, err := filerSource.LookupFileId(chunk.FileId)
  11. if err != nil {
  12. return err
  13. }
  14. var writeErr error
  15. var shouldRetry bool
  16. for _, fileUrl := range fileUrls {
  17. shouldRetry, err = util.ReadUrlAsStream(fileUrl, chunk.CipherKey, chunk.IsGzipped, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) {
  18. writeErr = writeFunc(data)
  19. })
  20. if err != nil {
  21. glog.V(1).Infof("read from %s: %v", fileUrl, err)
  22. } else if writeErr != nil {
  23. glog.V(1).Infof("copy from %s: %v", fileUrl, writeErr)
  24. } else {
  25. break
  26. }
  27. }
  28. if shouldRetry && err != nil {
  29. return err
  30. }
  31. if writeErr != nil {
  32. return writeErr
  33. }
  34. }
  35. return nil
  36. }