123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185 |
- package lock_manager
- import (
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/pb"
- "github.com/seaweedfs/seaweedfs/weed/util"
- "sort"
- "sync"
- "time"
- )
- type LockRingSnapshot struct {
- servers []pb.ServerAddress
- ts time.Time
- }
- type LockRing struct {
- sync.RWMutex
- snapshots []*LockRingSnapshot
- candidateServers map[pb.ServerAddress]struct{}
- lastUpdateTime time.Time
- lastCompactTime time.Time
- snapshotInterval time.Duration
- onTakeSnapshot func(snapshot []pb.ServerAddress)
- }
- func NewLockRing(snapshotInterval time.Duration) *LockRing {
- return &LockRing{
- snapshotInterval: snapshotInterval,
- candidateServers: make(map[pb.ServerAddress]struct{}),
- }
- }
- func (r *LockRing) SetTakeSnapshotCallback(onTakeSnapshot func(snapshot []pb.ServerAddress)) {
- r.Lock()
- defer r.Unlock()
- r.onTakeSnapshot = onTakeSnapshot
- }
- // AddServer adds a server to the ring
- // if the previous snapshot passed the snapshot interval, create a new snapshot
- func (r *LockRing) AddServer(server pb.ServerAddress) {
- glog.V(0).Infof("add server %v", server)
- r.Lock()
- if _, found := r.candidateServers[server]; found {
- glog.V(0).Infof("add server: already exists %v", server)
- r.Unlock()
- return
- }
- r.lastUpdateTime = time.Now()
- r.candidateServers[server] = struct{}{}
- r.Unlock()
- r.takeSnapshotWithDelayedCompaction()
- }
- func (r *LockRing) RemoveServer(server pb.ServerAddress) {
- glog.V(0).Infof("remove server %v", server)
- r.Lock()
- if _, found := r.candidateServers[server]; !found {
- r.Unlock()
- return
- }
- r.lastUpdateTime = time.Now()
- delete(r.candidateServers, server)
- r.Unlock()
- r.takeSnapshotWithDelayedCompaction()
- }
- func (r *LockRing) SetSnapshot(servers []pb.ServerAddress) {
- sort.Slice(servers, func(i, j int) bool {
- return servers[i] < servers[j]
- })
- r.Lock()
- r.lastUpdateTime = time.Now()
- // init candidateServers
- for _, server := range servers {
- r.candidateServers[server] = struct{}{}
- }
- r.Unlock()
- r.addOneSnapshot(servers)
- go func() {
- <-time.After(r.snapshotInterval)
- r.compactSnapshots()
- }()
- }
- func (r *LockRing) takeSnapshotWithDelayedCompaction() {
- r.doTakeSnapshot()
- go func() {
- <-time.After(r.snapshotInterval)
- r.compactSnapshots()
- }()
- }
- func (r *LockRing) doTakeSnapshot() {
- servers := r.getSortedServers()
- r.addOneSnapshot(servers)
- }
- func (r *LockRing) addOneSnapshot(servers []pb.ServerAddress) {
- r.Lock()
- defer r.Unlock()
- ts := time.Now()
- t := &LockRingSnapshot{
- servers: servers,
- ts: ts,
- }
- r.snapshots = append(r.snapshots, t)
- for i := len(r.snapshots) - 2; i >= 0; i-- {
- r.snapshots[i+1] = r.snapshots[i]
- }
- r.snapshots[0] = t
- if r.onTakeSnapshot != nil {
- r.onTakeSnapshot(t.servers)
- }
- }
- func (r *LockRing) compactSnapshots() {
- r.Lock()
- defer r.Unlock()
- if r.lastCompactTime.After(r.lastUpdateTime) {
- return
- }
- ts := time.Now()
- // remove old snapshots
- recentSnapshotIndex := 1
- for ; recentSnapshotIndex < len(r.snapshots); recentSnapshotIndex++ {
- if ts.Sub(r.snapshots[recentSnapshotIndex].ts) > r.snapshotInterval {
- break
- }
- }
- // keep the one that has been running for a while
- if recentSnapshotIndex+1 <= len(r.snapshots) {
- r.snapshots = r.snapshots[:recentSnapshotIndex+1]
- }
- r.lastCompactTime = ts
- }
- func (r *LockRing) getSortedServers() []pb.ServerAddress {
- sortedServers := make([]pb.ServerAddress, 0, len(r.candidateServers))
- for server := range r.candidateServers {
- sortedServers = append(sortedServers, server)
- }
- sort.Slice(sortedServers, func(i, j int) bool {
- return sortedServers[i] < sortedServers[j]
- })
- return sortedServers
- }
- func (r *LockRing) GetSnapshot() (servers []pb.ServerAddress) {
- r.RLock()
- defer r.RUnlock()
- if len(r.snapshots) == 0 {
- return
- }
- return r.snapshots[0].servers
- }
- func hashKeyToServer(key string, servers []pb.ServerAddress) pb.ServerAddress {
- if len(servers) == 0 {
- return ""
- }
- x := util.HashStringToLong(key)
- if x < 0 {
- x = -x
- }
- x = x % int64(len(servers))
- return servers[x]
- }
|