lock_ring.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. package lock_manager
  2. import (
  3. "github.com/seaweedfs/seaweedfs/weed/glog"
  4. "github.com/seaweedfs/seaweedfs/weed/pb"
  5. "github.com/seaweedfs/seaweedfs/weed/util"
  6. "sort"
  7. "sync"
  8. "time"
  9. )
  10. type LockRingSnapshot struct {
  11. servers []pb.ServerAddress
  12. ts time.Time
  13. }
  14. type LockRing struct {
  15. sync.RWMutex
  16. snapshots []*LockRingSnapshot
  17. candidateServers map[pb.ServerAddress]struct{}
  18. lastUpdateTime time.Time
  19. lastCompactTime time.Time
  20. snapshotInterval time.Duration
  21. onTakeSnapshot func(snapshot []pb.ServerAddress)
  22. }
  23. func NewLockRing(snapshotInterval time.Duration) *LockRing {
  24. return &LockRing{
  25. snapshotInterval: snapshotInterval,
  26. candidateServers: make(map[pb.ServerAddress]struct{}),
  27. }
  28. }
  29. func (r *LockRing) SetTakeSnapshotCallback(onTakeSnapshot func(snapshot []pb.ServerAddress)) {
  30. r.Lock()
  31. defer r.Unlock()
  32. r.onTakeSnapshot = onTakeSnapshot
  33. }
  34. // AddServer adds a server to the ring
  35. // if the previous snapshot passed the snapshot interval, create a new snapshot
  36. func (r *LockRing) AddServer(server pb.ServerAddress) {
  37. glog.V(0).Infof("add server %v", server)
  38. r.Lock()
  39. if _, found := r.candidateServers[server]; found {
  40. glog.V(0).Infof("add server: already exists %v", server)
  41. r.Unlock()
  42. return
  43. }
  44. r.lastUpdateTime = time.Now()
  45. r.candidateServers[server] = struct{}{}
  46. r.Unlock()
  47. r.takeSnapshotWithDelayedCompaction()
  48. }
  49. func (r *LockRing) RemoveServer(server pb.ServerAddress) {
  50. glog.V(0).Infof("remove server %v", server)
  51. r.Lock()
  52. if _, found := r.candidateServers[server]; !found {
  53. r.Unlock()
  54. return
  55. }
  56. r.lastUpdateTime = time.Now()
  57. delete(r.candidateServers, server)
  58. r.Unlock()
  59. r.takeSnapshotWithDelayedCompaction()
  60. }
  61. func (r *LockRing) SetSnapshot(servers []pb.ServerAddress) {
  62. sort.Slice(servers, func(i, j int) bool {
  63. return servers[i] < servers[j]
  64. })
  65. r.Lock()
  66. r.lastUpdateTime = time.Now()
  67. // init candidateServers
  68. for _, server := range servers {
  69. r.candidateServers[server] = struct{}{}
  70. }
  71. r.Unlock()
  72. r.addOneSnapshot(servers)
  73. go func() {
  74. <-time.After(r.snapshotInterval)
  75. r.compactSnapshots()
  76. }()
  77. }
  78. func (r *LockRing) takeSnapshotWithDelayedCompaction() {
  79. r.doTakeSnapshot()
  80. go func() {
  81. <-time.After(r.snapshotInterval)
  82. r.compactSnapshots()
  83. }()
  84. }
  85. func (r *LockRing) doTakeSnapshot() {
  86. servers := r.getSortedServers()
  87. r.addOneSnapshot(servers)
  88. }
  89. func (r *LockRing) addOneSnapshot(servers []pb.ServerAddress) {
  90. r.Lock()
  91. defer r.Unlock()
  92. ts := time.Now()
  93. t := &LockRingSnapshot{
  94. servers: servers,
  95. ts: ts,
  96. }
  97. r.snapshots = append(r.snapshots, t)
  98. for i := len(r.snapshots) - 2; i >= 0; i-- {
  99. r.snapshots[i+1] = r.snapshots[i]
  100. }
  101. r.snapshots[0] = t
  102. if r.onTakeSnapshot != nil {
  103. r.onTakeSnapshot(t.servers)
  104. }
  105. }
  106. func (r *LockRing) compactSnapshots() {
  107. r.Lock()
  108. defer r.Unlock()
  109. if r.lastCompactTime.After(r.lastUpdateTime) {
  110. return
  111. }
  112. ts := time.Now()
  113. // remove old snapshots
  114. recentSnapshotIndex := 1
  115. for ; recentSnapshotIndex < len(r.snapshots); recentSnapshotIndex++ {
  116. if ts.Sub(r.snapshots[recentSnapshotIndex].ts) > r.snapshotInterval {
  117. break
  118. }
  119. }
  120. // keep the one that has been running for a while
  121. if recentSnapshotIndex+1 <= len(r.snapshots) {
  122. r.snapshots = r.snapshots[:recentSnapshotIndex+1]
  123. }
  124. r.lastCompactTime = ts
  125. }
  126. func (r *LockRing) getSortedServers() []pb.ServerAddress {
  127. sortedServers := make([]pb.ServerAddress, 0, len(r.candidateServers))
  128. for server := range r.candidateServers {
  129. sortedServers = append(sortedServers, server)
  130. }
  131. sort.Slice(sortedServers, func(i, j int) bool {
  132. return sortedServers[i] < sortedServers[j]
  133. })
  134. return sortedServers
  135. }
  136. func (r *LockRing) GetSnapshot() (servers []pb.ServerAddress) {
  137. r.RLock()
  138. defer r.RUnlock()
  139. if len(r.snapshots) == 0 {
  140. return
  141. }
  142. return r.snapshots[0].servers
  143. }
  144. func hashKeyToServer(key string, servers []pb.ServerAddress) pb.ServerAddress {
  145. if len(servers) == 0 {
  146. return ""
  147. }
  148. x := util.HashStringToLong(key)
  149. if x < 0 {
  150. x = -x
  151. }
  152. x = x % int64(len(servers))
  153. return servers[x]
  154. }