// distributed_lock_manager.go
  1. package lock_manager
  2. import (
  3. "fmt"
  4. "github.com/seaweedfs/seaweedfs/weed/glog"
  5. "github.com/seaweedfs/seaweedfs/weed/pb"
  6. "time"
  7. )
// RenewInterval is how often a held lock is expected to be renewed by its owner.
const RenewInterval = time.Second * 3

// LiveLockTTL is the time-to-live for a live lock; it is intentionally longer
// than RenewInterval so a lock survives at least one missed renewal.
const LiveLockTTL = time.Second * 7

// NoLockServerError is returned when the lock ring has no known servers.
// NOTE(review): Go convention would name this ErrNoLockServer and build it
// with errors.New, but renaming would break external callers.
var NoLockServerError = fmt.Errorf("no lock server found")
// DistributedLockManager routes lock operations across a ring of servers.
// A key is handled locally only when consistent hashing maps it to Host;
// otherwise operations report the responsible server so callers can redirect.
type DistributedLockManager struct {
	lockManager *LockManager     // locks owned by this server
	LockRing    *LockRing        // snapshot of servers participating in the ring
	Host        pb.ServerAddress // this server's address, compared against hash results
}
  16. func NewDistributedLockManager(host pb.ServerAddress) *DistributedLockManager {
  17. return &DistributedLockManager{
  18. lockManager: NewLockManager(),
  19. LockRing: NewLockRing(time.Second * 5),
  20. Host: host,
  21. }
  22. }
  23. func (dlm *DistributedLockManager) LockWithTimeout(key string, expiredAtNs int64, token string, owner string) (lockOwner string, renewToken string, movedTo pb.ServerAddress, err error) {
  24. movedTo, err = dlm.findLockOwningFiler(key)
  25. if err != nil {
  26. return
  27. }
  28. if movedTo != dlm.Host {
  29. return
  30. }
  31. lockOwner, renewToken, err = dlm.lockManager.Lock(key, expiredAtNs, token, owner)
  32. return
  33. }
  34. func (dlm *DistributedLockManager) findLockOwningFiler(key string) (movedTo pb.ServerAddress, err error) {
  35. servers := dlm.LockRing.GetSnapshot()
  36. if servers == nil {
  37. err = NoLockServerError
  38. return
  39. }
  40. movedTo = hashKeyToServer(key, servers)
  41. return
  42. }
  43. func (dlm *DistributedLockManager) FindLockOwner(key string) (owner string, movedTo pb.ServerAddress, err error) {
  44. movedTo, err = dlm.findLockOwningFiler(key)
  45. if err != nil {
  46. return
  47. }
  48. if movedTo != dlm.Host {
  49. servers := dlm.LockRing.GetSnapshot()
  50. glog.V(0).Infof("lock %s not on current %s but on %s from %v", key, dlm.Host, movedTo, servers)
  51. return
  52. }
  53. owner, err = dlm.lockManager.GetLockOwner(key)
  54. return
  55. }
  56. func (dlm *DistributedLockManager) Unlock(key string, token string) (movedTo pb.ServerAddress, err error) {
  57. servers := dlm.LockRing.GetSnapshot()
  58. if servers == nil {
  59. err = NoLockServerError
  60. return
  61. }
  62. server := hashKeyToServer(key, servers)
  63. if server != dlm.Host {
  64. movedTo = server
  65. return
  66. }
  67. _, err = dlm.lockManager.Unlock(key, token)
  68. return
  69. }
// InsertLock inserts a lock into this server's local lock table
// unconditionally, bypassing ring routing. It is used when a server goes
// down and its locks are moved to another server.
func (dlm *DistributedLockManager) InsertLock(key string, expiredAtNs int64, token string, owner string) {
	dlm.lockManager.InsertLock(key, expiredAtNs, token, owner)
}
  75. func (dlm *DistributedLockManager) SelectNotOwnedLocks(servers []pb.ServerAddress) (locks []*Lock) {
  76. return dlm.lockManager.SelectLocks(func(key string) bool {
  77. server := hashKeyToServer(key, servers)
  78. return server != dlm.Host
  79. })
  80. }
// CalculateTargetServer returns the server responsible for key under the
// given server list, using the package's consistent hashing.
func (dlm *DistributedLockManager) CalculateTargetServer(key string, servers []pb.ServerAddress) pb.ServerAddress {
	return hashKeyToServer(key, servers)
}
  84. func (dlm *DistributedLockManager) IsLocal(key string) bool {
  85. servers := dlm.LockRing.GetSnapshot()
  86. if len(servers) <= 1 {
  87. return true
  88. }
  89. return hashKeyToServer(key, servers) == dlm.Host
  90. }