
properly cancel context for streaming grpc

Chris Lu 4 years ago
Parent
Commit
daf0a449f7
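
The change applies one pattern at every call site: instead of passing context.Background() directly to a streaming gRPC method, derive a cancellable context inside the client callback and defer cancel(), so the stream (and the goroutines behind it on both ends) is torn down as soon as the callback returns or fails. Below is a minimal, self-contained sketch of that pattern; withClient, openStream, and fakeStream are hypothetical stand-ins for the generated SeaweedFS gRPC clients shown in the diffs, not the actual API.

package main

import (
	"context"
	"fmt"
	"io"
	"time"
)

// fakeStream is a hypothetical stand-in for a generated gRPC server stream.
type fakeStream struct {
	ctx context.Context
}

// Recv returns a message periodically and io.EOF once the context is cancelled,
// mimicking how a real gRPC stream stops after cancellation.
func (s *fakeStream) Recv() (string, error) {
	select {
	case <-s.ctx.Done():
		return "", io.EOF
	case <-time.After(100 * time.Millisecond):
		return "message", nil
	}
}

// openStream stands in for calls like client.ListEntries or client.KeepConnected:
// it captures ctx so that cancelling it ends the stream.
func openStream(ctx context.Context) (*fakeStream, error) {
	return &fakeStream{ctx: ctx}, nil
}

// withClient stands in for helpers like pb.WithFilerClient / WithVolumeServerClient.
func withClient(fn func() error) error { return fn() }

func main() {
	err := withClient(func() error {
		// The pattern from this commit: create a cancellable context and
		// defer cancel() before opening the stream, so the stream is
		// released on every return path, including early errors.
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()

		stream, err := openStream(ctx)
		if err != nil {
			return err
		}
		for i := 0; i < 3; i++ {
			msg, recvErr := stream.Recv()
			if recvErr == io.EOF {
				break
			}
			if recvErr != nil {
				return recvErr
			}
			fmt.Println(msg)
		}
		return nil // deferred cancel() runs here and stops the stream
	})
	if err != nil {
		fmt.Println("error:", err)
	}
}

Note that defer cancel() is placed immediately after context.WithCancel, before the stream is opened; the filer_client.go hunk in this commit makes the same reordering so cancel still runs if ListEntries itself returns an error.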

+ 3 - 1
unmaintained/diff_volume_servers/diff_volume_servers.go

@@ -124,7 +124,9 @@ type needleState struct {
 func getVolumeFiles(v uint32, addr string) (map[types.NeedleId]needleState, int64, error) {
 	var idxFile *bytes.Reader
 	err := operation.WithVolumeServerClient(addr, grpcDialOption, func(vs volume_server_pb.VolumeServerClient) error {
-		copyFileClient, err := vs.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{
+		ctx, cancel := context.WithCancel(context.Background())
+		defer cancel()
+		copyFileClient, err := vs.CopyFile(ctx, &volume_server_pb.CopyFileRequest{
 			VolumeId:           v,
 			Ext:                ".idx",
 			CompactionRevision: math.MaxUint32,

+ 3 - 1
weed/filer/meta_aggregator.go

@@ -117,7 +117,9 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, peer string
 
 	for {
 		err := pb.WithFilerClient(peer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
-			stream, err := client.SubscribeLocalMetadata(context.Background(), &filer_pb.SubscribeMetadataRequest{
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+			stream, err := client.SubscribeLocalMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
 				ClientName: "filer:" + self,
 				PathPrefix: "/",
 				SinceNs:    lastTsNs,

+ 3 - 1
weed/filesys/dir_rename.go

@@ -29,6 +29,8 @@ func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirector
 
 	// update remote filer
 	err = dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+		ctx, cancel := context.WithCancel(context.Background())
+		defer cancel()
 
 		request := &filer_pb.AtomicRenameEntryRequest{
 			OldDirectory: dir.FullPath(),
@@ -37,7 +39,7 @@ func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirector
 			NewName:      req.NewName,
 		}
 
-		_, err := client.AtomicRenameEntry(context.Background(), request)
+		_, err := client.AtomicRenameEntry(ctx, request)
 		if err != nil {
 			return fuse.EIO
 		}

+ 3 - 1
weed/messaging/broker/broker_server.go

@@ -48,7 +48,9 @@ func (broker *MessageBroker) keepConnectedToOneFiler() {
 	for {
 		for _, filer := range broker.option.Filers {
 			broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error {
-				stream, err := client.KeepConnected(context.Background())
+				ctx, cancel := context.WithCancel(context.Background())
+				defer cancel()
+				stream, err := client.KeepConnected(ctx)
 				if err != nil {
 					glog.V(0).Infof("%s:%d failed to keep connected to %s: %v", broker.option.Ip, broker.option.Port, filer, err)
 					return err

+ 3 - 1
weed/operation/tail_volume.go

@@ -28,8 +28,10 @@ func TailVolume(master string, grpcDialOption grpc.DialOption, vid needle.Volume
 
 func TailVolumeFromSource(volumeServer string, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, idleTimeoutSeconds int, fn func(n *needle.Needle) error) error {
 	return WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+		ctx, cancel := context.WithCancel(context.Background())
+		defer cancel()
 
-		stream, err := client.VolumeTailSender(context.Background(), &volume_server_pb.VolumeTailSenderRequest{
+		stream, err := client.VolumeTailSender(ctx, &volume_server_pb.VolumeTailSenderRequest{
 			VolumeId:           uint32(vid),
 			SinceNs:            sinceNs,
 			IdleTimeoutSeconds: uint32(idleTimeoutSeconds),

+ 1 - 1
weed/pb/filer_pb/filer_client.go

@@ -85,11 +85,11 @@ func doList(filerClient FilerClient, fullDirPath util.FullPath, prefix string, f
 
 		glog.V(4).Infof("read directory: %v", request)
 		ctx, cancel := context.WithCancel(context.Background())
+		defer cancel()
 		stream, err := client.ListEntries(ctx, request)
 		if err != nil {
 			return fmt.Errorf("list %s: %v", fullDirPath, err)
 		}
-		defer cancel()
 
 		var prevEntry *Entry
 		for {

+ 3 - 1
weed/s3api/s3api_objects_list_handlers.go

@@ -212,7 +212,9 @@ func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, d
 		InclusiveStartFrom: false,
 	}
 
-	stream, listErr := client.ListEntries(context.Background(), request)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	stream, listErr := client.ListEntries(ctx, request)
 	if listErr != nil {
 		err = fmt.Errorf("list entires %+v: %v", request, listErr)
 		return

+ 5 - 2
weed/server/volume_grpc_client_to_master.go

@@ -58,14 +58,17 @@ func (vs *VolumeServer) heartbeat() {
 
 func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDialOption grpc.DialOption, sleepInterval time.Duration) (newLeader string, err error) {
 
-	grpcConection, err := pb.GrpcDial(context.Background(), masterGrpcAddress, grpcDialOption)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	grpcConection, err := pb.GrpcDial(ctx, masterGrpcAddress, grpcDialOption)
 	if err != nil {
 		return "", fmt.Errorf("fail to dial %s : %v", masterNode, err)
 	}
 	defer grpcConection.Close()
 
 	client := master_pb.NewSeaweedClient(grpcConection)
-	stream, err := client.SendHeartbeat(context.Background())
+	stream, err := client.SendHeartbeat(ctx)
 	if err != nil {
 		glog.V(0).Infof("SendHeartbeat to %s: %v", masterNode, err)
 		return "", err

+ 10 - 3
weed/wdclient/exclusive_locks/exclusive_locker.go

@@ -46,10 +46,13 @@ func (l *ExclusiveLocker) RequestLock() {
 		return
 	}
 
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
 	// retry to get the lease
 	for {
 		if err := l.masterClient.WithClient(func(client master_pb.SeaweedClient) error {
-			resp, err := client.LeaseAdminToken(context.Background(), &master_pb.LeaseAdminTokenRequest{
+			resp, err := client.LeaseAdminToken(ctx, &master_pb.LeaseAdminTokenRequest{
 				PreviousToken:    atomic.LoadInt64(&l.token),
 				PreviousLockTime: atomic.LoadInt64(&l.lockTsNs),
 				LockName:         AdminLockName,
@@ -73,7 +76,7 @@ func (l *ExclusiveLocker) RequestLock() {
 	go func() {
 		for l.isLocking {
 			if err := l.masterClient.WithClient(func(client master_pb.SeaweedClient) error {
-				resp, err := client.LeaseAdminToken(context.Background(), &master_pb.LeaseAdminTokenRequest{
+				resp, err := client.LeaseAdminToken(ctx, &master_pb.LeaseAdminTokenRequest{
 					PreviousToken:    atomic.LoadInt64(&l.token),
 					PreviousLockTime: atomic.LoadInt64(&l.lockTsNs),
 					LockName:         AdminLockName,
@@ -98,8 +101,12 @@ func (l *ExclusiveLocker) RequestLock() {
 
 func (l *ExclusiveLocker) ReleaseLock() {
 	l.isLocking = false
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
 	l.masterClient.WithClient(func(client master_pb.SeaweedClient) error {
-		client.ReleaseAdminToken(context.Background(), &master_pb.ReleaseAdminTokenRequest{
+		client.ReleaseAdminToken(ctx, &master_pb.ReleaseAdminTokenRequest{
 			PreviousToken:    atomic.LoadInt64(&l.token),
 			PreviousLockTime: atomic.LoadInt64(&l.lockTsNs),
 			LockName:         AdminLockName,

+ 4 - 1
weed/wdclient/masterclient.go

@@ -70,7 +70,10 @@ func (mc *MasterClient) tryConnectToMaster(master string) (nextHintedLeader stri
 	glog.V(1).Infof("%s masterClient Connecting to master %v", mc.clientType, master)
 	gprcErr := pb.WithMasterClient(master, mc.grpcDialOption, func(client master_pb.SeaweedClient) error {
 
-		stream, err := client.KeepConnected(context.Background())
+		ctx, cancel := context.WithCancel(context.Background())
+		defer cancel()
+
+		stream, err := client.KeepConnected(ctx)
 		if err != nil {
 			glog.V(0).Infof("%s masterClient failed to keep connected to %s: %v", mc.clientType, master, err)
 			return err