123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471 |
- package weed_server
- import (
- "context"
- "fmt"
- "io"
- "math"
- "os"
- "path"
- "strings"
- "time"
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/operation"
- "github.com/seaweedfs/seaweedfs/weed/pb"
- "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
- "github.com/seaweedfs/seaweedfs/weed/storage"
- "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
- "github.com/seaweedfs/seaweedfs/weed/storage/needle"
- "github.com/seaweedfs/seaweedfs/weed/storage/types"
- "github.com/seaweedfs/seaweedfs/weed/storage/volume_info"
- "github.com/seaweedfs/seaweedfs/weed/util"
- )
- /*
- Steps to apply erasure coding to .dat .idx files
- 0. ensure the volume is readonly
- 1. client call VolumeEcShardsGenerate to generate the .ecx and .ec00 ~ .ec13 files
- 2. client ask master for possible servers to hold the ec files
- 3. client call VolumeEcShardsCopy on above target servers to copy ec files from the source server
- 4. target servers report the new ec files to the master
- 5. master stores vid -> [14]*DataNode
- 6. client checks master. If all 14 slices are ready, delete the original .idx, .idx files
- */
- // VolumeEcShardsGenerate generates the .ecx and .ec00 ~ .ec13 files
- func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_server_pb.VolumeEcShardsGenerateRequest) (*volume_server_pb.VolumeEcShardsGenerateResponse, error) {
- glog.V(0).Infof("VolumeEcShardsGenerate: %v", req)
- v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
- if v == nil {
- return nil, fmt.Errorf("volume %d not found", req.VolumeId)
- }
- baseFileName := v.DataFileName()
- if v.Collection != req.Collection {
- return nil, fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection)
- }
- shouldCleanup := true
- defer func() {
- if !shouldCleanup {
- return
- }
- for i := 0; i < erasure_coding.TotalShardsCount; i++ {
- os.Remove(fmt.Sprintf("%s.ec%2d", baseFileName, i))
- }
- os.Remove(v.IndexFileName() + ".ecx")
- }()
- // write .ec00 ~ .ec13 files
- if err := erasure_coding.WriteEcFiles(baseFileName); err != nil {
- return nil, fmt.Errorf("WriteEcFiles %s: %v", baseFileName, err)
- }
- // write .ecx file
- if err := erasure_coding.WriteSortedFileFromIdx(v.IndexFileName(), ".ecx"); err != nil {
- return nil, fmt.Errorf("WriteSortedFileFromIdx %s: %v", v.IndexFileName(), err)
- }
- // write .vif files
- var destroyTime uint64
- if v.Ttl != nil {
- ttlMills := v.Ttl.ToSeconds()
- if ttlMills > 0 {
- destroyTime = uint64(time.Now().Unix()) + v.Ttl.ToSeconds() //calculated destroy time from the ec volume was created
- }
- }
- volumeInfo := &volume_server_pb.VolumeInfo{Version: uint32(v.Version())}
- if destroyTime == 0 {
- glog.Warningf("gen ec volume,cal ec volume destory time fail,set time to 0,ttl:%v", v.Ttl)
- } else {
- volumeInfo.DestroyTime = destroyTime
- }
- datSize, _, _ := v.FileStat()
- volumeInfo.DatFileSize = int64(datSize)
- if err := volume_info.SaveVolumeInfo(baseFileName+".vif", volumeInfo); err != nil {
- return nil, fmt.Errorf("SaveVolumeInfo %s: %v", baseFileName, err)
- }
- shouldCleanup = false
- return &volume_server_pb.VolumeEcShardsGenerateResponse{}, nil
- }
- // VolumeEcShardsRebuild generates the any of the missing .ec00 ~ .ec13 files
- func (vs *VolumeServer) VolumeEcShardsRebuild(ctx context.Context, req *volume_server_pb.VolumeEcShardsRebuildRequest) (*volume_server_pb.VolumeEcShardsRebuildResponse, error) {
- glog.V(0).Infof("VolumeEcShardsRebuild: %v", req)
- baseFileName := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId))
- var rebuiltShardIds []uint32
- for _, location := range vs.store.Locations {
- _, _, existingShardCount, err := checkEcVolumeStatus(baseFileName, location)
- if err != nil {
- return nil, err
- }
- if existingShardCount == 0 {
- continue
- }
- if util.FileExists(path.Join(location.IdxDirectory, baseFileName+".ecx")) {
- // write .ec00 ~ .ec13 files
- dataBaseFileName := path.Join(location.Directory, baseFileName)
- if generatedShardIds, err := erasure_coding.RebuildEcFiles(dataBaseFileName); err != nil {
- return nil, fmt.Errorf("RebuildEcFiles %s: %v", dataBaseFileName, err)
- } else {
- rebuiltShardIds = generatedShardIds
- }
- indexBaseFileName := path.Join(location.IdxDirectory, baseFileName)
- if err := erasure_coding.RebuildEcxFile(indexBaseFileName); err != nil {
- return nil, fmt.Errorf("RebuildEcxFile %s: %v", dataBaseFileName, err)
- }
- break
- }
- }
- return &volume_server_pb.VolumeEcShardsRebuildResponse{
- RebuiltShardIds: rebuiltShardIds,
- }, nil
- }
- // VolumeEcShardsCopy copy the .ecx and some ec data slices
- func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_server_pb.VolumeEcShardsCopyRequest) (*volume_server_pb.VolumeEcShardsCopyResponse, error) {
- glog.V(0).Infof("VolumeEcShardsCopy: %v", req)
- var location *storage.DiskLocation
- if req.CopyEcxFile {
- location = vs.store.FindFreeLocation(func(location *storage.DiskLocation) bool {
- return location.DiskType == types.HardDriveType
- })
- } else {
- location = vs.store.FindFreeLocation(func(location *storage.DiskLocation) bool {
- _, found := location.FindEcVolume(needle.VolumeId(req.VolumeId))
- return found
- })
- }
- if location == nil {
- return nil, fmt.Errorf("no space left")
- }
- dataBaseFileName := storage.VolumeFileName(location.Directory, req.Collection, int(req.VolumeId))
- indexBaseFileName := storage.VolumeFileName(location.IdxDirectory, req.Collection, int(req.VolumeId))
- err := operation.WithVolumeServerClient(true, pb.ServerAddress(req.SourceDataNode), vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
- // copy ec data slices
- for _, shardId := range req.ShardIds {
- if _, err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, dataBaseFileName, erasure_coding.ToExt(int(shardId)), false, false, nil); err != nil {
- return err
- }
- }
- if req.CopyEcxFile {
- // copy ecx file
- if _, err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, indexBaseFileName, ".ecx", false, false, nil); err != nil {
- return err
- }
- }
- if req.CopyEcjFile {
- // copy ecj file
- if _, err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, indexBaseFileName, ".ecj", true, true, nil); err != nil {
- return err
- }
- }
- if req.CopyVifFile {
- // copy vif file
- if _, err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, dataBaseFileName, ".vif", false, true, nil); err != nil {
- return err
- }
- }
- return nil
- })
- if err != nil {
- return nil, fmt.Errorf("VolumeEcShardsCopy volume %d: %v", req.VolumeId, err)
- }
- return &volume_server_pb.VolumeEcShardsCopyResponse{}, nil
- }
- // VolumeEcShardsDelete local delete the .ecx and some ec data slices if not needed
- // the shard should not be mounted before calling this.
- func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_server_pb.VolumeEcShardsDeleteRequest) (*volume_server_pb.VolumeEcShardsDeleteResponse, error) {
- bName := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId))
- glog.V(0).Infof("ec volume %s shard delete %v", bName, req.ShardIds)
- for _, location := range vs.store.Locations {
- if err := deleteEcShardIdsForEachLocation(bName, location, req.ShardIds); err != nil {
- glog.Errorf("deleteEcShards from %s %s.%v: %v", location.Directory, bName, req.ShardIds, err)
- return nil, err
- }
- }
- return &volume_server_pb.VolumeEcShardsDeleteResponse{}, nil
- }
- func deleteEcShardIdsForEachLocation(bName string, location *storage.DiskLocation, shardIds []uint32) error {
- found := false
- indexBaseFilename := path.Join(location.IdxDirectory, bName)
- dataBaseFilename := path.Join(location.Directory, bName)
- if util.FileExists(path.Join(location.IdxDirectory, bName+".ecx")) {
- for _, shardId := range shardIds {
- shardFileName := dataBaseFilename + erasure_coding.ToExt(int(shardId))
- if util.FileExists(shardFileName) {
- found = true
- os.Remove(shardFileName)
- }
- }
- }
- if !found {
- return nil
- }
- hasEcxFile, hasIdxFile, existingShardCount, err := checkEcVolumeStatus(bName, location)
- if err != nil {
- return err
- }
- if hasEcxFile && existingShardCount == 0 {
- if err := os.Remove(indexBaseFilename + ".ecx"); err != nil {
- return err
- }
- os.Remove(indexBaseFilename + ".ecj")
- if !hasIdxFile {
- // .vif is used for ec volumes and normal volumes
- os.Remove(dataBaseFilename + ".vif")
- }
- }
- return nil
- }
- func checkEcVolumeStatus(bName string, location *storage.DiskLocation) (hasEcxFile bool, hasIdxFile bool, existingShardCount int, err error) {
- // check whether to delete the .ecx and .ecj file also
- fileInfos, err := os.ReadDir(location.Directory)
- if err != nil {
- return false, false, 0, err
- }
- if location.IdxDirectory != location.Directory {
- idxFileInfos, err := os.ReadDir(location.IdxDirectory)
- if err != nil {
- return false, false, 0, err
- }
- fileInfos = append(fileInfos, idxFileInfos...)
- }
- for _, fileInfo := range fileInfos {
- if fileInfo.Name() == bName+".ecx" || fileInfo.Name() == bName+".ecj" {
- hasEcxFile = true
- continue
- }
- if fileInfo.Name() == bName+".idx" {
- hasIdxFile = true
- continue
- }
- if strings.HasPrefix(fileInfo.Name(), bName+".ec") {
- existingShardCount++
- }
- }
- return hasEcxFile, hasIdxFile, existingShardCount, nil
- }
- func (vs *VolumeServer) VolumeEcShardsMount(ctx context.Context, req *volume_server_pb.VolumeEcShardsMountRequest) (*volume_server_pb.VolumeEcShardsMountResponse, error) {
- glog.V(0).Infof("VolumeEcShardsMount: %v", req)
- for _, shardId := range req.ShardIds {
- err := vs.store.MountEcShards(req.Collection, needle.VolumeId(req.VolumeId), erasure_coding.ShardId(shardId))
- if err != nil {
- glog.Errorf("ec shard mount %v: %v", req, err)
- } else {
- glog.V(2).Infof("ec shard mount %v", req)
- }
- if err != nil {
- return nil, fmt.Errorf("mount %d.%d: %v", req.VolumeId, shardId, err)
- }
- }
- return &volume_server_pb.VolumeEcShardsMountResponse{}, nil
- }
- func (vs *VolumeServer) VolumeEcShardsUnmount(ctx context.Context, req *volume_server_pb.VolumeEcShardsUnmountRequest) (*volume_server_pb.VolumeEcShardsUnmountResponse, error) {
- glog.V(0).Infof("VolumeEcShardsUnmount: %v", req)
- for _, shardId := range req.ShardIds {
- err := vs.store.UnmountEcShards(needle.VolumeId(req.VolumeId), erasure_coding.ShardId(shardId))
- if err != nil {
- glog.Errorf("ec shard unmount %v: %v", req, err)
- } else {
- glog.V(2).Infof("ec shard unmount %v", req)
- }
- if err != nil {
- return nil, fmt.Errorf("unmount %d.%d: %v", req.VolumeId, shardId, err)
- }
- }
- return &volume_server_pb.VolumeEcShardsUnmountResponse{}, nil
- }
- func (vs *VolumeServer) VolumeEcShardRead(req *volume_server_pb.VolumeEcShardReadRequest, stream volume_server_pb.VolumeServer_VolumeEcShardReadServer) error {
- ecVolume, found := vs.store.FindEcVolume(needle.VolumeId(req.VolumeId))
- if !found {
- return fmt.Errorf("VolumeEcShardRead not found ec volume id %d", req.VolumeId)
- }
- ecShard, found := ecVolume.FindEcVolumeShard(erasure_coding.ShardId(req.ShardId))
- if !found {
- return fmt.Errorf("not found ec shard %d.%d", req.VolumeId, req.ShardId)
- }
- if req.FileKey != 0 {
- _, size, _ := ecVolume.FindNeedleFromEcx(types.Uint64ToNeedleId(req.FileKey))
- if size.IsDeleted() {
- return stream.Send(&volume_server_pb.VolumeEcShardReadResponse{
- IsDeleted: true,
- })
- }
- }
- bufSize := req.Size
- if bufSize > BufferSizeLimit {
- bufSize = BufferSizeLimit
- }
- buffer := make([]byte, bufSize)
- startOffset, bytesToRead := req.Offset, req.Size
- for bytesToRead > 0 {
- // min of bytesToRead and bufSize
- bufferSize := bufSize
- if bufferSize > bytesToRead {
- bufferSize = bytesToRead
- }
- bytesread, err := ecShard.ReadAt(buffer[0:bufferSize], startOffset)
- // println("read", ecShard.FileName(), "startOffset", startOffset, bytesread, "bytes, with target", bufferSize)
- if bytesread > 0 {
- if int64(bytesread) > bytesToRead {
- bytesread = int(bytesToRead)
- }
- err = stream.Send(&volume_server_pb.VolumeEcShardReadResponse{
- Data: buffer[:bytesread],
- })
- if err != nil {
- // println("sending", bytesread, "bytes err", err.Error())
- return err
- }
- startOffset += int64(bytesread)
- bytesToRead -= int64(bytesread)
- }
- if err != nil {
- if err != io.EOF {
- return err
- }
- return nil
- }
- }
- return nil
- }
- func (vs *VolumeServer) VolumeEcBlobDelete(ctx context.Context, req *volume_server_pb.VolumeEcBlobDeleteRequest) (*volume_server_pb.VolumeEcBlobDeleteResponse, error) {
- glog.V(0).Infof("VolumeEcBlobDelete: %v", req)
- resp := &volume_server_pb.VolumeEcBlobDeleteResponse{}
- for _, location := range vs.store.Locations {
- if localEcVolume, found := location.FindEcVolume(needle.VolumeId(req.VolumeId)); found {
- _, size, _, err := localEcVolume.LocateEcShardNeedle(types.NeedleId(req.FileKey), needle.Version(req.Version))
- if err != nil {
- return nil, fmt.Errorf("locate in local ec volume: %v", err)
- }
- if size.IsDeleted() {
- return resp, nil
- }
- err = localEcVolume.DeleteNeedleFromEcx(types.NeedleId(req.FileKey))
- if err != nil {
- return nil, err
- }
- break
- }
- }
- return resp, nil
- }
- // VolumeEcShardsToVolume generates the .idx, .dat files from .ecx, .ecj and .ec01 ~ .ec14 files
- func (vs *VolumeServer) VolumeEcShardsToVolume(ctx context.Context, req *volume_server_pb.VolumeEcShardsToVolumeRequest) (*volume_server_pb.VolumeEcShardsToVolumeResponse, error) {
- glog.V(0).Infof("VolumeEcShardsToVolume: %v", req)
- // collect .ec00 ~ .ec09 files
- shardFileNames := make([]string, erasure_coding.DataShardsCount)
- v, found := vs.store.CollectEcShards(needle.VolumeId(req.VolumeId), shardFileNames)
- if !found {
- return nil, fmt.Errorf("ec volume %d not found", req.VolumeId)
- }
- if v.Collection != req.Collection {
- return nil, fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection)
- }
- for shardId := 0; shardId < erasure_coding.DataShardsCount; shardId++ {
- if shardFileNames[shardId] == "" {
- return nil, fmt.Errorf("ec volume %d missing shard %d", req.VolumeId, shardId)
- }
- }
- dataBaseFileName, indexBaseFileName := v.DataBaseFileName(), v.IndexBaseFileName()
- // calculate .dat file size
- datFileSize, err := erasure_coding.FindDatFileSize(dataBaseFileName, indexBaseFileName)
- if err != nil {
- return nil, fmt.Errorf("FindDatFileSize %s: %v", dataBaseFileName, err)
- }
- // write .dat file from .ec00 ~ .ec09 files
- if err := erasure_coding.WriteDatFile(dataBaseFileName, datFileSize, shardFileNames); err != nil {
- return nil, fmt.Errorf("WriteDatFile %s: %v", dataBaseFileName, err)
- }
- // write .idx file from .ecx and .ecj files
- if err := erasure_coding.WriteIdxFileFromEcIndex(indexBaseFileName); err != nil {
- return nil, fmt.Errorf("WriteIdxFileFromEcIndex %s: %v", v.IndexBaseFileName(), err)
- }
- return &volume_server_pb.VolumeEcShardsToVolumeResponse{}, nil
- }
|