lock_client.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. package cluster
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/cluster/lock_manager"
  6. "github.com/seaweedfs/seaweedfs/weed/glog"
  7. "github.com/seaweedfs/seaweedfs/weed/pb"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  9. "github.com/seaweedfs/seaweedfs/weed/util"
  10. "google.golang.org/grpc"
  11. "time"
  12. )
  13. type LockClient struct {
  14. grpcDialOption grpc.DialOption
  15. maxLockDuration time.Duration
  16. sleepDuration time.Duration
  17. seedFiler pb.ServerAddress
  18. }
  19. func NewLockClient(grpcDialOption grpc.DialOption, seedFiler pb.ServerAddress) *LockClient {
  20. return &LockClient{
  21. grpcDialOption: grpcDialOption,
  22. maxLockDuration: 5 * time.Second,
  23. sleepDuration: 2473 * time.Millisecond,
  24. seedFiler: seedFiler,
  25. }
  26. }
  27. type LiveLock struct {
  28. key string
  29. renewToken string
  30. expireAtNs int64
  31. hostFiler pb.ServerAddress
  32. cancelCh chan struct{}
  33. grpcDialOption grpc.DialOption
  34. isLocked bool
  35. self string
  36. lc *LockClient
  37. owner string
  38. }
  39. // NewShortLivedLock creates a lock with a 5-second duration
  40. func (lc *LockClient) NewShortLivedLock(key string, owner string) (lock *LiveLock) {
  41. lock = &LiveLock{
  42. key: key,
  43. hostFiler: lc.seedFiler,
  44. cancelCh: make(chan struct{}),
  45. expireAtNs: time.Now().Add(5 * time.Second).UnixNano(),
  46. grpcDialOption: lc.grpcDialOption,
  47. self: owner,
  48. lc: lc,
  49. }
  50. lock.retryUntilLocked(5 * time.Second)
  51. return
  52. }
  53. // StartLongLivedLock starts a goroutine to lock the key and returns immediately.
  54. func (lc *LockClient) StartLongLivedLock(key string, owner string, onLockOwnerChange func(newLockOwner string)) (lock *LiveLock) {
  55. lock = &LiveLock{
  56. key: key,
  57. hostFiler: lc.seedFiler,
  58. cancelCh: make(chan struct{}),
  59. expireAtNs: time.Now().Add(lock_manager.LiveLockTTL).UnixNano(),
  60. grpcDialOption: lc.grpcDialOption,
  61. self: owner,
  62. lc: lc,
  63. }
  64. go func() {
  65. isLocked := false
  66. lockOwner := ""
  67. for {
  68. if isLocked {
  69. if err := lock.AttemptToLock(lock_manager.LiveLockTTL); err != nil {
  70. glog.V(0).Infof("Lost lock %s: %v", key, err)
  71. isLocked = false
  72. }
  73. } else {
  74. if err := lock.AttemptToLock(lock_manager.LiveLockTTL); err == nil {
  75. isLocked = true
  76. }
  77. }
  78. if lockOwner != lock.LockOwner() && lock.LockOwner() != "" {
  79. glog.V(0).Infof("Lock owner changed from %s to %s", lockOwner, lock.LockOwner())
  80. onLockOwnerChange(lock.LockOwner())
  81. lockOwner = lock.LockOwner()
  82. }
  83. select {
  84. case <-lock.cancelCh:
  85. return
  86. default:
  87. time.Sleep(lock_manager.RenewInterval)
  88. }
  89. }
  90. }()
  91. return
  92. }
  93. func (lock *LiveLock) retryUntilLocked(lockDuration time.Duration) {
  94. util.RetryUntil("create lock:"+lock.key, func() error {
  95. return lock.AttemptToLock(lockDuration)
  96. }, func(err error) (shouldContinue bool) {
  97. if err != nil {
  98. glog.Warningf("create lock %s: %s", lock.key, err)
  99. }
  100. return lock.renewToken == ""
  101. })
  102. }
  103. func (lock *LiveLock) AttemptToLock(lockDuration time.Duration) error {
  104. errorMessage, err := lock.doLock(lockDuration)
  105. if err != nil {
  106. time.Sleep(time.Second)
  107. return err
  108. }
  109. if errorMessage != "" {
  110. time.Sleep(time.Second)
  111. return fmt.Errorf("%v", errorMessage)
  112. }
  113. lock.isLocked = true
  114. return nil
  115. }
  116. func (lock *LiveLock) StopShortLivedLock() error {
  117. if !lock.isLocked {
  118. return nil
  119. }
  120. defer func() {
  121. lock.isLocked = false
  122. }()
  123. return pb.WithFilerClient(false, 0, lock.hostFiler, lock.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  124. _, err := client.DistributedUnlock(context.Background(), &filer_pb.UnlockRequest{
  125. Name: lock.key,
  126. RenewToken: lock.renewToken,
  127. })
  128. return err
  129. })
  130. }
  131. func (lock *LiveLock) doLock(lockDuration time.Duration) (errorMessage string, err error) {
  132. err = pb.WithFilerClient(false, 0, lock.hostFiler, lock.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  133. resp, err := client.DistributedLock(context.Background(), &filer_pb.LockRequest{
  134. Name: lock.key,
  135. SecondsToLock: int64(lockDuration.Seconds()),
  136. RenewToken: lock.renewToken,
  137. IsMoved: false,
  138. Owner: lock.self,
  139. })
  140. if err == nil && resp != nil {
  141. lock.renewToken = resp.RenewToken
  142. } else {
  143. //this can be retried. Need to remember the last valid renewToken
  144. lock.renewToken = ""
  145. }
  146. if resp != nil {
  147. errorMessage = resp.Error
  148. if resp.LockHostMovedTo != "" {
  149. lock.hostFiler = pb.ServerAddress(resp.LockHostMovedTo)
  150. lock.lc.seedFiler = lock.hostFiler
  151. }
  152. if resp.LockOwner != "" {
  153. lock.owner = resp.LockOwner
  154. // fmt.Printf("lock %s owner: %s\n", lock.key, lock.owner)
  155. } else {
  156. // fmt.Printf("lock %s has no owner\n", lock.key)
  157. lock.owner = ""
  158. }
  159. }
  160. return err
  161. })
  162. return
  163. }
  164. func (lock *LiveLock) LockOwner() string {
  165. return lock.owner
  166. }