filerstore_hardlink.go 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. package filer
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "github.com/chrislusf/seaweedfs/weed/util/log"
  7. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  8. )
  9. func (fsw *FilerStoreWrapper) handleUpdateToHardLinks(ctx context.Context, entry *Entry) error {
  10. if len(entry.HardLinkId) == 0 {
  11. return nil
  12. }
  13. // handle hard links
  14. if err := fsw.setHardLink(ctx, entry); err != nil {
  15. return fmt.Errorf("setHardLink %d: %v", entry.HardLinkId, err)
  16. }
  17. // check what is existing entry
  18. existingEntry, err := fsw.ActualStore.FindEntry(ctx, entry.FullPath)
  19. if err != nil && err != filer_pb.ErrNotFound {
  20. return fmt.Errorf("update existing entry %s: %v", entry.FullPath, err)
  21. }
  22. // remove old hard link
  23. if err == nil && len(existingEntry.HardLinkId) != 0 && bytes.Compare(existingEntry.HardLinkId, entry.HardLinkId) != 0 {
  24. if err = fsw.DeleteHardLink(ctx, existingEntry.HardLinkId); err != nil {
  25. return err
  26. }
  27. }
  28. return nil
  29. }
  30. func (fsw *FilerStoreWrapper) setHardLink(ctx context.Context, entry *Entry) error {
  31. if len(entry.HardLinkId) == 0 {
  32. return nil
  33. }
  34. key := entry.HardLinkId
  35. newBlob, encodeErr := entry.EncodeAttributesAndChunks()
  36. if encodeErr != nil {
  37. return encodeErr
  38. }
  39. return fsw.KvPut(ctx, key, newBlob)
  40. }
  41. func (fsw *FilerStoreWrapper) maybeReadHardLink(ctx context.Context, entry *Entry) error {
  42. if len(entry.HardLinkId) == 0 {
  43. return nil
  44. }
  45. key := entry.HardLinkId
  46. value, err := fsw.KvGet(ctx, key)
  47. if err != nil {
  48. log.Errorf("read %s hardlink %d: %v", entry.FullPath, entry.HardLinkId, err)
  49. return err
  50. }
  51. if err = entry.DecodeAttributesAndChunks(value); err != nil {
  52. log.Errorf("decode %s hardlink %d: %v", entry.FullPath, entry.HardLinkId, err)
  53. return err
  54. }
  55. return nil
  56. }
  57. func (fsw *FilerStoreWrapper) DeleteHardLink(ctx context.Context, hardLinkId HardLinkId) error {
  58. key := hardLinkId
  59. value, err := fsw.KvGet(ctx, key)
  60. if err == ErrKvNotFound {
  61. return nil
  62. }
  63. if err != nil {
  64. return err
  65. }
  66. entry := &Entry{}
  67. if err = entry.DecodeAttributesAndChunks(value); err != nil {
  68. return err
  69. }
  70. entry.HardLinkCounter--
  71. if entry.HardLinkCounter <= 0 {
  72. return fsw.KvDelete(ctx, key)
  73. }
  74. newBlob, encodeErr := entry.EncodeAttributesAndChunks()
  75. if encodeErr != nil {
  76. return encodeErr
  77. }
  78. return fsw.KvPut(ctx, key, newBlob)
  79. }