dir_rename.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. package filesys
  2. import (
  3. "context"
  4. "github.com/chrislusf/seaweedfs/weed/filer"
  5. "github.com/chrislusf/seaweedfs/weed/glog"
  6. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  7. "github.com/chrislusf/seaweedfs/weed/util"
  8. "github.com/seaweedfs/fuse"
  9. "github.com/seaweedfs/fuse/fs"
  10. "io"
  11. "os"
  12. "strings"
  13. )
  14. func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirectory fs.Node) error {
  15. if err := checkName(req.NewName); err != nil {
  16. return err
  17. }
  18. if err := checkName(req.OldName); err != nil {
  19. return err
  20. }
  21. newDir := newDirectory.(*Dir)
  22. newPath := util.NewFullPath(newDir.FullPath(), req.NewName)
  23. oldPath := util.NewFullPath(dir.FullPath(), req.OldName)
  24. glog.V(4).Infof("dir Rename %s => %s", oldPath, newPath)
  25. // update remote filer
  26. err := dir.wfs.WithFilerClient(true, func(client filer_pb.SeaweedFilerClient) error {
  27. ctx, cancel := context.WithCancel(context.Background())
  28. defer cancel()
  29. request := &filer_pb.StreamRenameEntryRequest{
  30. OldDirectory: dir.FullPath(),
  31. OldName: req.OldName,
  32. NewDirectory: newDir.FullPath(),
  33. NewName: req.NewName,
  34. Signatures: []int32{dir.wfs.signature},
  35. }
  36. stream, err := client.StreamRenameEntry(ctx, request)
  37. if err != nil {
  38. glog.Errorf("dir AtomicRenameEntry %s => %s : %v", oldPath, newPath, err)
  39. return fuse.EIO
  40. }
  41. for {
  42. resp, recvErr := stream.Recv()
  43. if recvErr != nil {
  44. if recvErr == io.EOF {
  45. break
  46. } else {
  47. glog.V(0).Infof("dir Rename %s => %s receive: %v", oldPath, newPath, recvErr)
  48. if strings.Contains(recvErr.Error(), "not empty") {
  49. return fuse.EEXIST
  50. }
  51. if strings.Contains(recvErr.Error(), "not directory") {
  52. return fuse.ENOTDIR
  53. }
  54. return recvErr
  55. }
  56. }
  57. if err = dir.handleRenameResponse(ctx, resp); err != nil {
  58. glog.V(0).Infof("dir Rename %s => %s : %v", oldPath, newPath, err)
  59. return err
  60. }
  61. }
  62. return nil
  63. })
  64. return err
  65. }
  66. func (dir *Dir) handleRenameResponse(ctx context.Context, resp *filer_pb.StreamRenameEntryResponse) error {
  67. // comes from filer StreamRenameEntry, can only be create or delete entry
  68. if resp.EventNotification.NewEntry != nil {
  69. // with new entry, the old entry name also exists. This is the first step to create new entry
  70. newEntry := filer.FromPbEntry(resp.EventNotification.NewParentPath, resp.EventNotification.NewEntry)
  71. if err := dir.wfs.metaCache.AtomicUpdateEntryFromFiler(ctx, "", newEntry); err != nil {
  72. return err
  73. }
  74. oldParent, newParent := util.FullPath(resp.Directory), util.FullPath(resp.EventNotification.NewParentPath)
  75. oldName, newName := resp.EventNotification.OldEntry.Name, resp.EventNotification.NewEntry.Name
  76. entryFileMode := newEntry.Attr.Mode
  77. oldPath := oldParent.Child(oldName)
  78. newPath := newParent.Child(newName)
  79. oldFsNode := NodeWithId(oldPath.AsInode(entryFileMode))
  80. newFsNode := NodeWithId(newPath.AsInode(entryFileMode))
  81. newDirNode, found := dir.wfs.Server.FindInternalNode(NodeWithId(newParent.AsInode(os.ModeDir)))
  82. var newDir *Dir
  83. if found {
  84. newDir = newDirNode.(*Dir)
  85. }
  86. dir.wfs.Server.InvalidateInternalNode(oldFsNode, newFsNode, func(internalNode fs.Node) {
  87. if file, ok := internalNode.(*File); ok {
  88. glog.V(4).Infof("internal file node %s", oldParent.Child(oldName))
  89. file.Name = newName
  90. file.id = uint64(newFsNode)
  91. if found {
  92. file.dir = newDir
  93. }
  94. }
  95. if dir, ok := internalNode.(*Dir); ok {
  96. glog.V(4).Infof("internal dir node %s", oldParent.Child(oldName))
  97. dir.name = newName
  98. dir.id = uint64(newFsNode)
  99. if found {
  100. dir.parent = newDir
  101. }
  102. }
  103. })
  104. // change file handle
  105. if !newEntry.IsDirectory() {
  106. inodeId := oldPath.AsInode(entryFileMode)
  107. dir.wfs.handlesLock.Lock()
  108. if existingHandle, found := dir.wfs.handles[inodeId]; found && existingHandle != nil {
  109. glog.V(4).Infof("opened file handle %s => %s", oldPath, newPath)
  110. delete(dir.wfs.handles, inodeId)
  111. existingHandle.handle = newPath.AsInode(entryFileMode)
  112. existingHandle.f.entry.Name = newName
  113. existingHandle.f.id = newPath.AsInode(entryFileMode)
  114. dir.wfs.handles[newPath.AsInode(entryFileMode)] = existingHandle
  115. }
  116. dir.wfs.handlesLock.Unlock()
  117. }
  118. } else if resp.EventNotification.OldEntry != nil {
  119. // without new entry, only old entry name exists. This is the second step to delete old entry
  120. if err := dir.wfs.metaCache.AtomicUpdateEntryFromFiler(ctx, util.NewFullPath(resp.Directory, resp.EventNotification.OldEntry.Name), nil); err != nil {
  121. return err
  122. }
  123. }
  124. return nil
  125. }