filer_grpc_server_dlm.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/pb"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  8. "google.golang.org/grpc/codes"
  9. "google.golang.org/grpc/status"
  10. "time"
  11. )
  12. // DistributedLock is a grpc handler to handle FilerServer's LockRequest
  13. func (fs *FilerServer) DistributedLock(ctx context.Context, req *filer_pb.LockRequest) (resp *filer_pb.LockResponse, err error) {
  14. resp = &filer_pb.LockResponse{}
  15. var movedTo pb.ServerAddress
  16. expiredAtNs := time.Now().Add(time.Duration(req.SecondsToLock) * time.Second).UnixNano()
  17. resp.RenewToken, movedTo, err = fs.filer.Dlm.LockWithTimeout(req.Name, expiredAtNs, req.RenewToken, req.Owner)
  18. glog.V(3).Infof("lock %s %v %v %v, isMoved=%v %v", req.Name, req.SecondsToLock, req.RenewToken, req.Owner, req.IsMoved, movedTo)
  19. if movedTo != "" && movedTo != fs.option.Host && !req.IsMoved {
  20. err = pb.WithFilerClient(false, 0, movedTo, fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  21. secondResp, err := client.DistributedLock(context.Background(), &filer_pb.LockRequest{
  22. Name: req.Name,
  23. SecondsToLock: req.SecondsToLock,
  24. RenewToken: req.RenewToken,
  25. IsMoved: true,
  26. Owner: req.Owner,
  27. })
  28. if err == nil {
  29. resp.RenewToken = secondResp.RenewToken
  30. } else {
  31. resp.Error = secondResp.Error
  32. }
  33. return err
  34. })
  35. }
  36. if err != nil {
  37. resp.Error = fmt.Sprintf("%v", err)
  38. }
  39. if movedTo != "" {
  40. resp.MovedTo = string(movedTo)
  41. }
  42. return resp, nil
  43. }
  44. // Unlock is a grpc handler to handle FilerServer's UnlockRequest
  45. func (fs *FilerServer) DistributedUnlock(ctx context.Context, req *filer_pb.UnlockRequest) (resp *filer_pb.UnlockResponse, err error) {
  46. resp = &filer_pb.UnlockResponse{}
  47. var movedTo pb.ServerAddress
  48. movedTo, err = fs.filer.Dlm.Unlock(req.Name, req.RenewToken)
  49. if !req.IsMoved && movedTo != "" {
  50. err = pb.WithFilerClient(false, 0, movedTo, fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  51. secondResp, err := client.DistributedUnlock(context.Background(), &filer_pb.UnlockRequest{
  52. Name: req.Name,
  53. RenewToken: req.RenewToken,
  54. IsMoved: true,
  55. })
  56. resp.Error = secondResp.Error
  57. return err
  58. })
  59. }
  60. if err != nil {
  61. resp.Error = fmt.Sprintf("%v", err)
  62. }
  63. if movedTo != "" {
  64. resp.MovedTo = string(movedTo)
  65. }
  66. return resp, nil
  67. }
  68. func (fs *FilerServer) FindLockOwner(ctx context.Context, req *filer_pb.FindLockOwnerRequest) (*filer_pb.FindLockOwnerResponse, error) {
  69. owner, movedTo, err := fs.filer.Dlm.FindLockOwner(req.Name)
  70. if err != nil {
  71. return nil, status.Error(codes.Internal, err.Error())
  72. }
  73. if !req.IsMoved && movedTo != "" {
  74. err = pb.WithFilerClient(false, 0, movedTo, fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  75. secondResp, err := client.FindLockOwner(context.Background(), &filer_pb.FindLockOwnerRequest{
  76. Name: req.Name,
  77. IsMoved: true,
  78. })
  79. if err != nil {
  80. return err
  81. }
  82. owner = secondResp.Owner
  83. return nil
  84. })
  85. if err != nil {
  86. return nil, err
  87. }
  88. }
  89. return &filer_pb.FindLockOwnerResponse{
  90. Owner: owner,
  91. }, nil
  92. }
  93. // TransferLocks is a grpc handler to handle FilerServer's TransferLocksRequest
  94. func (fs *FilerServer) TransferLocks(ctx context.Context, req *filer_pb.TransferLocksRequest) (*filer_pb.TransferLocksResponse, error) {
  95. for _, lock := range req.Locks {
  96. fs.filer.Dlm.InsertLock(lock.Name, lock.ExpiredAtNs, lock.RenewToken, lock.Owner)
  97. }
  98. return &filer_pb.TransferLocksResponse{}, nil
  99. }
  100. func (fs *FilerServer) OnDlmChangeSnapshot(snapshot []pb.ServerAddress) {
  101. locks := fs.filer.Dlm.SelectNotOwnedLocks(snapshot)
  102. if len(locks) == 0 {
  103. return
  104. }
  105. for _, lock := range locks {
  106. server := fs.filer.Dlm.CalculateTargetServer(lock.Key, snapshot)
  107. if err := pb.WithFilerClient(false, 0, server, fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  108. _, err := client.TransferLocks(context.Background(), &filer_pb.TransferLocksRequest{
  109. Locks: []*filer_pb.Lock{
  110. {
  111. Name: lock.Key,
  112. RenewToken: lock.Token,
  113. ExpiredAtNs: lock.ExpiredAtNs,
  114. Owner: lock.Owner,
  115. },
  116. },
  117. })
  118. return err
  119. }); err != nil {
  120. // it may not be worth retrying, since the lock may have expired
  121. glog.Errorf("transfer lock %v to %v: %v", lock.Key, server, err)
  122. }
  123. }
  124. }