123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208 |
- package erasure_coding
- import (
- "fmt"
- "io"
- "os"
- "github.com/seaweedfs/seaweedfs/weed/storage/backend"
- "github.com/seaweedfs/seaweedfs/weed/storage/idx"
- "github.com/seaweedfs/seaweedfs/weed/storage/needle"
- "github.com/seaweedfs/seaweedfs/weed/storage/needle_map"
- "github.com/seaweedfs/seaweedfs/weed/storage/super_block"
- "github.com/seaweedfs/seaweedfs/weed/storage/types"
- "github.com/seaweedfs/seaweedfs/weed/util"
- )
- // write .idx file from .ecx and .ecj files
- func WriteIdxFileFromEcIndex(baseFileName string) (err error) {
- ecxFile, openErr := os.OpenFile(baseFileName+".ecx", os.O_RDONLY, 0644)
- if openErr != nil {
- return fmt.Errorf("cannot open ec index %s.ecx: %v", baseFileName, openErr)
- }
- defer ecxFile.Close()
- idxFile, openErr := os.OpenFile(baseFileName+".idx", os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
- if openErr != nil {
- return fmt.Errorf("cannot open %s.idx: %v", baseFileName, openErr)
- }
- defer idxFile.Close()
- io.Copy(idxFile, ecxFile)
- err = iterateEcjFile(baseFileName, func(key types.NeedleId) error {
- bytes := needle_map.ToBytes(key, types.Offset{}, types.TombstoneFileSize)
- idxFile.Write(bytes)
- return nil
- })
- return err
- }
- // FindDatFileSize calculate .dat file size from max offset entry
- // there may be extra deletions after that entry
- // but they are deletions anyway
- func FindDatFileSize(dataBaseFileName, indexBaseFileName string) (datSize int64, err error) {
- version, err := readEcVolumeVersion(dataBaseFileName)
- if err != nil {
- return 0, fmt.Errorf("read ec volume %s version: %v", dataBaseFileName, err)
- }
- err = iterateEcxFile(indexBaseFileName, func(key types.NeedleId, offset types.Offset, size types.Size) error {
- if size.IsDeleted() {
- return nil
- }
- entryStopOffset := offset.ToActualOffset() + needle.GetActualSize(size, version)
- if datSize < entryStopOffset {
- datSize = entryStopOffset
- }
- return nil
- })
- return
- }
- func readEcVolumeVersion(baseFileName string) (version needle.Version, err error) {
- // find volume version
- datFile, err := os.OpenFile(baseFileName+".ec00", os.O_RDONLY, 0644)
- if err != nil {
- return 0, fmt.Errorf("open ec volume %s superblock: %v", baseFileName, err)
- }
- datBackend := backend.NewDiskFile(datFile)
- superBlock, err := super_block.ReadSuperBlock(datBackend)
- datBackend.Close()
- if err != nil {
- return 0, fmt.Errorf("read ec volume %s superblock: %v", baseFileName, err)
- }
- return superBlock.Version, nil
- }
- func iterateEcxFile(baseFileName string, processNeedleFn func(key types.NeedleId, offset types.Offset, size types.Size) error) error {
- ecxFile, openErr := os.OpenFile(baseFileName+".ecx", os.O_RDONLY, 0644)
- if openErr != nil {
- return fmt.Errorf("cannot open ec index %s.ecx: %v", baseFileName, openErr)
- }
- defer ecxFile.Close()
- buf := make([]byte, types.NeedleMapEntrySize)
- for {
- n, err := ecxFile.Read(buf)
- if n != types.NeedleMapEntrySize {
- if err == io.EOF {
- return nil
- }
- return err
- }
- key, offset, size := idx.IdxFileEntry(buf)
- if processNeedleFn != nil {
- err = processNeedleFn(key, offset, size)
- }
- if err != nil {
- if err != io.EOF {
- return err
- }
- return nil
- }
- }
- }
- func iterateEcjFile(baseFileName string, processNeedleFn func(key types.NeedleId) error) error {
- if !util.FileExists(baseFileName + ".ecj") {
- return nil
- }
- ecjFile, openErr := os.OpenFile(baseFileName+".ecj", os.O_RDONLY, 0644)
- if openErr != nil {
- return fmt.Errorf("cannot open ec index %s.ecj: %v", baseFileName, openErr)
- }
- defer ecjFile.Close()
- buf := make([]byte, types.NeedleIdSize)
- for {
- n, err := ecjFile.Read(buf)
- if n != types.NeedleIdSize {
- if err == io.EOF {
- return nil
- }
- return err
- }
- if processNeedleFn != nil {
- err = processNeedleFn(types.BytesToNeedleId(buf))
- }
- if err != nil {
- if err == io.EOF {
- return nil
- }
- return err
- }
- }
- }
- // WriteDatFile generates .dat from .ec00 ~ .ec09 files
- func WriteDatFile(baseFileName string, datFileSize int64, shardFileNames []string) error {
- datFile, openErr := os.OpenFile(baseFileName+".dat", os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
- if openErr != nil {
- return fmt.Errorf("cannot write volume %s.dat: %v", baseFileName, openErr)
- }
- defer datFile.Close()
- inputFiles := make([]*os.File, DataShardsCount)
- defer func() {
- for shardId := 0; shardId < DataShardsCount; shardId++ {
- if inputFiles[shardId] != nil {
- inputFiles[shardId].Close()
- }
- }
- }()
- for shardId := 0; shardId < DataShardsCount; shardId++ {
- inputFiles[shardId], openErr = os.OpenFile(shardFileNames[shardId], os.O_RDONLY, 0)
- if openErr != nil {
- return openErr
- }
- }
- for datFileSize >= DataShardsCount*ErasureCodingLargeBlockSize {
- for shardId := 0; shardId < DataShardsCount; shardId++ {
- w, err := io.CopyN(datFile, inputFiles[shardId], ErasureCodingLargeBlockSize)
- if w != ErasureCodingLargeBlockSize {
- return fmt.Errorf("copy %s large block on shardId %d: %v", baseFileName, shardId, err)
- }
- datFileSize -= ErasureCodingLargeBlockSize
- }
- }
- for datFileSize > 0 {
- for shardId := 0; shardId < DataShardsCount; shardId++ {
- toRead := min(datFileSize, ErasureCodingSmallBlockSize)
- w, err := io.CopyN(datFile, inputFiles[shardId], toRead)
- if w != toRead {
- return fmt.Errorf("copy %s small block %d: %v", baseFileName, shardId, err)
- }
- datFileSize -= toRead
- }
- }
- return nil
- }
// min returns the smaller of x and y.
func min(x, y int64) int64 {
	if x < y {
		return x
	}
	return y
}
|