// lock_table.go

  1. package util
  2. import (
  3. "fmt"
  4. "github.com/seaweedfs/seaweedfs/weed/glog"
  5. "sync"
  6. "sync/atomic"
  7. )
  8. // LockTable is a table of locks that can be acquired.
  9. // Locks are acquired in order of request.
  10. type LockTable[T comparable] struct {
  11. mu sync.Mutex
  12. locks map[T]*LockEntry
  13. lockIdSeq int64
  14. }
  15. type LockEntry struct {
  16. mu sync.Mutex
  17. waiters []*ActiveLock // ordered waiters that are blocked by exclusive locks
  18. activeLockOwnerCount int32
  19. lockType LockType
  20. cond *sync.Cond
  21. }
  22. type LockType int
  23. const (
  24. SharedLock LockType = iota
  25. ExclusiveLock
  26. )
  27. type ActiveLock struct {
  28. ID int64
  29. isDeleted bool
  30. intention string // for debugging
  31. }
  32. func NewLockTable[T comparable]() *LockTable[T] {
  33. return &LockTable[T]{
  34. locks: make(map[T]*LockEntry),
  35. }
  36. }
  37. func (lt *LockTable[T]) NewActiveLock(intention string) *ActiveLock {
  38. id := atomic.AddInt64(&lt.lockIdSeq, 1)
  39. l := &ActiveLock{ID: id, intention: intention}
  40. return l
  41. }
  42. func (lt *LockTable[T]) AcquireLock(intention string, key T, lockType LockType) (lock *ActiveLock) {
  43. lt.mu.Lock()
  44. // Get or create the lock entry for the key
  45. entry, exists := lt.locks[key]
  46. if !exists {
  47. entry = &LockEntry{}
  48. entry.cond = sync.NewCond(&entry.mu)
  49. lt.locks[key] = entry
  50. }
  51. lt.mu.Unlock()
  52. lock = lt.NewActiveLock(intention)
  53. // If the lock is held exclusively, wait
  54. entry.mu.Lock()
  55. if len(entry.waiters) > 0 || lockType == ExclusiveLock {
  56. if glog.V(4) {
  57. fmt.Printf("ActiveLock %d %s wait for %+v type=%v with waiters %d active %d.\n", lock.ID, lock.intention, key, lockType, len(entry.waiters), entry.activeLockOwnerCount)
  58. if len(entry.waiters) > 0 {
  59. for _, waiter := range entry.waiters {
  60. fmt.Printf(" %d", waiter.ID)
  61. }
  62. fmt.Printf("\n")
  63. }
  64. }
  65. entry.waiters = append(entry.waiters, lock)
  66. if lockType == ExclusiveLock {
  67. for !lock.isDeleted && ((len(entry.waiters) > 0 && lock.ID != entry.waiters[0].ID) || entry.activeLockOwnerCount > 0) {
  68. entry.cond.Wait()
  69. }
  70. } else {
  71. for !lock.isDeleted && (len(entry.waiters) > 0 && lock.ID != entry.waiters[0].ID) {
  72. entry.cond.Wait()
  73. }
  74. }
  75. // Remove the transaction from the waiters list
  76. if len(entry.waiters) > 0 && lock.ID == entry.waiters[0].ID {
  77. entry.waiters = entry.waiters[1:]
  78. entry.cond.Broadcast()
  79. }
  80. }
  81. entry.activeLockOwnerCount++
  82. // Otherwise, grant the lock
  83. entry.lockType = lockType
  84. if glog.V(4) {
  85. fmt.Printf("ActiveLock %d %s locked %+v type=%v with waiters %d active %d.\n", lock.ID, lock.intention, key, lockType, len(entry.waiters), entry.activeLockOwnerCount)
  86. if len(entry.waiters) > 0 {
  87. for _, waiter := range entry.waiters {
  88. fmt.Printf(" %d", waiter.ID)
  89. }
  90. fmt.Printf("\n")
  91. }
  92. }
  93. entry.mu.Unlock()
  94. return lock
  95. }
  96. func (lt *LockTable[T]) ReleaseLock(key T, lock *ActiveLock) {
  97. lt.mu.Lock()
  98. defer lt.mu.Unlock()
  99. entry, exists := lt.locks[key]
  100. if !exists {
  101. return
  102. }
  103. entry.mu.Lock()
  104. defer entry.mu.Unlock()
  105. // Remove the transaction from the waiters list
  106. for i, waiter := range entry.waiters {
  107. if waiter == lock {
  108. waiter.isDeleted = true
  109. entry.waiters = append(entry.waiters[:i], entry.waiters[i+1:]...)
  110. break
  111. }
  112. }
  113. // If there are no waiters, release the lock
  114. if len(entry.waiters) == 0 {
  115. delete(lt.locks, key)
  116. }
  117. if glog.V(4) {
  118. fmt.Printf("ActiveLock %d %s unlocked %+v type=%v with waiters %d active %d.\n", lock.ID, lock.intention, key, entry.lockType, len(entry.waiters), entry.activeLockOwnerCount)
  119. if len(entry.waiters) > 0 {
  120. for _, waiter := range entry.waiters {
  121. fmt.Printf(" %d", waiter.ID)
  122. }
  123. fmt.Printf("\n")
  124. }
  125. }
  126. entry.activeLockOwnerCount--
  127. // Notify the next waiter
  128. entry.cond.Broadcast()
  129. }
  130. func main() {
  131. }