lock_ring.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. package lock_manager
  2. import (
  3. "github.com/seaweedfs/seaweedfs/weed/pb"
  4. "github.com/seaweedfs/seaweedfs/weed/util"
  5. "sort"
  6. "sync"
  7. "time"
  8. )
  9. type LockRingSnapshot struct {
  10. servers []pb.ServerAddress
  11. ts time.Time
  12. }
  13. type LockRing struct {
  14. sync.RWMutex
  15. snapshots []*LockRingSnapshot
  16. candidateServers map[pb.ServerAddress]struct{}
  17. lastUpdateTime time.Time
  18. lastCompactTime time.Time
  19. snapshotInterval time.Duration
  20. onTakeSnapshot func(snapshot []pb.ServerAddress)
  21. }
  22. func NewLockRing(snapshotInterval time.Duration) *LockRing {
  23. return &LockRing{
  24. snapshotInterval: snapshotInterval,
  25. candidateServers: make(map[pb.ServerAddress]struct{}),
  26. }
  27. }
  28. func (r *LockRing) SetTakeSnapshotCallback(onTakeSnapshot func(snapshot []pb.ServerAddress)) {
  29. r.Lock()
  30. defer r.Unlock()
  31. r.onTakeSnapshot = onTakeSnapshot
  32. }
  33. // AddServer adds a server to the ring
  34. // if the previous snapshot passed the snapshot interval, create a new snapshot
  35. func (r *LockRing) AddServer(server pb.ServerAddress) {
  36. r.Lock()
  37. if _, found := r.candidateServers[server]; found {
  38. r.Unlock()
  39. return
  40. }
  41. r.lastUpdateTime = time.Now()
  42. r.candidateServers[server] = struct{}{}
  43. r.Unlock()
  44. r.takeSnapshotWithDelayedCompaction()
  45. }
  46. func (r *LockRing) RemoveServer(server pb.ServerAddress) {
  47. r.Lock()
  48. if _, found := r.candidateServers[server]; !found {
  49. r.Unlock()
  50. return
  51. }
  52. r.lastUpdateTime = time.Now()
  53. delete(r.candidateServers, server)
  54. r.Unlock()
  55. r.takeSnapshotWithDelayedCompaction()
  56. }
  57. func (r *LockRing) SetSnapshot(servers []pb.ServerAddress) {
  58. sort.Slice(servers, func(i, j int) bool {
  59. return servers[i] < servers[j]
  60. })
  61. r.lastUpdateTime = time.Now()
  62. r.addOneSnapshot(servers)
  63. go func() {
  64. <-time.After(r.snapshotInterval)
  65. r.compactSnapshots()
  66. }()
  67. }
  68. func (r *LockRing) takeSnapshotWithDelayedCompaction() {
  69. r.doTakeSnapshot()
  70. go func() {
  71. <-time.After(r.snapshotInterval)
  72. r.compactSnapshots()
  73. }()
  74. }
  75. func (r *LockRing) doTakeSnapshot() {
  76. servers := r.getSortedServers()
  77. r.addOneSnapshot(servers)
  78. }
  79. func (r *LockRing) addOneSnapshot(servers []pb.ServerAddress) {
  80. r.Lock()
  81. defer r.Unlock()
  82. ts := time.Now()
  83. t := &LockRingSnapshot{
  84. servers: servers,
  85. ts: ts,
  86. }
  87. r.snapshots = append(r.snapshots, t)
  88. for i := len(r.snapshots) - 2; i >= 0; i-- {
  89. r.snapshots[i+1] = r.snapshots[i]
  90. }
  91. r.snapshots[0] = t
  92. if r.onTakeSnapshot != nil {
  93. r.onTakeSnapshot(t.servers)
  94. }
  95. }
  96. func (r *LockRing) compactSnapshots() {
  97. r.Lock()
  98. defer r.Unlock()
  99. if r.lastCompactTime.After(r.lastUpdateTime) {
  100. return
  101. }
  102. ts := time.Now()
  103. // remove old snapshots
  104. recentSnapshotIndex := 1
  105. for ; recentSnapshotIndex < len(r.snapshots); recentSnapshotIndex++ {
  106. if ts.Sub(r.snapshots[recentSnapshotIndex].ts) > r.snapshotInterval {
  107. break
  108. }
  109. }
  110. // keep the one that has been running for a while
  111. if recentSnapshotIndex+1 <= len(r.snapshots) {
  112. r.snapshots = r.snapshots[:recentSnapshotIndex+1]
  113. }
  114. r.lastCompactTime = ts
  115. }
  116. func (r *LockRing) getSortedServers() []pb.ServerAddress {
  117. sortedServers := make([]pb.ServerAddress, 0, len(r.candidateServers))
  118. for server := range r.candidateServers {
  119. sortedServers = append(sortedServers, server)
  120. }
  121. sort.Slice(sortedServers, func(i, j int) bool {
  122. return sortedServers[i] < sortedServers[j]
  123. })
  124. return sortedServers
  125. }
  126. func (r *LockRing) GetSnapshot() (servers []pb.ServerAddress) {
  127. r.RLock()
  128. defer r.RUnlock()
  129. if len(r.snapshots) == 0 {
  130. return
  131. }
  132. return r.snapshots[0].servers
  133. }
  134. func hashKeyToServer(key string, servers []pb.ServerAddress) pb.ServerAddress {
  135. if len(servers) == 0 {
  136. return ""
  137. }
  138. x := util.HashStringToLong(key)
  139. if x < 0 {
  140. x = -x
  141. }
  142. x = x % int64(len(servers))
  143. return servers[x]
  144. }