123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147 |
- package filer
- import (
- "context"
- "fmt"
- "github.com/chrislusf/seaweedfs/weed/util/log"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
- "github.com/chrislusf/seaweedfs/weed/util"
- )
- type HardLinkId []byte
- func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isRecursive, ignoreRecursiveError, shouldDeleteChunks, isFromOtherCluster bool, signatures []int32) (err error) {
- if p == "/" {
- return nil
- }
- entry, findErr := f.FindEntry(ctx, p)
- if findErr != nil {
- return findErr
- }
- isCollection := f.isBucket(entry)
- var chunks []*filer_pb.FileChunk
- var hardLinkIds []HardLinkId
- chunks = append(chunks, entry.Chunks...)
- if entry.IsDirectory() {
- // delete the folder children, not including the folder itself
- var dirChunks []*filer_pb.FileChunk
- var dirHardLinkIds []HardLinkId
- dirChunks, dirHardLinkIds, err = f.doBatchDeleteFolderMetaAndData(ctx, entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks && !isCollection, isFromOtherCluster, signatures)
- if err != nil {
- log.Infof("delete directory %s: %v", p, err)
- return fmt.Errorf("delete directory %s: %v", p, err)
- }
- chunks = append(chunks, dirChunks...)
- hardLinkIds = append(hardLinkIds, dirHardLinkIds...)
- }
- // delete the file or folder
- err = f.doDeleteEntryMetaAndData(ctx, entry, shouldDeleteChunks, isFromOtherCluster, signatures)
- if err != nil {
- return fmt.Errorf("delete file %s: %v", p, err)
- }
- if shouldDeleteChunks && !isCollection {
- go f.DeleteChunks(chunks)
- }
- // A case not handled:
- // what if the chunk is in a different collection?
- if shouldDeleteChunks {
- f.maybeDeleteHardLinks(hardLinkIds)
- }
- if isCollection {
- collectionName := entry.Name()
- f.doDeleteCollection(collectionName)
- f.deleteBucket(collectionName)
- }
- return nil
- }
- func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks, isFromOtherCluster bool, signatures []int32) (chunks []*filer_pb.FileChunk, hardlinkIds []HardLinkId, err error) {
- lastFileName := ""
- includeLastFile := false
- for {
- entries, err := f.ListDirectoryEntries(ctx, entry.FullPath, lastFileName, includeLastFile, PaginationSize, "")
- if err != nil {
- log.Errorf("list folder %s: %v", entry.FullPath, err)
- return nil, nil, fmt.Errorf("list folder %s: %v", entry.FullPath, err)
- }
- if lastFileName == "" && !isRecursive && len(entries) > 0 {
- // only for first iteration in the loop
- log.Errorf("deleting a folder %s has children: %+v ...", entry.FullPath, entries[0].Name())
- return nil, nil, fmt.Errorf("fail to delete non-empty folder: %s", entry.FullPath)
- }
- for _, sub := range entries {
- lastFileName = sub.Name()
- var dirChunks []*filer_pb.FileChunk
- var dirHardLinkIds []HardLinkId
- if sub.IsDirectory() {
- dirChunks, dirHardLinkIds, err = f.doBatchDeleteFolderMetaAndData(ctx, sub, isRecursive, ignoreRecursiveError, shouldDeleteChunks, false, nil)
- chunks = append(chunks, dirChunks...)
- hardlinkIds = append(hardlinkIds, dirHardLinkIds...)
- } else {
- f.NotifyUpdateEvent(ctx, sub, nil, shouldDeleteChunks, isFromOtherCluster, nil)
- if len(sub.HardLinkId) != 0 {
- // hard link chunk data are deleted separately
- hardlinkIds = append(hardlinkIds, sub.HardLinkId)
- } else {
- chunks = append(chunks, sub.Chunks...)
- }
- }
- if err != nil && !ignoreRecursiveError {
- return nil, nil, err
- }
- }
- if len(entries) < PaginationSize {
- break
- }
- }
- log.Tracef("deleting directory %v delete %d chunks: %v", entry.FullPath, len(chunks), shouldDeleteChunks)
- if storeDeletionErr := f.Store.DeleteFolderChildren(ctx, entry.FullPath); storeDeletionErr != nil {
- return nil, nil, fmt.Errorf("filer store delete: %v", storeDeletionErr)
- }
- f.NotifyUpdateEvent(ctx, entry, nil, shouldDeleteChunks, isFromOtherCluster, signatures)
- return chunks, hardlinkIds, nil
- }
- func (f *Filer) doDeleteEntryMetaAndData(ctx context.Context, entry *Entry, shouldDeleteChunks bool, isFromOtherCluster bool, signatures []int32) (err error) {
- log.Tracef("deleting entry %v, delete chunks: %v", entry.FullPath, shouldDeleteChunks)
- if storeDeletionErr := f.Store.DeleteEntry(ctx, entry.FullPath); storeDeletionErr != nil {
- return fmt.Errorf("filer store delete: %v", storeDeletionErr)
- }
- if !entry.IsDirectory() {
- f.NotifyUpdateEvent(ctx, entry, nil, shouldDeleteChunks, isFromOtherCluster, signatures)
- }
- return nil
- }
- func (f *Filer) doDeleteCollection(collectionName string) (err error) {
- return f.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
- _, err := client.CollectionDelete(context.Background(), &master_pb.CollectionDeleteRequest{
- Name: collectionName,
- })
- if err != nil {
- log.Infof("delete collection %s: %v", collectionName, err)
- }
- return err
- })
- }
|