data_node.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219
  1. package topology
  2. import (
  3. "fmt"
  4. "github.com/chrislusf/seaweedfs/weed/util"
  5. "strconv"
  6. "sync"
  7. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  8. "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
  9. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  10. "github.com/chrislusf/seaweedfs/weed/util/log"
  11. "github.com/chrislusf/seaweedfs/weed/storage"
  12. )
  13. type DataNode struct {
  14. NodeImpl
  15. volumes map[needle.VolumeId]storage.VolumeInfo
  16. Ip string
  17. Port int
  18. PublicUrl string
  19. LastSeen int64 // unix time in seconds
  20. ecShards map[needle.VolumeId]*erasure_coding.EcVolumeInfo
  21. ecShardsLock sync.RWMutex
  22. }
  23. func NewDataNode(id string) *DataNode {
  24. s := &DataNode{}
  25. s.id = NodeId(id)
  26. s.nodeType = "DataNode"
  27. s.volumes = make(map[needle.VolumeId]storage.VolumeInfo)
  28. s.ecShards = make(map[needle.VolumeId]*erasure_coding.EcVolumeInfo)
  29. s.NodeImpl.value = s
  30. return s
  31. }
  32. func (dn *DataNode) String() string {
  33. dn.RLock()
  34. defer dn.RUnlock()
  35. return fmt.Sprintf("Node:%s, volumes:%v, Ip:%s, Port:%d, PublicUrl:%s", dn.NodeImpl.String(), dn.volumes, dn.Ip, dn.Port, dn.PublicUrl)
  36. }
  37. func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChangedRO bool) {
  38. dn.Lock()
  39. defer dn.Unlock()
  40. return dn.doAddOrUpdateVolume(v)
  41. }
  42. func (dn *DataNode) doAddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChangedRO bool) {
  43. if oldV, ok := dn.volumes[v.Id]; !ok {
  44. dn.volumes[v.Id] = v
  45. dn.UpAdjustVolumeCountDelta(1)
  46. if v.IsRemote() {
  47. dn.UpAdjustRemoteVolumeCountDelta(1)
  48. }
  49. if !v.ReadOnly {
  50. dn.UpAdjustActiveVolumeCountDelta(1)
  51. }
  52. dn.UpAdjustMaxVolumeId(v.Id)
  53. isNew = true
  54. } else {
  55. if oldV.IsRemote() != v.IsRemote() {
  56. if v.IsRemote() {
  57. dn.UpAdjustRemoteVolumeCountDelta(1)
  58. }
  59. if oldV.IsRemote() {
  60. dn.UpAdjustRemoteVolumeCountDelta(-1)
  61. }
  62. }
  63. isChangedRO = dn.volumes[v.Id].ReadOnly != v.ReadOnly
  64. dn.volumes[v.Id] = v
  65. }
  66. return
  67. }
  68. func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolumes, deletedVolumes, changeRO []storage.VolumeInfo) {
  69. actualVolumeMap := make(map[needle.VolumeId]storage.VolumeInfo)
  70. for _, v := range actualVolumes {
  71. actualVolumeMap[v.Id] = v
  72. }
  73. dn.Lock()
  74. defer dn.Unlock()
  75. for vid, v := range dn.volumes {
  76. if _, ok := actualVolumeMap[vid]; !ok {
  77. log.Infoln("Deleting volume id:", vid)
  78. delete(dn.volumes, vid)
  79. deletedVolumes = append(deletedVolumes, v)
  80. dn.UpAdjustVolumeCountDelta(-1)
  81. if v.IsRemote() {
  82. dn.UpAdjustRemoteVolumeCountDelta(-1)
  83. }
  84. if !v.ReadOnly {
  85. dn.UpAdjustActiveVolumeCountDelta(-1)
  86. }
  87. }
  88. }
  89. for _, v := range actualVolumes {
  90. isNew, isChangedRO := dn.doAddOrUpdateVolume(v)
  91. if isNew {
  92. newVolumes = append(newVolumes, v)
  93. }
  94. if isChangedRO {
  95. changeRO = append(changeRO, v)
  96. }
  97. }
  98. return
  99. }
  100. func (dn *DataNode) DeltaUpdateVolumes(newVolumes, deletedVolumes []storage.VolumeInfo) {
  101. dn.Lock()
  102. defer dn.Unlock()
  103. for _, v := range deletedVolumes {
  104. delete(dn.volumes, v.Id)
  105. dn.UpAdjustVolumeCountDelta(-1)
  106. if v.IsRemote() {
  107. dn.UpAdjustRemoteVolumeCountDelta(-1)
  108. }
  109. if !v.ReadOnly {
  110. dn.UpAdjustActiveVolumeCountDelta(-1)
  111. }
  112. }
  113. for _, v := range newVolumes {
  114. dn.doAddOrUpdateVolume(v)
  115. }
  116. return
  117. }
  118. func (dn *DataNode) GetVolumes() (ret []storage.VolumeInfo) {
  119. dn.RLock()
  120. for _, v := range dn.volumes {
  121. ret = append(ret, v)
  122. }
  123. dn.RUnlock()
  124. return ret
  125. }
  126. func (dn *DataNode) GetVolumesById(id needle.VolumeId) (storage.VolumeInfo, error) {
  127. dn.RLock()
  128. defer dn.RUnlock()
  129. vInfo, ok := dn.volumes[id]
  130. if ok {
  131. return vInfo, nil
  132. } else {
  133. return storage.VolumeInfo{}, fmt.Errorf("volumeInfo not found")
  134. }
  135. }
  136. func (dn *DataNode) GetDataCenter() *DataCenter {
  137. return dn.Parent().Parent().(*NodeImpl).value.(*DataCenter)
  138. }
  139. func (dn *DataNode) GetRack() *Rack {
  140. return dn.Parent().(*NodeImpl).value.(*Rack)
  141. }
  142. func (dn *DataNode) GetTopology() *Topology {
  143. p := dn.Parent()
  144. for p.Parent() != nil {
  145. p = p.Parent()
  146. }
  147. t := p.(*Topology)
  148. return t
  149. }
  150. func (dn *DataNode) MatchLocation(ip string, port int) bool {
  151. return dn.Ip == ip && dn.Port == port
  152. }
  153. func (dn *DataNode) Url() string {
  154. return dn.Ip + ":" + strconv.Itoa(dn.Port)
  155. }
  156. func (dn *DataNode) ToMap() interface{} {
  157. ret := make(map[string]interface{})
  158. ret["Url"] = dn.Url()
  159. ret["Volumes"] = dn.GetVolumeCount()
  160. ret["VolumeIds"] = dn.GetVolumeIds()
  161. ret["EcShards"] = dn.GetEcShardCount()
  162. ret["Max"] = dn.GetMaxVolumeCount()
  163. ret["Free"] = dn.FreeSpace()
  164. ret["PublicUrl"] = dn.PublicUrl
  165. return ret
  166. }
  167. func (dn *DataNode) ToDataNodeInfo() *master_pb.DataNodeInfo {
  168. m := &master_pb.DataNodeInfo{
  169. Id: string(dn.Id()),
  170. VolumeCount: uint64(dn.GetVolumeCount()),
  171. MaxVolumeCount: uint64(dn.GetMaxVolumeCount()),
  172. FreeVolumeCount: uint64(dn.FreeSpace()),
  173. ActiveVolumeCount: uint64(dn.GetActiveVolumeCount()),
  174. RemoteVolumeCount: uint64(dn.GetRemoteVolumeCount()),
  175. }
  176. for _, v := range dn.GetVolumes() {
  177. m.VolumeInfos = append(m.VolumeInfos, v.ToVolumeInformationMessage())
  178. }
  179. for _, ecv := range dn.GetEcShards() {
  180. m.EcShardInfos = append(m.EcShardInfos, ecv.ToVolumeEcShardInformationMessage())
  181. }
  182. return m
  183. }
  184. // GetVolumeIds returns the human readable volume ids limited to count of max 100.
  185. func (dn *DataNode) GetVolumeIds() string {
  186. dn.RLock()
  187. defer dn.RUnlock()
  188. ids := make([]int, 0, len(dn.volumes))
  189. for k := range dn.volumes {
  190. ids = append(ids, int(k))
  191. }
  192. return util.HumanReadableIntsMax(100, ids...)
  193. }