123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182 |
- package lock_manager
- import (
- "fmt"
- "github.com/google/uuid"
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "sync"
- "time"
- )
// Sentinel errors used by the lock manager.
var (
	// LockErrorNonEmptyTokenOnNewLock reports a lock request that carried a
	// token although no lock entry exists for the key.
	LockErrorNonEmptyTokenOnNewLock = fmt.Errorf("lock: non-empty token on a new lock")

	// LockErrorNonEmptyTokenOnExpiredLock reports a lock request that carried
	// a token although the existing lock entry has already expired.
	LockErrorNonEmptyTokenOnExpiredLock = fmt.Errorf("lock: non-empty token on an expired lock")

	// LockErrorTokenMismatch describes a lock request whose token differs from
	// the token of the currently held lock.
	LockErrorTokenMismatch = fmt.Errorf("lock: token mismatch")

	// UnlockErrorTokenMismatch reports an unlock request whose token differs
	// from the token of the currently held lock.
	UnlockErrorTokenMismatch = fmt.Errorf("unlock: token mismatch")

	// LockNotFound reports that no lock entry exists for the requested key.
	LockNotFound = fmt.Errorf("lock not found")
)
- // LockManager local lock manager, used by distributed lock manager
- type LockManager struct {
- locks map[string]*Lock
- accessLock sync.RWMutex
- }
// Lock is a single lock entry held by a LockManager.
type Lock struct {
	// Token must be presented to renew or release the lock.
	Token string
	// ExpiredAtNs is the expiry instant in Unix nanoseconds.
	ExpiredAtNs int64
	// Key is only used for moving locks.
	Key string
	// Owner identifies the holder of the lock.
	Owner string
}
- func NewLockManager() *LockManager {
- t := &LockManager{
- locks: make(map[string]*Lock),
- }
- go t.CleanUp()
- return t
- }
- func (lm *LockManager) Lock(path string, expiredAtNs int64, token string, owner string) (lockOwner, renewToken string, err error) {
- lm.accessLock.Lock()
- defer lm.accessLock.Unlock()
- glog.V(4).Infof("lock %s %v %v %v", path, time.Unix(0, expiredAtNs), token, owner)
- if oldValue, found := lm.locks[path]; found {
- if oldValue.ExpiredAtNs > 0 && oldValue.ExpiredAtNs < time.Now().UnixNano() {
- // lock is expired, set to a new lock
- if token != "" {
- glog.V(4).Infof("lock expired key %s non-empty token %v owner %v ts %s", path, token, owner, time.Unix(0, oldValue.ExpiredAtNs))
- err = LockErrorNonEmptyTokenOnExpiredLock
- return
- } else {
- // new lock
- renewToken = uuid.New().String()
- glog.V(4).Infof("key %s new token %v owner %v", path, renewToken, owner)
- lm.locks[path] = &Lock{Token: renewToken, ExpiredAtNs: expiredAtNs, Owner: owner}
- return
- }
- }
- // not expired
- lockOwner = oldValue.Owner
- if oldValue.Token == token {
- // token matches, renew the lock
- renewToken = uuid.New().String()
- glog.V(4).Infof("key %s old token %v owner %v => %v owner %v", path, oldValue.Token, oldValue.Owner, renewToken, owner)
- lm.locks[path] = &Lock{Token: renewToken, ExpiredAtNs: expiredAtNs, Owner: owner}
- return
- } else {
- if token == "" {
- // new lock
- glog.V(4).Infof("key %s locked by %v", path, oldValue.Owner)
- err = fmt.Errorf("lock already owned by %v", oldValue.Owner)
- return
- }
- glog.V(4).Infof("key %s expected token %v owner %v received %v from %v", path, oldValue.Token, oldValue.Owner, token, owner)
- err = fmt.Errorf("lock: token mismatch")
- return
- }
- } else {
- glog.V(4).Infof("key %s no lock owner %v", path, owner)
- if token == "" {
- // new lock
- glog.V(4).Infof("key %s new token %v owner %v", path, token, owner)
- renewToken = uuid.New().String()
- lm.locks[path] = &Lock{Token: renewToken, ExpiredAtNs: expiredAtNs, Owner: owner}
- return
- } else {
- glog.V(4).Infof("key %s non-empty token %v owner %v", path, token, owner)
- err = LockErrorNonEmptyTokenOnNewLock
- return
- }
- }
- }
- func (lm *LockManager) Unlock(path string, token string) (isUnlocked bool, err error) {
- lm.accessLock.Lock()
- defer lm.accessLock.Unlock()
- if oldValue, found := lm.locks[path]; found {
- now := time.Now()
- if oldValue.ExpiredAtNs > 0 && oldValue.ExpiredAtNs < now.UnixNano() {
- // lock is expired, delete it
- isUnlocked = true
- glog.V(4).Infof("key %s expired at %v", path, time.Unix(0, oldValue.ExpiredAtNs))
- delete(lm.locks, path)
- return
- }
- if oldValue.Token == token {
- isUnlocked = true
- glog.V(4).Infof("key %s unlocked with %v", path, token)
- delete(lm.locks, path)
- return
- } else {
- isUnlocked = false
- err = UnlockErrorTokenMismatch
- return
- }
- }
- err = LockNotFound
- return
- }
- func (lm *LockManager) CleanUp() {
- for {
- time.Sleep(1 * time.Minute)
- now := time.Now().UnixNano()
- lm.accessLock.Lock()
- for key, value := range lm.locks {
- if value == nil {
- continue
- }
- if now > value.ExpiredAtNs {
- glog.V(4).Infof("key %s expired at %v", key, time.Unix(0, value.ExpiredAtNs))
- delete(lm.locks, key)
- }
- }
- lm.accessLock.Unlock()
- }
- }
- // SelectLocks takes out locks by key
- // if keyFn return true, the lock will be taken out
- func (lm *LockManager) SelectLocks(selectFn func(key string) bool) (locks []*Lock) {
- lm.accessLock.RLock()
- defer lm.accessLock.RUnlock()
- now := time.Now().UnixNano()
- for key, lock := range lm.locks {
- if now > lock.ExpiredAtNs {
- glog.V(4).Infof("key %s expired at %v", key, time.Unix(0, lock.ExpiredAtNs))
- delete(lm.locks, key)
- continue
- }
- if selectFn(key) {
- glog.V(4).Infof("key %s selected and deleted", key)
- delete(lm.locks, key)
- lock.Key = key
- locks = append(locks, lock)
- }
- }
- return
- }
- // InsertLock inserts a lock unconditionally
- func (lm *LockManager) InsertLock(path string, expiredAtNs int64, token string, owner string) {
- lm.accessLock.Lock()
- defer lm.accessLock.Unlock()
- lm.locks[path] = &Lock{Token: token, ExpiredAtNs: expiredAtNs, Owner: owner}
- }
- func (lm *LockManager) GetLockOwner(key string) (owner string, err error) {
- lm.accessLock.RLock()
- defer lm.accessLock.RUnlock()
- if lock, found := lm.locks[key]; found {
- return lock.Owner, nil
- }
- err = LockNotFound
- return
- }
|