data_node.go 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303
  1. package topology
  2. import (
  3. "fmt"
  4. "sync/atomic"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/pb"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
  8. "github.com/seaweedfs/seaweedfs/weed/storage"
  9. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  10. "github.com/seaweedfs/seaweedfs/weed/storage/types"
  11. "github.com/seaweedfs/seaweedfs/weed/util"
  12. )
  13. type DataNode struct {
  14. NodeImpl
  15. Ip string
  16. Port int
  17. GrpcPort int
  18. PublicUrl string
  19. LastSeen int64 // unix time in seconds
  20. Counter int // in race condition, the previous dataNode was not dead
  21. IsTerminating bool
  22. }
  23. func NewDataNode(id string) *DataNode {
  24. dn := &DataNode{}
  25. dn.id = NodeId(id)
  26. dn.nodeType = "DataNode"
  27. dn.diskUsages = newDiskUsages()
  28. dn.children = make(map[NodeId]Node)
  29. dn.NodeImpl.value = dn
  30. return dn
  31. }
  32. func (dn *DataNode) String() string {
  33. dn.RLock()
  34. defer dn.RUnlock()
  35. return fmt.Sprintf("Node:%s, Ip:%s, Port:%d, PublicUrl:%s", dn.NodeImpl.String(), dn.Ip, dn.Port, dn.PublicUrl)
  36. }
  37. func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChangedRO bool) {
  38. dn.Lock()
  39. defer dn.Unlock()
  40. return dn.doAddOrUpdateVolume(v)
  41. }
  42. func (dn *DataNode) getOrCreateDisk(diskType string) *Disk {
  43. c, found := dn.children[NodeId(diskType)]
  44. if !found {
  45. c = NewDisk(diskType)
  46. dn.doLinkChildNode(c)
  47. }
  48. disk := c.(*Disk)
  49. return disk
  50. }
  51. func (dn *DataNode) doAddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChanged bool) {
  52. disk := dn.getOrCreateDisk(v.DiskType)
  53. return disk.AddOrUpdateVolume(v)
  54. }
  55. // UpdateVolumes detects new/deleted/changed volumes on a volume server
  56. // used in master to notify master clients of these changes.
  57. func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolumes, deletedVolumes, changedVolumes []storage.VolumeInfo) {
  58. actualVolumeMap := make(map[needle.VolumeId]storage.VolumeInfo)
  59. for _, v := range actualVolumes {
  60. actualVolumeMap[v.Id] = v
  61. }
  62. dn.Lock()
  63. defer dn.Unlock()
  64. existingVolumes := dn.getVolumes()
  65. for _, v := range existingVolumes {
  66. vid := v.Id
  67. if _, ok := actualVolumeMap[vid]; !ok {
  68. glog.V(0).Infoln("Deleting volume id:", vid)
  69. disk := dn.getOrCreateDisk(v.DiskType)
  70. disk.DeleteVolumeById(vid)
  71. deletedVolumes = append(deletedVolumes, v)
  72. deltaDiskUsages := newDiskUsages()
  73. deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(v.DiskType))
  74. deltaDiskUsage.volumeCount = -1
  75. if v.IsRemote() {
  76. deltaDiskUsage.remoteVolumeCount = -1
  77. }
  78. if !v.ReadOnly {
  79. deltaDiskUsage.activeVolumeCount = -1
  80. }
  81. disk.UpAdjustDiskUsageDelta(deltaDiskUsages)
  82. }
  83. }
  84. for _, v := range actualVolumes {
  85. isNew, isChanged := dn.doAddOrUpdateVolume(v)
  86. if isNew {
  87. newVolumes = append(newVolumes, v)
  88. }
  89. if isChanged {
  90. changedVolumes = append(changedVolumes, v)
  91. }
  92. }
  93. return
  94. }
  95. func (dn *DataNode) DeltaUpdateVolumes(newVolumes, deletedVolumes []storage.VolumeInfo) {
  96. dn.Lock()
  97. defer dn.Unlock()
  98. for _, v := range deletedVolumes {
  99. disk := dn.getOrCreateDisk(v.DiskType)
  100. _, err := disk.GetVolumesById(v.Id)
  101. if err != nil {
  102. continue
  103. }
  104. disk.DeleteVolumeById(v.Id)
  105. deltaDiskUsages := newDiskUsages()
  106. deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(v.DiskType))
  107. deltaDiskUsage.volumeCount = -1
  108. if v.IsRemote() {
  109. deltaDiskUsage.remoteVolumeCount = -1
  110. }
  111. if !v.ReadOnly {
  112. deltaDiskUsage.activeVolumeCount = -1
  113. }
  114. disk.UpAdjustDiskUsageDelta(deltaDiskUsages)
  115. }
  116. for _, v := range newVolumes {
  117. dn.doAddOrUpdateVolume(v)
  118. }
  119. return
  120. }
  121. func (dn *DataNode) AdjustMaxVolumeCounts(maxVolumeCounts map[string]uint32) {
  122. for diskType, maxVolumeCount := range maxVolumeCounts {
  123. if maxVolumeCount == 0 {
  124. // the volume server may have set the max to zero
  125. continue
  126. }
  127. deltaDiskUsages := newDiskUsages()
  128. dt := types.ToDiskType(diskType)
  129. currentDiskUsage := dn.diskUsages.getOrCreateDisk(dt)
  130. currentDiskUsageMaxVolumeCount := atomic.LoadInt64(&currentDiskUsage.maxVolumeCount)
  131. if currentDiskUsageMaxVolumeCount == int64(maxVolumeCount) {
  132. continue
  133. }
  134. disk := dn.getOrCreateDisk(dt.String())
  135. deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(dt)
  136. deltaDiskUsage.maxVolumeCount = int64(maxVolumeCount) - currentDiskUsageMaxVolumeCount
  137. disk.UpAdjustDiskUsageDelta(deltaDiskUsages)
  138. }
  139. }
  140. func (dn *DataNode) GetVolumes() (ret []storage.VolumeInfo) {
  141. dn.RLock()
  142. for _, c := range dn.children {
  143. disk := c.(*Disk)
  144. ret = append(ret, disk.GetVolumes()...)
  145. }
  146. dn.RUnlock()
  147. return ret
  148. }
  149. func (dn *DataNode) GetVolumesById(id needle.VolumeId) (vInfo storage.VolumeInfo, err error) {
  150. dn.RLock()
  151. defer dn.RUnlock()
  152. found := false
  153. for _, c := range dn.children {
  154. disk := c.(*Disk)
  155. vInfo, err = disk.GetVolumesById(id)
  156. if err == nil {
  157. found = true
  158. break
  159. }
  160. }
  161. if found {
  162. return vInfo, nil
  163. } else {
  164. return storage.VolumeInfo{}, fmt.Errorf("volumeInfo not found")
  165. }
  166. }
  167. func (dn *DataNode) GetDataCenter() *DataCenter {
  168. rack := dn.Parent()
  169. if rack == nil {
  170. return nil
  171. }
  172. dcNode := rack.Parent()
  173. if dcNode == nil {
  174. return nil
  175. }
  176. dcValue := dcNode.GetValue()
  177. return dcValue.(*DataCenter)
  178. }
  179. func (dn *DataNode) GetDataCenterId() string {
  180. if dc := dn.GetDataCenter(); dc != nil {
  181. return string(dc.Id())
  182. }
  183. return ""
  184. }
  185. func (dn *DataNode) GetRack() *Rack {
  186. return dn.Parent().(*NodeImpl).value.(*Rack)
  187. }
  188. func (dn *DataNode) GetTopology() *Topology {
  189. p := dn.Parent()
  190. for p.Parent() != nil {
  191. p = p.Parent()
  192. }
  193. t := p.(*Topology)
  194. return t
  195. }
  196. func (dn *DataNode) MatchLocation(ip string, port int) bool {
  197. return dn.Ip == ip && dn.Port == port
  198. }
  199. func (dn *DataNode) Url() string {
  200. return util.JoinHostPort(dn.Ip, dn.Port)
  201. }
  202. func (dn *DataNode) ServerAddress() pb.ServerAddress {
  203. return pb.NewServerAddress(dn.Ip, dn.Port, dn.GrpcPort)
  204. }
  205. type DataNodeInfo struct {
  206. Url string `json:"Url"`
  207. PublicUrl string `json:"PublicUrl"`
  208. Volumes int64 `json:"Volumes"`
  209. EcShards int64 `json:"EcShards"`
  210. Max int64 `json:"Max"`
  211. VolumeIds string `json:"VolumeIds"`
  212. }
  213. func (dn *DataNode) ToInfo() (info DataNodeInfo) {
  214. info.Url = dn.Url()
  215. info.PublicUrl = dn.PublicUrl
  216. // aggregated volume info
  217. var volumeCount, ecShardCount, maxVolumeCount int64
  218. var volumeIds string
  219. for _, diskUsage := range dn.diskUsages.usages {
  220. volumeCount += diskUsage.volumeCount
  221. ecShardCount += diskUsage.ecShardCount
  222. maxVolumeCount += diskUsage.maxVolumeCount
  223. }
  224. for _, disk := range dn.Children() {
  225. d := disk.(*Disk)
  226. volumeIds += " " + d.GetVolumeIds()
  227. }
  228. info.Volumes = volumeCount
  229. info.EcShards = ecShardCount
  230. info.Max = maxVolumeCount
  231. info.VolumeIds = volumeIds
  232. return
  233. }
  234. func (dn *DataNode) ToDataNodeInfo() *master_pb.DataNodeInfo {
  235. m := &master_pb.DataNodeInfo{
  236. Id: string(dn.Id()),
  237. DiskInfos: make(map[string]*master_pb.DiskInfo),
  238. GrpcPort: uint32(dn.GrpcPort),
  239. }
  240. for _, c := range dn.Children() {
  241. disk := c.(*Disk)
  242. m.DiskInfos[string(disk.Id())] = disk.ToDiskInfo()
  243. }
  244. return m
  245. }
  246. // GetVolumeIds returns the human readable volume ids limited to count of max 100.
  247. func (dn *DataNode) GetVolumeIds() string {
  248. dn.RLock()
  249. defer dn.RUnlock()
  250. existingVolumes := dn.getVolumes()
  251. ids := make([]int, 0, len(existingVolumes))
  252. for k := range existingVolumes {
  253. ids = append(ids, int(k))
  254. }
  255. return util.HumanReadableIntsMax(100, ids...)
  256. }
  257. func (dn *DataNode) getVolumes() []storage.VolumeInfo {
  258. var existingVolumes []storage.VolumeInfo
  259. for _, c := range dn.children {
  260. disk := c.(*Disk)
  261. existingVolumes = append(existingVolumes, disk.GetVolumes()...)
  262. }
  263. return existingVolumes
  264. }