123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125 |
- package filer2
- import (
- "context"
- "fmt"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
- )
- func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p FullPath, isRecursive bool, ignoreRecursiveError, shouldDeleteChunks bool) (err error) {
- if p == "/" {
- return nil
- }
- entry, findErr := f.FindEntry(ctx, p)
- if findErr != nil {
- return findErr
- }
- isCollection := f.isBucket(entry)
- var chunks []*filer_pb.FileChunk
- chunks = append(chunks, entry.Chunks...)
- if entry.IsDirectory() {
- // delete the folder children, not including the folder itself
- var dirChunks []*filer_pb.FileChunk
- dirChunks, err = f.doBatchDeleteFolderMetaAndData(ctx, entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks && !isCollection)
- if err != nil {
- glog.V(0).Infof("delete directory %s: %v", p, err)
- return fmt.Errorf("delete directory %s: %v", p, err)
- }
- chunks = append(chunks, dirChunks...)
- }
- // delete the file or folder
- err = f.doDeleteEntryMetaAndData(ctx, entry, shouldDeleteChunks)
- if err != nil {
- return fmt.Errorf("delete file %s: %v", p, err)
- }
- if shouldDeleteChunks && !isCollection {
- go f.DeleteChunks(chunks)
- }
- if isCollection {
- collectionName := entry.Name()
- f.doDeleteCollection(collectionName)
- f.deleteBucket(collectionName)
- }
- return nil
- }
- func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry, isRecursive bool, ignoreRecursiveError, shouldDeleteChunks bool) (chunks []*filer_pb.FileChunk, err error) {
- lastFileName := ""
- includeLastFile := false
- for {
- entries, err := f.ListDirectoryEntries(ctx, entry.FullPath, lastFileName, includeLastFile, PaginationSize)
- if err != nil {
- glog.Errorf("list folder %s: %v", entry.FullPath, err)
- return nil, fmt.Errorf("list folder %s: %v", entry.FullPath, err)
- }
- if lastFileName == "" && !isRecursive && len(entries) > 0 {
- // only for first iteration in the loop
- return nil, fmt.Errorf("fail to delete non-empty folder: %s", entry.FullPath)
- }
- for _, sub := range entries {
- lastFileName = sub.Name()
- var dirChunks []*filer_pb.FileChunk
- if sub.IsDirectory() {
- dirChunks, err = f.doBatchDeleteFolderMetaAndData(ctx, sub, isRecursive, ignoreRecursiveError, shouldDeleteChunks)
- chunks = append(chunks, dirChunks...)
- } else {
- chunks = append(chunks, sub.Chunks...)
- }
- if err != nil && !ignoreRecursiveError {
- return nil, err
- }
- }
- if len(entries) < PaginationSize {
- break
- }
- }
- f.cacheDelDirectory(string(entry.FullPath))
- glog.V(3).Infof("deleting directory %v delete %d chunks: %v", entry.FullPath, len(chunks), shouldDeleteChunks)
- if storeDeletionErr := f.store.DeleteFolderChildren(ctx, entry.FullPath); storeDeletionErr != nil {
- return nil, fmt.Errorf("filer store delete: %v", storeDeletionErr)
- }
- f.NotifyUpdateEvent(entry, nil, shouldDeleteChunks)
- return chunks, nil
- }
- func (f *Filer) doDeleteEntryMetaAndData(ctx context.Context, entry *Entry, shouldDeleteChunks bool) (err error) {
- glog.V(3).Infof("deleting entry %v, delete chunks: %v", entry.FullPath, shouldDeleteChunks)
- if storeDeletionErr := f.store.DeleteEntry(ctx, entry.FullPath); storeDeletionErr != nil {
- return fmt.Errorf("filer store delete: %v", storeDeletionErr)
- }
- f.NotifyUpdateEvent(entry, nil, shouldDeleteChunks)
- return nil
- }
- func (f *Filer) doDeleteCollection(collectionName string) (err error) {
- return f.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
- _, err := client.CollectionDelete(context.Background(), &master_pb.CollectionDeleteRequest{
- Name: collectionName,
- })
- if err != nil {
- glog.Infof("delete collection %s: %v", collectionName, err)
- }
- return err
- })
- }
|