123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281 |
- package erasure_coding
- import (
- "errors"
- "fmt"
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "math"
- "os"
- "sync"
- "time"
- "golang.org/x/exp/slices"
- "github.com/seaweedfs/seaweedfs/weed/pb"
- "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
- "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
- "github.com/seaweedfs/seaweedfs/weed/storage/idx"
- "github.com/seaweedfs/seaweedfs/weed/storage/needle"
- "github.com/seaweedfs/seaweedfs/weed/storage/types"
- "github.com/seaweedfs/seaweedfs/weed/storage/volume_info"
- )
- var (
- NotFoundError = errors.New("needle not found")
- destroyDelaySeconds int64 = 0
- )
- type EcVolume struct {
- VolumeId needle.VolumeId
- Collection string
- dir string
- dirIdx string
- ecxFile *os.File
- ecxFileSize int64
- ecxCreatedAt time.Time
- Shards []*EcVolumeShard
- ShardLocations map[ShardId][]pb.ServerAddress
- ShardLocationsRefreshTime time.Time
- ShardLocationsLock sync.RWMutex
- Version needle.Version
- ecjFile *os.File
- ecjFileAccessLock sync.Mutex
- diskType types.DiskType
- datFileSize int64
- DestroyTime uint64 //ec volume destroy time, calculated from the ec volume was created
- }
- func NewEcVolume(diskType types.DiskType, dir string, dirIdx string, collection string, vid needle.VolumeId) (ev *EcVolume, err error) {
- ev = &EcVolume{dir: dir, dirIdx: dirIdx, Collection: collection, VolumeId: vid, diskType: diskType}
- dataBaseFileName := EcShardFileName(collection, dir, int(vid))
- indexBaseFileName := EcShardFileName(collection, dirIdx, int(vid))
- // open ecx file
- if ev.ecxFile, err = os.OpenFile(indexBaseFileName+".ecx", os.O_RDWR, 0644); err != nil {
- return nil, fmt.Errorf("cannot open ec volume index %s.ecx: %v", indexBaseFileName, err)
- }
- ecxFi, statErr := ev.ecxFile.Stat()
- if statErr != nil {
- _ = ev.ecxFile.Close()
- return nil, fmt.Errorf("can not stat ec volume index %s.ecx: %v", indexBaseFileName, statErr)
- }
- ev.ecxFileSize = ecxFi.Size()
- ev.ecxCreatedAt = ecxFi.ModTime()
- // open ecj file
- if ev.ecjFile, err = os.OpenFile(indexBaseFileName+".ecj", os.O_RDWR|os.O_CREATE, 0644); err != nil {
- return nil, fmt.Errorf("cannot open ec volume journal %s.ecj: %v", indexBaseFileName, err)
- }
- // read volume info
- ev.Version = needle.Version3
- if volumeInfo, _, found, _ := volume_info.MaybeLoadVolumeInfo(dataBaseFileName + ".vif"); found {
- ev.Version = needle.Version(volumeInfo.Version)
- ev.datFileSize = volumeInfo.DatFileSize
- ev.DestroyTime = volumeInfo.DestroyTime
- } else {
- glog.Warningf("vif file not found,volumeId:%d, filename:%s", vid, dataBaseFileName)
- volume_info.SaveVolumeInfo(dataBaseFileName+".vif", &volume_server_pb.VolumeInfo{Version: uint32(ev.Version)})
- }
- ev.ShardLocations = make(map[ShardId][]pb.ServerAddress)
- return
- }
- func (ev *EcVolume) AddEcVolumeShard(ecVolumeShard *EcVolumeShard) bool {
- for _, s := range ev.Shards {
- if s.ShardId == ecVolumeShard.ShardId {
- return false
- }
- }
- ev.Shards = append(ev.Shards, ecVolumeShard)
- slices.SortFunc(ev.Shards, func(a, b *EcVolumeShard) int {
- if a.VolumeId != b.VolumeId {
- return int(a.VolumeId - b.VolumeId)
- }
- return int(a.ShardId - b.ShardId)
- })
- return true
- }
- func (ev *EcVolume) DeleteEcVolumeShard(shardId ShardId) (ecVolumeShard *EcVolumeShard, deleted bool) {
- foundPosition := -1
- for i, s := range ev.Shards {
- if s.ShardId == shardId {
- foundPosition = i
- }
- }
- if foundPosition < 0 {
- return nil, false
- }
- ecVolumeShard = ev.Shards[foundPosition]
- ev.Shards = append(ev.Shards[:foundPosition], ev.Shards[foundPosition+1:]...)
- return ecVolumeShard, true
- }
- func (ev *EcVolume) FindEcVolumeShard(shardId ShardId) (ecVolumeShard *EcVolumeShard, found bool) {
- for _, s := range ev.Shards {
- if s.ShardId == shardId {
- return s, true
- }
- }
- return nil, false
- }
- func (ev *EcVolume) Close() {
- for _, s := range ev.Shards {
- s.Close()
- }
- if ev.ecjFile != nil {
- ev.ecjFileAccessLock.Lock()
- _ = ev.ecjFile.Close()
- ev.ecjFile = nil
- ev.ecjFileAccessLock.Unlock()
- }
- if ev.ecxFile != nil {
- _ = ev.ecxFile.Sync()
- _ = ev.ecxFile.Close()
- ev.ecxFile = nil
- }
- }
- func (ev *EcVolume) Destroy() {
- ev.Close()
- for _, s := range ev.Shards {
- s.Destroy()
- }
- os.Remove(ev.FileName(".ecx"))
- os.Remove(ev.FileName(".ecj"))
- os.Remove(ev.FileName(".vif"))
- }
- func (ev *EcVolume) FileName(ext string) string {
- switch ext {
- case ".ecx", ".ecj":
- return ev.IndexBaseFileName() + ext
- }
- // .vif
- return ev.DataBaseFileName() + ext
- }
- func (ev *EcVolume) DataBaseFileName() string {
- return EcShardFileName(ev.Collection, ev.dir, int(ev.VolumeId))
- }
- func (ev *EcVolume) IndexBaseFileName() string {
- return EcShardFileName(ev.Collection, ev.dirIdx, int(ev.VolumeId))
- }
- func (ev *EcVolume) ShardSize() uint64 {
- if len(ev.Shards) > 0 {
- return uint64(ev.Shards[0].Size())
- }
- return 0
- }
- func (ev *EcVolume) Size() (size int64) {
- for _, shard := range ev.Shards {
- size += shard.Size()
- }
- return
- }
- func (ev *EcVolume) CreatedAt() time.Time {
- return ev.ecxCreatedAt
- }
- func (ev *EcVolume) ShardIdList() (shardIds []ShardId) {
- for _, s := range ev.Shards {
- shardIds = append(shardIds, s.ShardId)
- }
- return
- }
- func (ev *EcVolume) ToVolumeEcShardInformationMessage() (messages []*master_pb.VolumeEcShardInformationMessage) {
- prevVolumeId := needle.VolumeId(math.MaxUint32)
- var m *master_pb.VolumeEcShardInformationMessage
- for _, s := range ev.Shards {
- if s.VolumeId != prevVolumeId {
- m = &master_pb.VolumeEcShardInformationMessage{
- Id: uint32(s.VolumeId),
- Collection: s.Collection,
- DiskType: string(ev.diskType),
- DestroyTime: ev.DestroyTime,
- }
- messages = append(messages, m)
- }
- prevVolumeId = s.VolumeId
- m.EcIndexBits = uint32(ShardBits(m.EcIndexBits).AddShardId(s.ShardId))
- }
- return
- }
- func (ev *EcVolume) LocateEcShardNeedle(needleId types.NeedleId, version needle.Version) (offset types.Offset, size types.Size, intervals []Interval, err error) {
- // find the needle from ecx file
- offset, size, err = ev.FindNeedleFromEcx(needleId)
- if err != nil {
- return types.Offset{}, 0, nil, fmt.Errorf("FindNeedleFromEcx: %v", err)
- }
- intervals = ev.LocateEcShardNeedleInterval(version, offset.ToActualOffset(), types.Size(needle.GetActualSize(size, version)))
- return
- }
- func (ev *EcVolume) LocateEcShardNeedleInterval(version needle.Version, offset int64, size types.Size) (intervals []Interval) {
- shard := ev.Shards[0]
- // Usually shard will be padded to round of ErasureCodingSmallBlockSize.
- // So in most cases, if shardSize equals to n * ErasureCodingLargeBlockSize,
- // the data would be in small blocks.
- shardSize := shard.ecdFileSize - 1
- if ev.datFileSize > 0 {
- // To get the correct LargeBlockRowsCount
- // use datFileSize to calculate the shardSize to match the EC encoding logic.
- shardSize = ev.datFileSize / DataShardsCount
- }
- // calculate the locations in the ec shards
- intervals = LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, shardSize, offset, types.Size(needle.GetActualSize(size, version)))
- return
- }
- func (ev *EcVolume) FindNeedleFromEcx(needleId types.NeedleId) (offset types.Offset, size types.Size, err error) {
- return SearchNeedleFromSortedIndex(ev.ecxFile, ev.ecxFileSize, needleId, nil)
- }
- func SearchNeedleFromSortedIndex(ecxFile *os.File, ecxFileSize int64, needleId types.NeedleId, processNeedleFn func(file *os.File, offset int64) error) (offset types.Offset, size types.Size, err error) {
- var key types.NeedleId
- buf := make([]byte, types.NeedleMapEntrySize)
- l, h := int64(0), ecxFileSize/types.NeedleMapEntrySize
- for l < h {
- m := (l + h) / 2
- if _, err := ecxFile.ReadAt(buf, m*types.NeedleMapEntrySize); err != nil {
- return types.Offset{}, types.TombstoneFileSize, fmt.Errorf("ecx file %d read at %d: %v", ecxFileSize, m*types.NeedleMapEntrySize, err)
- }
- key, offset, size = idx.IdxFileEntry(buf)
- if key == needleId {
- if processNeedleFn != nil {
- err = processNeedleFn(ecxFile, m*types.NeedleHeaderSize)
- }
- return
- }
- if key < needleId {
- l = m + 1
- } else {
- h = m
- }
- }
- err = NotFoundError
- return
- }
- func (ev *EcVolume) IsTimeToDestroy() bool {
- return ev.DestroyTime > 0 && time.Now().Unix() > (int64(ev.DestroyTime)+destroyDelaySeconds)
- }
|