lock_manager.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. package lock_manager
  2. import (
  3. "fmt"
  4. "github.com/google/uuid"
  5. "github.com/puzpuzpuz/xsync/v2"
  6. "time"
  7. )
  8. var LockErrorNonEmptyTokenOnNewLock = fmt.Errorf("lock: non-empty token on a new lock")
  9. var LockErrorNonEmptyTokenOnExpiredLock = fmt.Errorf("lock: non-empty token on an expired lock")
  10. var LockErrorTokenMismatch = fmt.Errorf("lock: token mismatch")
  11. var UnlockErrorTokenMismatch = fmt.Errorf("unlock: token mismatch")
  12. // LockManager local lock manager, used by distributed lock manager
  13. type LockManager struct {
  14. locks *xsync.MapOf[string, *Lock]
  15. }
  16. type Lock struct {
  17. Token string
  18. ExpiredAtNs int64
  19. Key string // only used for moving locks
  20. Owner string
  21. }
  22. func NewLockManager() *LockManager {
  23. t := &LockManager{
  24. locks: xsync.NewMapOf[*Lock](),
  25. }
  26. go t.CleanUp()
  27. return t
  28. }
  29. func (lm *LockManager) Lock(path string, expiredAtNs int64, token string, owner string) (renewToken string, err error) {
  30. lm.locks.Compute(path, func(oldValue *Lock, loaded bool) (newValue *Lock, delete bool) {
  31. if oldValue != nil {
  32. if oldValue.ExpiredAtNs > 0 && oldValue.ExpiredAtNs < time.Now().UnixNano() {
  33. // lock is expired, set to a new lock
  34. if token != "" {
  35. err = LockErrorNonEmptyTokenOnExpiredLock
  36. return nil, false
  37. } else {
  38. // new lock
  39. renewToken = uuid.New().String()
  40. return &Lock{Token: renewToken, ExpiredAtNs: expiredAtNs, Owner: owner}, false
  41. }
  42. }
  43. // not expired
  44. if oldValue.Token == token {
  45. // token matches, renew the lock
  46. renewToken = uuid.New().String()
  47. return &Lock{Token: renewToken, ExpiredAtNs: expiredAtNs, Owner: owner}, false
  48. } else {
  49. err = LockErrorTokenMismatch
  50. return oldValue, false
  51. }
  52. } else {
  53. if token == "" {
  54. // new lock
  55. renewToken = uuid.New().String()
  56. return &Lock{Token: renewToken, ExpiredAtNs: expiredAtNs, Owner: owner}, false
  57. } else {
  58. err = LockErrorNonEmptyTokenOnNewLock
  59. return nil, false
  60. }
  61. }
  62. })
  63. return
  64. }
  65. func (lm *LockManager) Unlock(path string, token string) (isUnlocked bool, err error) {
  66. lm.locks.Compute(path, func(oldValue *Lock, loaded bool) (newValue *Lock, delete bool) {
  67. if oldValue != nil {
  68. now := time.Now()
  69. if oldValue.ExpiredAtNs > 0 && oldValue.ExpiredAtNs < now.UnixNano() {
  70. // lock is expired, delete it
  71. isUnlocked = true
  72. return nil, true
  73. }
  74. if oldValue.Token == token {
  75. if oldValue.ExpiredAtNs <= now.UnixNano() {
  76. isUnlocked = true
  77. return nil, true
  78. }
  79. return oldValue, false
  80. } else {
  81. isUnlocked = false
  82. err = UnlockErrorTokenMismatch
  83. return oldValue, false
  84. }
  85. } else {
  86. isUnlocked = true
  87. return nil, true
  88. }
  89. })
  90. return
  91. }
  92. func (lm *LockManager) CleanUp() {
  93. for {
  94. time.Sleep(1 * time.Minute)
  95. now := time.Now().UnixNano()
  96. lm.locks.Range(func(key string, value *Lock) bool {
  97. if value == nil {
  98. return true
  99. }
  100. if now > value.ExpiredAtNs {
  101. lm.locks.Delete(key)
  102. return true
  103. }
  104. return true
  105. })
  106. }
  107. }
  108. // SelectLocks takes out locks by key
  109. // if keyFn return true, the lock will be taken out
  110. func (lm *LockManager) SelectLocks(selectFn func(key string) bool) (locks []*Lock) {
  111. now := time.Now().UnixNano()
  112. lm.locks.Range(func(key string, lock *Lock) bool {
  113. if now > lock.ExpiredAtNs {
  114. lm.locks.Delete(key)
  115. return true
  116. }
  117. if selectFn(key) {
  118. lm.locks.Delete(key)
  119. lock.Key = key
  120. locks = append(locks, lock)
  121. }
  122. return true
  123. })
  124. return
  125. }
  126. // InsertLock inserts a lock unconditionally
  127. func (lm *LockManager) InsertLock(path string, expiredAtNs int64, token string, owner string) {
  128. lm.locks.Store(path, &Lock{Token: token, ExpiredAtNs: expiredAtNs, Owner: owner})
  129. }
  130. func (lm *LockManager) GetLockOwner(key string) (owner string, err error) {
  131. lm.locks.Range(func(k string, lock *Lock) bool {
  132. if k == key && lock != nil {
  133. owner = lock.Owner
  134. return false
  135. }
  136. return true
  137. })
  138. return
  139. }