distributed_lock_manager.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. package lock_manager
  2. import (
  3. "fmt"
  4. "github.com/seaweedfs/seaweedfs/weed/pb"
  5. "time"
  6. )
  // MaxDuration is the largest value that fits in a signed 64-bit integer
  // (2^63 - 1). It is an untyped constant.
  // NOTE(review): presumably used as a "never expires" duration or timestamp;
  // its callers are not visible in this file.
  const MaxDuration = (1 << 63) - 1
  // NoLockServerError is the sentinel error returned when the lock ring
  // snapshot provides no servers to hash a key onto.
  var NoLockServerError = fmt.Errorf("no lock server found")
  9. type DistributedLockManager struct {
  10. lockManager *LockManager
  11. LockRing *LockRing
  12. Host pb.ServerAddress
  13. }
  14. func NewDistributedLockManager(host pb.ServerAddress) *DistributedLockManager {
  15. return &DistributedLockManager{
  16. lockManager: NewLockManager(),
  17. LockRing: NewLockRing(time.Second * 5),
  18. Host: host,
  19. }
  20. }
  21. func (dlm *DistributedLockManager) LockWithTimeout(key string, expiredAtNs int64, token string, owner string) (renewToken string, movedTo pb.ServerAddress, err error) {
  22. movedTo, err = dlm.findLockOwningFiler(key)
  23. if err != nil {
  24. return
  25. }
  26. if movedTo != dlm.Host {
  27. return
  28. }
  29. renewToken, err = dlm.lockManager.Lock(key, expiredAtNs, token, owner)
  30. return
  31. }
  32. func (dlm *DistributedLockManager) findLockOwningFiler(key string) (movedTo pb.ServerAddress, err error) {
  33. servers := dlm.LockRing.GetSnapshot()
  34. if servers == nil {
  35. err = NoLockServerError
  36. return
  37. }
  38. movedTo = hashKeyToServer(key, servers)
  39. return
  40. }
  41. func (dlm *DistributedLockManager) FindLockOwner(key string) (owner string, movedTo pb.ServerAddress, err error) {
  42. movedTo, err = dlm.findLockOwningFiler(key)
  43. if err != nil {
  44. return
  45. }
  46. if movedTo != dlm.Host {
  47. return
  48. }
  49. owner, err = dlm.lockManager.GetLockOwner(key)
  50. return
  51. }
  52. func (dlm *DistributedLockManager) Unlock(key string, token string) (movedTo pb.ServerAddress, err error) {
  53. servers := dlm.LockRing.GetSnapshot()
  54. if servers == nil {
  55. err = NoLockServerError
  56. return
  57. }
  58. server := hashKeyToServer(key, servers)
  59. if server != dlm.Host {
  60. movedTo = server
  61. return
  62. }
  63. _, err = dlm.lockManager.Unlock(key, token)
  64. return
  65. }
  66. // InsertLock is used to insert a lock to a server unconditionally
  67. // It is used when a server is down and the lock is moved to another server
  68. func (dlm *DistributedLockManager) InsertLock(key string, expiredAtNs int64, token string, owner string) {
  69. dlm.lockManager.InsertLock(key, expiredAtNs, token, owner)
  70. }
  71. func (dlm *DistributedLockManager) SelectNotOwnedLocks(servers []pb.ServerAddress) (locks []*Lock) {
  72. return dlm.lockManager.SelectLocks(func(key string) bool {
  73. server := hashKeyToServer(key, servers)
  74. return server != dlm.Host
  75. })
  76. }
  77. func (dlm *DistributedLockManager) CalculateTargetServer(key string, servers []pb.ServerAddress) pb.ServerAddress {
  78. return hashKeyToServer(key, servers)
  79. }
  80. func (dlm *DistributedLockManager) IsLocal(key string) bool {
  81. servers := dlm.LockRing.GetSnapshot()
  82. if len(servers) <= 1 {
  83. return true
  84. }
  85. return hashKeyToServer(key, servers) == dlm.Host
  86. }