data_node.go 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272
  1. package topology
  2. import (
  3. "fmt"
  4. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  5. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  6. "github.com/chrislusf/seaweedfs/weed/storage/types"
  7. "github.com/chrislusf/seaweedfs/weed/util"
  8. "strconv"
  9. "github.com/chrislusf/seaweedfs/weed/glog"
  10. "github.com/chrislusf/seaweedfs/weed/storage"
  11. )
  12. type DataNode struct {
  13. NodeImpl
  14. Ip string
  15. Port int
  16. PublicUrl string
  17. LastSeen int64 // unix time in seconds
  18. }
  19. func NewDataNode(id string) *DataNode {
  20. dn := &DataNode{}
  21. dn.id = NodeId(id)
  22. dn.nodeType = "DataNode"
  23. dn.diskUsages = newDiskUsages()
  24. dn.children = make(map[NodeId]Node)
  25. dn.NodeImpl.value = dn
  26. return dn
  27. }
  28. func (dn *DataNode) String() string {
  29. dn.RLock()
  30. defer dn.RUnlock()
  31. return fmt.Sprintf("Node:%s, Ip:%s, Port:%d, PublicUrl:%s", dn.NodeImpl.String(), dn.Ip, dn.Port, dn.PublicUrl)
  32. }
  33. func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChangedRO bool) {
  34. dn.Lock()
  35. defer dn.Unlock()
  36. return dn.doAddOrUpdateVolume(v)
  37. }
  38. func (dn *DataNode) getOrCreateDisk(diskType string) *Disk {
  39. c, found := dn.children[NodeId(diskType)]
  40. if !found {
  41. c = NewDisk(diskType)
  42. dn.doLinkChildNode(c)
  43. }
  44. disk := c.(*Disk)
  45. return disk
  46. }
  47. func (dn *DataNode) doAddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChangedRO bool) {
  48. disk := dn.getOrCreateDisk(v.DiskType)
  49. return disk.AddOrUpdateVolume(v)
  50. }
  51. // UpdateVolumes detects new/deleted/changed volumes on a volume server
  52. // used in master to notify master clients of these changes.
  53. func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolumes, deletedVolumes, changeRO []storage.VolumeInfo) {
  54. actualVolumeMap := make(map[needle.VolumeId]storage.VolumeInfo)
  55. for _, v := range actualVolumes {
  56. actualVolumeMap[v.Id] = v
  57. }
  58. dn.Lock()
  59. defer dn.Unlock()
  60. existingVolumes := dn.getVolumes()
  61. for _, v := range existingVolumes {
  62. vid := v.Id
  63. if _, ok := actualVolumeMap[vid]; !ok {
  64. glog.V(0).Infoln("Deleting volume id:", vid)
  65. disk := dn.getOrCreateDisk(v.DiskType)
  66. delete(disk.volumes, vid)
  67. deletedVolumes = append(deletedVolumes, v)
  68. deltaDiskUsages := newDiskUsages()
  69. deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(v.DiskType))
  70. deltaDiskUsage.volumeCount = -1
  71. if v.IsRemote() {
  72. deltaDiskUsage.remoteVolumeCount = -1
  73. }
  74. if !v.ReadOnly {
  75. deltaDiskUsage.activeVolumeCount = -1
  76. }
  77. disk.UpAdjustDiskUsageDelta(deltaDiskUsages)
  78. }
  79. }
  80. for _, v := range actualVolumes {
  81. isNew, isChangedRO := dn.doAddOrUpdateVolume(v)
  82. if isNew {
  83. newVolumes = append(newVolumes, v)
  84. }
  85. if isChangedRO {
  86. changeRO = append(changeRO, v)
  87. }
  88. }
  89. return
  90. }
  91. func (dn *DataNode) DeltaUpdateVolumes(newVolumes, deletedVolumes []storage.VolumeInfo) {
  92. dn.Lock()
  93. defer dn.Unlock()
  94. for _, v := range deletedVolumes {
  95. disk := dn.getOrCreateDisk(v.DiskType)
  96. delete(disk.volumes, v.Id)
  97. deltaDiskUsages := newDiskUsages()
  98. deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(v.DiskType))
  99. deltaDiskUsage.volumeCount = -1
  100. if v.IsRemote() {
  101. deltaDiskUsage.remoteVolumeCount = -1
  102. }
  103. if !v.ReadOnly {
  104. deltaDiskUsage.activeVolumeCount = -1
  105. }
  106. disk.UpAdjustDiskUsageDelta(deltaDiskUsages)
  107. }
  108. for _, v := range newVolumes {
  109. dn.doAddOrUpdateVolume(v)
  110. }
  111. return
  112. }
  113. func (dn *DataNode) AdjustMaxVolumeCounts(maxVolumeCounts map[string]uint32) {
  114. deltaDiskUsages := newDiskUsages()
  115. for diskType, maxVolumeCount := range maxVolumeCounts {
  116. if maxVolumeCount == 0 {
  117. // the volume server may have set the max to zero
  118. continue
  119. }
  120. dt := types.ToDiskType(diskType)
  121. currentDiskUsage := dn.diskUsages.getOrCreateDisk(dt)
  122. if currentDiskUsage.maxVolumeCount == int64(maxVolumeCount) {
  123. continue
  124. }
  125. disk := dn.getOrCreateDisk(dt.String())
  126. deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(dt)
  127. deltaDiskUsage.maxVolumeCount = int64(maxVolumeCount) - currentDiskUsage.maxVolumeCount
  128. disk.UpAdjustDiskUsageDelta(deltaDiskUsages)
  129. }
  130. }
  131. func (dn *DataNode) GetVolumes() (ret []storage.VolumeInfo) {
  132. dn.RLock()
  133. for _, c := range dn.children {
  134. disk := c.(*Disk)
  135. ret = append(ret, disk.GetVolumes()...)
  136. }
  137. dn.RUnlock()
  138. return ret
  139. }
  140. func (dn *DataNode) GetVolumesById(id needle.VolumeId) (vInfo storage.VolumeInfo, err error) {
  141. dn.RLock()
  142. defer dn.RUnlock()
  143. found := false
  144. for _, c := range dn.children {
  145. disk := c.(*Disk)
  146. vInfo, found = disk.volumes[id]
  147. if found {
  148. break
  149. }
  150. }
  151. if found {
  152. return vInfo, nil
  153. } else {
  154. return storage.VolumeInfo{}, fmt.Errorf("volumeInfo not found")
  155. }
  156. }
  157. func (dn *DataNode) GetDataCenter() *DataCenter {
  158. rack := dn.Parent()
  159. if rack == nil {
  160. return nil
  161. }
  162. dcNode := rack.Parent()
  163. if dcNode == nil {
  164. return nil
  165. }
  166. dcValue := dcNode.GetValue()
  167. return dcValue.(*DataCenter)
  168. }
  169. func (dn *DataNode) GetRack() *Rack {
  170. return dn.Parent().(*NodeImpl).value.(*Rack)
  171. }
  172. func (dn *DataNode) GetTopology() *Topology {
  173. p := dn.Parent()
  174. for p.Parent() != nil {
  175. p = p.Parent()
  176. }
  177. t := p.(*Topology)
  178. return t
  179. }
  180. func (dn *DataNode) MatchLocation(ip string, port int) bool {
  181. return dn.Ip == ip && dn.Port == port
  182. }
  183. func (dn *DataNode) Url() string {
  184. return dn.Ip + ":" + strconv.Itoa(dn.Port)
  185. }
  186. func (dn *DataNode) ToMap() interface{} {
  187. ret := make(map[string]interface{})
  188. ret["Url"] = dn.Url()
  189. ret["PublicUrl"] = dn.PublicUrl
  190. // aggregated volume info
  191. var volumeCount, ecShardCount, maxVolumeCount int64
  192. var volumeIds string
  193. for _, diskUsage := range dn.diskUsages.usages {
  194. volumeCount += diskUsage.volumeCount
  195. ecShardCount += diskUsage.ecShardCount
  196. maxVolumeCount += diskUsage.maxVolumeCount
  197. }
  198. for _, disk := range dn.Children() {
  199. d := disk.(*Disk)
  200. volumeIds += " " + d.GetVolumeIds()
  201. }
  202. ret["Volumes"] = volumeCount
  203. ret["EcShards"] = ecShardCount
  204. ret["Max"] = maxVolumeCount
  205. ret["VolumeIds"] = volumeIds
  206. return ret
  207. }
  208. func (dn *DataNode) ToDataNodeInfo() *master_pb.DataNodeInfo {
  209. m := &master_pb.DataNodeInfo{
  210. Id: string(dn.Id()),
  211. DiskInfos: make(map[string]*master_pb.DiskInfo),
  212. }
  213. for _, c := range dn.Children() {
  214. disk := c.(*Disk)
  215. m.DiskInfos[string(disk.Id())] = disk.ToDiskInfo()
  216. }
  217. return m
  218. }
  219. // GetVolumeIds returns the human readable volume ids limited to count of max 100.
  220. func (dn *DataNode) GetVolumeIds() string {
  221. dn.RLock()
  222. defer dn.RUnlock()
  223. existingVolumes := dn.getVolumes()
  224. ids := make([]int, 0, len(existingVolumes))
  225. for k := range existingVolumes {
  226. ids = append(ids, int(k))
  227. }
  228. return util.HumanReadableIntsMax(100, ids...)
  229. }
  230. func (dn *DataNode) getVolumes() []storage.VolumeInfo {
  231. var existingVolumes []storage.VolumeInfo
  232. for _, c := range dn.children {
  233. disk := c.(*Disk)
  234. existingVolumes = append(existingVolumes, disk.GetVolumes()...)
  235. }
  236. return existingVolumes
  237. }