123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105 |
- package storage
- import (
- "context"
- "fmt"
- "github.com/chrislusf/seaweedfs/weed/util/log"
- "github.com/chrislusf/seaweedfs/weed/operation"
- "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
- "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
- "github.com/chrislusf/seaweedfs/weed/storage/needle"
- "github.com/chrislusf/seaweedfs/weed/storage/types"
- )
- func (s *Store) DeleteEcShardNeedle(ecVolume *erasure_coding.EcVolume, n *needle.Needle, cookie types.Cookie) (int64, error) {
- count, err := s.ReadEcShardNeedle(ecVolume.VolumeId, n)
- if err != nil {
- return 0, err
- }
- if cookie != n.Cookie {
- return 0, fmt.Errorf("unexpected cookie %x", cookie)
- }
- if err = s.doDeleteNeedleFromAtLeastOneRemoteEcShards(ecVolume, n.Id); err != nil {
- return 0, err
- }
- return int64(count), nil
- }
- func (s *Store) doDeleteNeedleFromAtLeastOneRemoteEcShards(ecVolume *erasure_coding.EcVolume, needleId types.NeedleId) error {
- _, _, intervals, err := ecVolume.LocateEcShardNeedle(needleId, ecVolume.Version)
- if len(intervals) == 0 {
- return erasure_coding.NotFoundError
- }
- shardId, _ := intervals[0].ToShardIdAndOffset(erasure_coding.ErasureCodingLargeBlockSize, erasure_coding.ErasureCodingSmallBlockSize)
- hasDeletionSuccess := false
- err = s.doDeleteNeedleFromRemoteEcShardServers(shardId, ecVolume, needleId)
- if err == nil {
- hasDeletionSuccess = true
- }
- for shardId = erasure_coding.DataShardsCount; shardId < erasure_coding.TotalShardsCount; shardId++ {
- if parityDeletionError := s.doDeleteNeedleFromRemoteEcShardServers(shardId, ecVolume, needleId); parityDeletionError == nil {
- hasDeletionSuccess = true
- }
- }
- if hasDeletionSuccess {
- return nil
- }
- return err
- }
- func (s *Store) doDeleteNeedleFromRemoteEcShardServers(shardId erasure_coding.ShardId, ecVolume *erasure_coding.EcVolume, needleId types.NeedleId) error {
- ecVolume.ShardLocationsLock.RLock()
- sourceDataNodes, hasShardLocations := ecVolume.ShardLocations[shardId]
- ecVolume.ShardLocationsLock.RUnlock()
- if !hasShardLocations {
- return fmt.Errorf("ec shard %d.%d not located", ecVolume.VolumeId, shardId)
- }
- for _, sourceDataNode := range sourceDataNodes {
- log.Tracef("delete from remote ec shard %d.%d from %s", ecVolume.VolumeId, shardId, sourceDataNode)
- err := s.doDeleteNeedleFromRemoteEcShard(sourceDataNode, ecVolume.VolumeId, ecVolume.Collection, ecVolume.Version, needleId)
- if err != nil {
- return err
- }
- log.Debugf("delete from remote ec shard %d.%d from %s: %v", ecVolume.VolumeId, shardId, sourceDataNode, err)
- }
- return nil
- }
- func (s *Store) doDeleteNeedleFromRemoteEcShard(sourceDataNode string, vid needle.VolumeId, collection string, version needle.Version, needleId types.NeedleId) error {
- return operation.WithVolumeServerClient(sourceDataNode, s.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
- // copy data slice
- _, err := client.VolumeEcBlobDelete(context.Background(), &volume_server_pb.VolumeEcBlobDeleteRequest{
- VolumeId: uint32(vid),
- Collection: collection,
- FileKey: uint64(needleId),
- Version: uint32(version),
- })
- if err != nil {
- return fmt.Errorf("failed to delete from ec shard %d on %s: %v", vid, sourceDataNode, err)
- }
- return nil
- })
- }
|