data_node.go

package topology

import (
	"fmt"
	"sync/atomic"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
	"github.com/seaweedfs/seaweedfs/weed/storage"
	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
	"github.com/seaweedfs/seaweedfs/weed/storage/types"
	"github.com/seaweedfs/seaweedfs/weed/util"
)

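// DataNode represents a volume server in the topology tree.
// Its children are the per-disk-type Disk nodes that hold the actual volumes.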
type DataNode struct {
	NodeImpl
	Ip        string
	Port      int
	GrpcPort  int
	PublicUrl string
	LastSeen  int64 // unix time in seconds
	Counter   int   // handles the race condition where the previous dataNode was not yet dead
}

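// NewDataNode creates a DataNode with the given id and empty disk usages and children.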
func NewDataNode(id string) *DataNode {
	dn := &DataNode{}
	dn.id = NodeId(id)
	dn.nodeType = "DataNode"
	dn.diskUsages = newDiskUsages()
	dn.children = make(map[NodeId]Node)
	dn.NodeImpl.value = dn
	return dn
}

func (dn *DataNode) String() string {
	dn.RLock()
	defer dn.RUnlock()
	return fmt.Sprintf("Node:%s, Ip:%s, Port:%d, PublicUrl:%s", dn.NodeImpl.String(), dn.Ip, dn.Port, dn.PublicUrl)
}

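// AddOrUpdateVolume registers the volume on the matching disk under the node lock
// and reports whether the volume is new and whether it has changed.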
func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChangedRO bool) {
	dn.Lock()
	defer dn.Unlock()
	return dn.doAddOrUpdateVolume(v)
}

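// getOrCreateDisk returns the child Disk node for the given disk type,
// creating and linking a new one if it does not exist yet.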
func (dn *DataNode) getOrCreateDisk(diskType string) *Disk {
	c, found := dn.children[NodeId(diskType)]
	if !found {
		c = NewDisk(diskType)
		dn.doLinkChildNode(c)
	}
	disk := c.(*Disk)
	return disk
}

func (dn *DataNode) doAddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChanged bool) {
	disk := dn.getOrCreateDisk(v.DiskType)
	return disk.AddOrUpdateVolume(v)
}

// UpdateVolumes detects new/deleted/changed volumes on a volume server,
// used by the master to notify master clients of these changes.
func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolumes, deletedVolumes, changedVolumes []storage.VolumeInfo) {
	actualVolumeMap := make(map[needle.VolumeId]storage.VolumeInfo)
	for _, v := range actualVolumes {
		actualVolumeMap[v.Id] = v
	}
	dn.Lock()
	defer dn.Unlock()
	existingVolumes := dn.getVolumes()
	for _, v := range existingVolumes {
		vid := v.Id
		if _, ok := actualVolumeMap[vid]; !ok {
			glog.V(0).Infoln("Deleting volume id:", vid)
			disk := dn.getOrCreateDisk(v.DiskType)
			delete(disk.volumes, vid)
			deletedVolumes = append(deletedVolumes, v)
			deltaDiskUsages := newDiskUsages()
			deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(v.DiskType))
			deltaDiskUsage.volumeCount = -1
			if v.IsRemote() {
				deltaDiskUsage.remoteVolumeCount = -1
			}
			if !v.ReadOnly {
				deltaDiskUsage.activeVolumeCount = -1
			}
			disk.UpAdjustDiskUsageDelta(deltaDiskUsages)
		}
	}
	for _, v := range actualVolumes {
		isNew, isChanged := dn.doAddOrUpdateVolume(v)
		if isNew {
			newVolumes = append(newVolumes, v)
		}
		if isChanged {
			changedVolumes = append(changedVolumes, v)
		}
	}
	return
}

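// DeltaUpdateVolumes applies incremental volume additions and deletions,
// adjusting the per-disk usage counters for each removed volume.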
func (dn *DataNode) DeltaUpdateVolumes(newVolumes, deletedVolumes []storage.VolumeInfo) {
	dn.Lock()
	defer dn.Unlock()
	for _, v := range deletedVolumes {
		disk := dn.getOrCreateDisk(v.DiskType)
		if _, found := disk.volumes[v.Id]; !found {
			continue
		}
		delete(disk.volumes, v.Id)
		deltaDiskUsages := newDiskUsages()
		deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(v.DiskType))
		deltaDiskUsage.volumeCount = -1
		if v.IsRemote() {
			deltaDiskUsage.remoteVolumeCount = -1
		}
		if !v.ReadOnly {
			deltaDiskUsage.activeVolumeCount = -1
		}
		disk.UpAdjustDiskUsageDelta(deltaDiskUsages)
	}
	for _, v := range newVolumes {
		dn.doAddOrUpdateVolume(v)
	}
}

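// AdjustMaxVolumeCounts reconciles the per-disk-type max volume counts reported
// by the volume server, propagating only the difference up the topology tree.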
func (dn *DataNode) AdjustMaxVolumeCounts(maxVolumeCounts map[string]uint32) {
	deltaDiskUsages := newDiskUsages()
	for diskType, maxVolumeCount := range maxVolumeCounts {
		if maxVolumeCount == 0 {
			// the volume server may have set the max to zero
			continue
		}
		dt := types.ToDiskType(diskType)
		currentDiskUsage := dn.diskUsages.getOrCreateDisk(dt)
		currentDiskUsageMaxVolumeCount := atomic.LoadInt64(&currentDiskUsage.maxVolumeCount)
		if currentDiskUsageMaxVolumeCount == int64(maxVolumeCount) {
			continue
		}
		disk := dn.getOrCreateDisk(dt.String())
		deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(dt)
		deltaDiskUsage.maxVolumeCount = int64(maxVolumeCount) - currentDiskUsageMaxVolumeCount
		disk.UpAdjustDiskUsageDelta(deltaDiskUsages)
	}
}

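// GetVolumes returns the volumes on all disks of this node.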
func (dn *DataNode) GetVolumes() (ret []storage.VolumeInfo) {
	dn.RLock()
	for _, c := range dn.children {
		disk := c.(*Disk)
		ret = append(ret, disk.GetVolumes()...)
	}
	dn.RUnlock()
	return ret
}

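// GetVolumesById looks up the VolumeInfo for the given volume id across all disks.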
func (dn *DataNode) GetVolumesById(id needle.VolumeId) (vInfo storage.VolumeInfo, err error) {
	dn.RLock()
	defer dn.RUnlock()
	found := false
	for _, c := range dn.children {
		disk := c.(*Disk)
		vInfo, found = disk.volumes[id]
		if found {
			break
		}
	}
	if found {
		return vInfo, nil
	}
	return storage.VolumeInfo{}, fmt.Errorf("volumeInfo not found")
}

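// GetDataCenter walks up the tree (DataNode -> Rack -> DataCenter) and returns
// the data center, or nil if the node is not fully attached yet.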
func (dn *DataNode) GetDataCenter() *DataCenter {
	rack := dn.Parent()
	if rack == nil {
		return nil
	}
	dcNode := rack.Parent()
	if dcNode == nil {
		return nil
	}
	dcValue := dcNode.GetValue()
	return dcValue.(*DataCenter)
}

func (dn *DataNode) GetDataCenterId() string {
	if dc := dn.GetDataCenter(); dc != nil {
		return string(dc.Id())
	}
	return ""
}

func (dn *DataNode) GetRack() *Rack {
	return dn.Parent().(*NodeImpl).value.(*Rack)
}

func (dn *DataNode) GetTopology() *Topology {
	p := dn.Parent()
	for p.Parent() != nil {
		p = p.Parent()
	}
	t := p.(*Topology)
	return t
}

func (dn *DataNode) MatchLocation(ip string, port int) bool {
	return dn.Ip == ip && dn.Port == port
}

func (dn *DataNode) Url() string {
	return util.JoinHostPort(dn.Ip, dn.Port)
}

func (dn *DataNode) ServerAddress() pb.ServerAddress {
	return pb.NewServerAddress(dn.Ip, dn.Port, dn.GrpcPort)
}

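// DataNodeInfo is a JSON-serializable summary of a DataNode's aggregated usage.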
type DataNodeInfo struct {
	Url       string `json:"Url"`
	PublicUrl string `json:"PublicUrl"`
	Volumes   int64  `json:"Volumes"`
	EcShards  int64  `json:"EcShards"`
	Max       int64  `json:"Max"`
	VolumeIds string `json:"VolumeIds"`
}

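// ToInfo aggregates the per-disk usage counters and volume id lists into a DataNodeInfo.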
func (dn *DataNode) ToInfo() (info DataNodeInfo) {
	info.Url = dn.Url()
	info.PublicUrl = dn.PublicUrl

	// aggregated volume info
	var volumeCount, ecShardCount, maxVolumeCount int64
	var volumeIds string
	for _, diskUsage := range dn.diskUsages.usages {
		volumeCount += diskUsage.volumeCount
		ecShardCount += diskUsage.ecShardCount
		maxVolumeCount += diskUsage.maxVolumeCount
	}

	for _, disk := range dn.Children() {
		d := disk.(*Disk)
		volumeIds += " " + d.GetVolumeIds()
	}

	info.Volumes = volumeCount
	info.EcShards = ecShardCount
	info.Max = maxVolumeCount
	info.VolumeIds = volumeIds
	return
}

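// ToDataNodeInfo converts this node and its disks into the master_pb protobuf representation.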
func (dn *DataNode) ToDataNodeInfo() *master_pb.DataNodeInfo {
	m := &master_pb.DataNodeInfo{
		Id:        string(dn.Id()),
		DiskInfos: make(map[string]*master_pb.DiskInfo),
		GrpcPort:  uint32(dn.GrpcPort),
	}
	for _, c := range dn.Children() {
		disk := c.(*Disk)
		m.DiskInfos[string(disk.Id())] = disk.ToDiskInfo()
	}
	return m
}

// GetVolumeIds returns the human-readable volume ids, limited to at most 100.
func (dn *DataNode) GetVolumeIds() string {
	dn.RLock()
	defer dn.RUnlock()
	existingVolumes := dn.getVolumes()
	ids := make([]int, 0, len(existingVolumes))
	for _, vol := range existingVolumes {
		ids = append(ids, int(vol.Id))
	}
	return util.HumanReadableIntsMax(100, ids...)
}

func (dn *DataNode) getVolumes() []storage.VolumeInfo {
	var existingVolumes []storage.VolumeInfo
	for _, c := range dn.children {
		disk := c.(*Disk)
		existingVolumes = append(existingVolumes, disk.GetVolumes()...)
	}
	return existingVolumes
}
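
// Usage sketch (illustrative only, not part of this file's API surface): a
// master-side caller handling a volume server heartbeat might refresh the node
// roughly like this, where `reportedVolumes` stands in for the volume list
// decoded from the heartbeat:
//
//	dn := NewDataNode("127.0.0.1:8080")
//	dn.Ip, dn.Port, dn.GrpcPort = "127.0.0.1", 8080, 18080
//	newVols, deletedVols, changedVols := dn.UpdateVolumes(reportedVolumes)
//	glog.V(1).Infof("new=%d deleted=%d changed=%d", len(newVols), len(deletedVols), len(changedVols))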