lock_manager.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. package lock_manager
  2. import (
  3. "fmt"
  4. "github.com/google/uuid"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "sync"
  7. "time"
  8. )
  9. var LockErrorNonEmptyTokenOnNewLock = fmt.Errorf("lock: non-empty token on a new lock")
  10. var LockErrorNonEmptyTokenOnExpiredLock = fmt.Errorf("lock: non-empty token on an expired lock")
  11. var LockErrorTokenMismatch = fmt.Errorf("lock: token mismatch")
  12. var UnlockErrorTokenMismatch = fmt.Errorf("unlock: token mismatch")
  13. var LockNotFound = fmt.Errorf("lock not found")
  14. // LockManager local lock manager, used by distributed lock manager
  15. type LockManager struct {
  16. locks map[string]*Lock
  17. accessLock sync.RWMutex
  18. }
  19. type Lock struct {
  20. Token string
  21. ExpiredAtNs int64
  22. Key string // only used for moving locks
  23. Owner string
  24. }
  25. func NewLockManager() *LockManager {
  26. t := &LockManager{
  27. locks: make(map[string]*Lock),
  28. }
  29. go t.CleanUp()
  30. return t
  31. }
  32. func (lm *LockManager) Lock(path string, expiredAtNs int64, token string, owner string) (lockOwner, renewToken string, err error) {
  33. lm.accessLock.Lock()
  34. defer lm.accessLock.Unlock()
  35. glog.V(4).Infof("lock %s %v %v %v", path, time.Unix(0, expiredAtNs), token, owner)
  36. if oldValue, found := lm.locks[path]; found {
  37. if oldValue.ExpiredAtNs > 0 && oldValue.ExpiredAtNs < time.Now().UnixNano() {
  38. // lock is expired, set to a new lock
  39. if token != "" {
  40. glog.V(4).Infof("lock expired key %s non-empty token %v owner %v ts %s", path, token, owner, time.Unix(0, oldValue.ExpiredAtNs))
  41. err = LockErrorNonEmptyTokenOnExpiredLock
  42. return
  43. } else {
  44. // new lock
  45. renewToken = uuid.New().String()
  46. glog.V(4).Infof("key %s new token %v owner %v", path, renewToken, owner)
  47. lm.locks[path] = &Lock{Token: renewToken, ExpiredAtNs: expiredAtNs, Owner: owner}
  48. return
  49. }
  50. }
  51. // not expired
  52. lockOwner = oldValue.Owner
  53. if oldValue.Token == token {
  54. // token matches, renew the lock
  55. renewToken = uuid.New().String()
  56. glog.V(4).Infof("key %s old token %v owner %v => %v owner %v", path, oldValue.Token, oldValue.Owner, renewToken, owner)
  57. lm.locks[path] = &Lock{Token: renewToken, ExpiredAtNs: expiredAtNs, Owner: owner}
  58. return
  59. } else {
  60. if token == "" {
  61. // new lock
  62. glog.V(4).Infof("key %s locked by %v", path, oldValue.Owner)
  63. err = fmt.Errorf("lock already owned by %v", oldValue.Owner)
  64. return
  65. }
  66. glog.V(4).Infof("key %s expected token %v owner %v received %v from %v", path, oldValue.Token, oldValue.Owner, token, owner)
  67. err = fmt.Errorf("lock: token mismatch")
  68. return
  69. }
  70. } else {
  71. glog.V(4).Infof("key %s no lock owner %v", path, owner)
  72. if token == "" {
  73. // new lock
  74. glog.V(4).Infof("key %s new token %v owner %v", path, token, owner)
  75. renewToken = uuid.New().String()
  76. lm.locks[path] = &Lock{Token: renewToken, ExpiredAtNs: expiredAtNs, Owner: owner}
  77. return
  78. } else {
  79. glog.V(4).Infof("key %s non-empty token %v owner %v", path, token, owner)
  80. err = LockErrorNonEmptyTokenOnNewLock
  81. return
  82. }
  83. }
  84. }
  85. func (lm *LockManager) Unlock(path string, token string) (isUnlocked bool, err error) {
  86. lm.accessLock.Lock()
  87. defer lm.accessLock.Unlock()
  88. if oldValue, found := lm.locks[path]; found {
  89. now := time.Now()
  90. if oldValue.ExpiredAtNs > 0 && oldValue.ExpiredAtNs < now.UnixNano() {
  91. // lock is expired, delete it
  92. isUnlocked = true
  93. glog.V(4).Infof("key %s expired at %v", path, time.Unix(0, oldValue.ExpiredAtNs))
  94. delete(lm.locks, path)
  95. return
  96. }
  97. if oldValue.Token == token {
  98. isUnlocked = true
  99. glog.V(4).Infof("key %s unlocked with %v", path, token)
  100. delete(lm.locks, path)
  101. return
  102. } else {
  103. isUnlocked = false
  104. err = UnlockErrorTokenMismatch
  105. return
  106. }
  107. }
  108. err = LockNotFound
  109. return
  110. }
  111. func (lm *LockManager) CleanUp() {
  112. for {
  113. time.Sleep(1 * time.Minute)
  114. now := time.Now().UnixNano()
  115. lm.accessLock.Lock()
  116. for key, value := range lm.locks {
  117. if value == nil {
  118. continue
  119. }
  120. if now > value.ExpiredAtNs {
  121. glog.V(4).Infof("key %s expired at %v", key, time.Unix(0, value.ExpiredAtNs))
  122. delete(lm.locks, key)
  123. }
  124. }
  125. lm.accessLock.Unlock()
  126. }
  127. }
  128. // SelectLocks takes out locks by key
  129. // if keyFn return true, the lock will be taken out
  130. func (lm *LockManager) SelectLocks(selectFn func(key string) bool) (locks []*Lock) {
  131. lm.accessLock.RLock()
  132. defer lm.accessLock.RUnlock()
  133. now := time.Now().UnixNano()
  134. for key, lock := range lm.locks {
  135. if now > lock.ExpiredAtNs {
  136. glog.V(4).Infof("key %s expired at %v", key, time.Unix(0, lock.ExpiredAtNs))
  137. delete(lm.locks, key)
  138. continue
  139. }
  140. if selectFn(key) {
  141. glog.V(4).Infof("key %s selected and deleted", key)
  142. delete(lm.locks, key)
  143. lock.Key = key
  144. locks = append(locks, lock)
  145. }
  146. }
  147. return
  148. }
  149. // InsertLock inserts a lock unconditionally
  150. func (lm *LockManager) InsertLock(path string, expiredAtNs int64, token string, owner string) {
  151. lm.accessLock.Lock()
  152. defer lm.accessLock.Unlock()
  153. lm.locks[path] = &Lock{Token: token, ExpiredAtNs: expiredAtNs, Owner: owner}
  154. }
  155. func (lm *LockManager) GetLockOwner(key string) (owner string, err error) {
  156. lm.accessLock.RLock()
  157. defer lm.accessLock.RUnlock()
  158. if lock, found := lm.locks[key]; found {
  159. return lock.Owner, nil
  160. }
  161. err = LockNotFound
  162. return
  163. }