master_grpc_server.go 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879
  1. package weed_server
  2. import (
  3. "net"
  4. "strings"
  5. "github.com/chrislusf/seaweedfs/weed/glog"
  6. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  7. "github.com/chrislusf/seaweedfs/weed/storage"
  8. "github.com/chrislusf/seaweedfs/weed/topology"
  9. "google.golang.org/grpc/peer"
  10. )
  11. func (ms MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServer) error {
  12. var dn *topology.DataNode
  13. t := ms.Topo
  14. for {
  15. heartbeat, err := stream.Recv()
  16. if err == nil {
  17. if dn == nil {
  18. t.Sequence.SetMax(heartbeat.MaxFileKey)
  19. if heartbeat.Ip == "" {
  20. if pr, ok := peer.FromContext(stream.Context()); ok {
  21. if pr.Addr != net.Addr(nil) {
  22. heartbeat.Ip = pr.Addr.String()[0:strings.LastIndex(pr.Addr.String(), ":")]
  23. glog.V(0).Infof("remote IP address is detected as %v", heartbeat.Ip)
  24. }
  25. }
  26. }
  27. dcName, rackName := t.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack)
  28. dc := t.GetOrCreateDataCenter(dcName)
  29. rack := dc.GetOrCreateRack(rackName)
  30. dn = rack.GetOrCreateDataNode(heartbeat.Ip,
  31. int(heartbeat.Port), heartbeat.PublicUrl,
  32. int(heartbeat.MaxVolumeCount))
  33. glog.V(0).Infof("added volume server %v:%d", heartbeat.GetIp(), heartbeat.GetPort())
  34. if err := stream.Send(&master_pb.HeartbeatResponse{
  35. VolumeSizeLimit: uint64(ms.volumeSizeLimitMB) * 1024 * 1024,
  36. SecretKey: string(ms.guard.SecretKey),
  37. }); err != nil {
  38. return err
  39. }
  40. }
  41. var volumeInfos []storage.VolumeInfo
  42. for _, v := range heartbeat.Volumes {
  43. if vi, err := storage.NewVolumeInfo(v); err == nil {
  44. volumeInfos = append(volumeInfos, vi)
  45. } else {
  46. glog.V(0).Infof("Fail to convert joined volume information: %v", err)
  47. }
  48. }
  49. deletedVolumes := dn.UpdateVolumes(volumeInfos)
  50. for _, v := range volumeInfos {
  51. t.RegisterVolumeLayout(v, dn)
  52. }
  53. for _, v := range deletedVolumes {
  54. t.UnRegisterVolumeLayout(v, dn)
  55. }
  56. } else {
  57. if dn != nil {
  58. glog.V(0).Infof("lost volume server %s:%d", dn.Ip, dn.Port)
  59. t.UnRegisterDataNode(dn)
  60. }
  61. return err
  62. }
  63. // tell the volume servers about the leader
  64. newLeader, err := t.Leader()
  65. if err == nil {
  66. if err := stream.Send(&master_pb.HeartbeatResponse{
  67. Leader: newLeader,
  68. }); err != nil {
  69. return err
  70. }
  71. }
  72. }
  73. }