filerstore_hardlink.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. package filer
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "github.com/chrislusf/seaweedfs/weed/glog"
  7. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  8. )
  9. func (fsw *FilerStoreWrapper) handleUpdateToHardLinks(ctx context.Context, entry *Entry) error {
  10. if len(entry.HardLinkId) == 0 {
  11. return nil
  12. }
  13. // handle hard links
  14. if err := fsw.setHardLink(ctx, entry); err != nil {
  15. return fmt.Errorf("setHardLink %d: %v", entry.HardLinkId, err)
  16. }
  17. // check what is existing entry
  18. glog.V(4).Infof("handleUpdateToHardLinks FindEntry %s", entry.FullPath)
  19. actualStore := fsw.getActualStore(entry.FullPath)
  20. existingEntry, err := actualStore.FindEntry(ctx, entry.FullPath)
  21. if err != nil && err != filer_pb.ErrNotFound {
  22. return fmt.Errorf("update existing entry %s: %v", entry.FullPath, err)
  23. }
  24. // remove old hard link
  25. if err == nil && len(existingEntry.HardLinkId) != 0 && bytes.Compare(existingEntry.HardLinkId, entry.HardLinkId) != 0 {
  26. glog.V(4).Infof("handleUpdateToHardLinks DeleteHardLink %s", entry.FullPath)
  27. if err = fsw.DeleteHardLink(ctx, existingEntry.HardLinkId); err != nil {
  28. return err
  29. }
  30. }
  31. return nil
  32. }
  33. func (fsw *FilerStoreWrapper) setHardLink(ctx context.Context, entry *Entry) error {
  34. if len(entry.HardLinkId) == 0 {
  35. return nil
  36. }
  37. key := entry.HardLinkId
  38. newBlob, encodeErr := entry.EncodeAttributesAndChunks()
  39. if encodeErr != nil {
  40. return encodeErr
  41. }
  42. return fsw.KvPut(ctx, key, newBlob)
  43. }
  44. func (fsw *FilerStoreWrapper) maybeReadHardLink(ctx context.Context, entry *Entry) error {
  45. if len(entry.HardLinkId) == 0 {
  46. return nil
  47. }
  48. key := entry.HardLinkId
  49. glog.V(4).Infof("maybeReadHardLink KvGet %v", key)
  50. value, err := fsw.KvGet(ctx, key)
  51. if err != nil {
  52. glog.Errorf("read %s hardlink %d: %v", entry.FullPath, entry.HardLinkId, err)
  53. return err
  54. }
  55. if err = entry.DecodeAttributesAndChunks(value); err != nil {
  56. glog.Errorf("decode %s hardlink %d: %v", entry.FullPath, entry.HardLinkId, err)
  57. return err
  58. }
  59. return nil
  60. }
  61. func (fsw *FilerStoreWrapper) DeleteHardLink(ctx context.Context, hardLinkId HardLinkId) error {
  62. key := hardLinkId
  63. value, err := fsw.KvGet(ctx, key)
  64. if err == ErrKvNotFound {
  65. return nil
  66. }
  67. if err != nil {
  68. return err
  69. }
  70. entry := &Entry{}
  71. if err = entry.DecodeAttributesAndChunks(value); err != nil {
  72. return err
  73. }
  74. entry.HardLinkCounter--
  75. if entry.HardLinkCounter <= 0 {
  76. glog.V(4).Infof("DeleteHardLink KvDelete %v", key)
  77. return fsw.KvDelete(ctx, key)
  78. }
  79. newBlob, encodeErr := entry.EncodeAttributesAndChunks()
  80. if encodeErr != nil {
  81. return encodeErr
  82. }
  83. glog.V(4).Infof("DeleteHardLink KvPut %v", key)
  84. return fsw.KvPut(ctx, key, newBlob)
  85. }