data_node.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. package topology
  2. import (
  3. "fmt"
  4. "strconv"
  5. "github.com/chrislusf/seaweedfs/weed/glog"
  6. "github.com/chrislusf/seaweedfs/weed/storage"
  7. )
  8. type DataNode struct {
  9. NodeImpl
  10. volumes map[storage.VolumeId]storage.VolumeInfo
  11. Ip string
  12. Port int
  13. PublicUrl string
  14. LastSeen int64 // unix time in seconds
  15. }
  16. func NewDataNode(id string) *DataNode {
  17. s := &DataNode{}
  18. s.id = NodeId(id)
  19. s.nodeType = "DataNode"
  20. s.volumes = make(map[storage.VolumeId]storage.VolumeInfo)
  21. s.NodeImpl.value = s
  22. return s
  23. }
  24. func (dn *DataNode) String() string {
  25. dn.RLock()
  26. defer dn.RUnlock()
  27. return fmt.Sprintf("Node:%s, volumes:%v, Ip:%s, Port:%d, PublicUrl:%s", dn.NodeImpl.String(), dn.volumes, dn.Ip, dn.Port, dn.PublicUrl)
  28. }
  29. func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) {
  30. dn.Lock()
  31. defer dn.Unlock()
  32. if _, ok := dn.volumes[v.Id]; !ok {
  33. dn.volumes[v.Id] = v
  34. dn.UpAdjustVolumeCountDelta(1)
  35. if !v.ReadOnly {
  36. dn.UpAdjustActiveVolumeCountDelta(1)
  37. }
  38. dn.UpAdjustMaxVolumeId(v.Id)
  39. } else {
  40. dn.volumes[v.Id] = v
  41. }
  42. }
  43. func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (deletedVolumes []storage.VolumeInfo) {
  44. actualVolumeMap := make(map[storage.VolumeId]storage.VolumeInfo)
  45. for _, v := range actualVolumes {
  46. actualVolumeMap[v.Id] = v
  47. }
  48. dn.Lock()
  49. for vid, v := range dn.volumes {
  50. if _, ok := actualVolumeMap[vid]; !ok {
  51. glog.V(0).Infoln("Deleting volume id:", vid)
  52. delete(dn.volumes, vid)
  53. deletedVolumes = append(deletedVolumes, v)
  54. dn.UpAdjustVolumeCountDelta(-1)
  55. dn.UpAdjustActiveVolumeCountDelta(-1)
  56. }
  57. }
  58. dn.Unlock()
  59. for _, v := range actualVolumes {
  60. dn.AddOrUpdateVolume(v)
  61. }
  62. return
  63. }
  64. func (dn *DataNode) GetVolumes() (ret []storage.VolumeInfo) {
  65. dn.RLock()
  66. for _, v := range dn.volumes {
  67. ret = append(ret, v)
  68. }
  69. dn.RUnlock()
  70. return ret
  71. }
  72. func (dn *DataNode) GetVolumesById(id storage.VolumeId) (storage.VolumeInfo, error) {
  73. dn.RLock()
  74. defer dn.RUnlock()
  75. v_info, ok := dn.volumes[id]
  76. if ok {
  77. return v_info, nil
  78. } else {
  79. return storage.VolumeInfo{}, fmt.Errorf("volumeInfo not found")
  80. }
  81. }
  82. func (dn *DataNode) GetDataCenter() *DataCenter {
  83. return dn.Parent().Parent().(*NodeImpl).value.(*DataCenter)
  84. }
  85. func (dn *DataNode) GetRack() *Rack {
  86. return dn.Parent().(*NodeImpl).value.(*Rack)
  87. }
  88. func (dn *DataNode) GetTopology() *Topology {
  89. p := dn.Parent()
  90. for p.Parent() != nil {
  91. p = p.Parent()
  92. }
  93. t := p.(*Topology)
  94. return t
  95. }
  96. func (dn *DataNode) MatchLocation(ip string, port int) bool {
  97. return dn.Ip == ip && dn.Port == port
  98. }
  99. func (dn *DataNode) Url() string {
  100. return dn.Ip + ":" + strconv.Itoa(dn.Port)
  101. }
  102. func (dn *DataNode) ToMap() interface{} {
  103. ret := make(map[string]interface{})
  104. ret["Url"] = dn.Url()
  105. ret["Volumes"] = dn.GetVolumeCount()
  106. ret["Max"] = dn.GetMaxVolumeCount()
  107. ret["Free"] = dn.FreeSpace()
  108. ret["PublicUrl"] = dn.PublicUrl
  109. return ret
  110. }