filerstore_hardlink.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. package filer
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "github.com/seaweedfs/seaweedfs/weed/glog"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  8. )
  9. func (fsw *FilerStoreWrapper) handleUpdateToHardLinks(ctx context.Context, entry *Entry) error {
  10. if entry.IsDirectory() {
  11. return nil
  12. }
  13. if len(entry.HardLinkId) > 0 {
  14. // handle hard links
  15. if err := fsw.setHardLink(ctx, entry); err != nil {
  16. return fmt.Errorf("setHardLink %d: %v", entry.HardLinkId, err)
  17. }
  18. }
  19. // check what is existing entry
  20. // glog.V(4).Infof("handleUpdateToHardLinks FindEntry %s", entry.FullPath)
  21. actualStore := fsw.getActualStore(entry.FullPath)
  22. existingEntry, err := actualStore.FindEntry(ctx, entry.FullPath)
  23. if err != nil && err != filer_pb.ErrNotFound {
  24. return fmt.Errorf("update existing entry %s: %v", entry.FullPath, err)
  25. }
  26. // remove old hard link
  27. if err == nil && len(existingEntry.HardLinkId) != 0 && bytes.Compare(existingEntry.HardLinkId, entry.HardLinkId) != 0 {
  28. glog.V(4).Infof("handleUpdateToHardLinks DeleteHardLink %s", entry.FullPath)
  29. if err = fsw.DeleteHardLink(ctx, existingEntry.HardLinkId); err != nil {
  30. return err
  31. }
  32. }
  33. return nil
  34. }
  35. func (fsw *FilerStoreWrapper) setHardLink(ctx context.Context, entry *Entry) error {
  36. if len(entry.HardLinkId) == 0 {
  37. return nil
  38. }
  39. key := entry.HardLinkId
  40. newBlob, encodeErr := entry.EncodeAttributesAndChunks()
  41. if encodeErr != nil {
  42. return encodeErr
  43. }
  44. glog.V(4).Infof("setHardLink %v nlink:%d", entry.FullPath, entry.HardLinkCounter)
  45. return fsw.KvPut(ctx, key, newBlob)
  46. }
  47. func (fsw *FilerStoreWrapper) maybeReadHardLink(ctx context.Context, entry *Entry) error {
  48. if len(entry.HardLinkId) == 0 {
  49. return nil
  50. }
  51. key := entry.HardLinkId
  52. value, err := fsw.KvGet(ctx, key)
  53. if err != nil {
  54. glog.Errorf("read %s hardlink %d: %v", entry.FullPath, entry.HardLinkId, err)
  55. return err
  56. }
  57. if err = entry.DecodeAttributesAndChunks(value); err != nil {
  58. glog.Errorf("decode %s hardlink %d: %v", entry.FullPath, entry.HardLinkId, err)
  59. return err
  60. }
  61. glog.V(4).Infof("maybeReadHardLink %v nlink:%d", entry.FullPath, entry.HardLinkCounter)
  62. return nil
  63. }
  64. func (fsw *FilerStoreWrapper) DeleteHardLink(ctx context.Context, hardLinkId HardLinkId) error {
  65. key := hardLinkId
  66. value, err := fsw.KvGet(ctx, key)
  67. if err == ErrKvNotFound {
  68. return nil
  69. }
  70. if err != nil {
  71. return err
  72. }
  73. entry := &Entry{}
  74. if err = entry.DecodeAttributesAndChunks(value); err != nil {
  75. return err
  76. }
  77. entry.HardLinkCounter--
  78. if entry.HardLinkCounter <= 0 {
  79. glog.V(4).Infof("DeleteHardLink KvDelete %v", key)
  80. return fsw.KvDelete(ctx, key)
  81. }
  82. newBlob, encodeErr := entry.EncodeAttributesAndChunks()
  83. if encodeErr != nil {
  84. return encodeErr
  85. }
  86. glog.V(4).Infof("DeleteHardLink KvPut %v", key)
  87. return fsw.KvPut(ctx, key, newBlob)
  88. }