topology_ec.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. package topology
  2. import (
  3. "github.com/seaweedfs/seaweedfs/weed/glog"
  4. "github.com/seaweedfs/seaweedfs/weed/pb"
  5. "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
  6. "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
  7. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  8. )
  9. type EcShardLocations struct {
  10. Collection string
  11. Locations [erasure_coding.TotalShardsCount][]*DataNode
  12. }
  13. func (t *Topology) SyncDataNodeEcShards(shardInfos []*master_pb.VolumeEcShardInformationMessage, dn *DataNode) (newShards, deletedShards []*erasure_coding.EcVolumeInfo) {
  14. // convert into in memory struct storage.VolumeInfo
  15. var shards []*erasure_coding.EcVolumeInfo
  16. for _, shardInfo := range shardInfos {
  17. shards = append(shards,
  18. erasure_coding.NewEcVolumeInfo(
  19. shardInfo.DiskType,
  20. shardInfo.Collection,
  21. needle.VolumeId(shardInfo.Id),
  22. erasure_coding.ShardBits(shardInfo.EcIndexBits),
  23. shardInfo.ExpireAtSec))
  24. }
  25. // find out the delta volumes
  26. newShards, deletedShards = dn.UpdateEcShards(shards)
  27. for _, v := range newShards {
  28. t.RegisterEcShards(v, dn)
  29. }
  30. for _, v := range deletedShards {
  31. t.UnRegisterEcShards(v, dn)
  32. }
  33. return
  34. }
  35. func (t *Topology) IncrementalSyncDataNodeEcShards(newEcShards, deletedEcShards []*master_pb.VolumeEcShardInformationMessage, dn *DataNode) {
  36. // convert into in memory struct storage.VolumeInfo
  37. var newShards, deletedShards []*erasure_coding.EcVolumeInfo
  38. for _, shardInfo := range newEcShards {
  39. newShards = append(newShards,
  40. erasure_coding.NewEcVolumeInfo(
  41. shardInfo.DiskType,
  42. shardInfo.Collection,
  43. needle.VolumeId(shardInfo.Id),
  44. erasure_coding.ShardBits(shardInfo.EcIndexBits), shardInfo.ExpireAtSec))
  45. }
  46. for _, shardInfo := range deletedEcShards {
  47. deletedShards = append(deletedShards,
  48. erasure_coding.NewEcVolumeInfo(
  49. shardInfo.DiskType,
  50. shardInfo.Collection,
  51. needle.VolumeId(shardInfo.Id),
  52. erasure_coding.ShardBits(shardInfo.EcIndexBits), shardInfo.ExpireAtSec))
  53. }
  54. dn.DeltaUpdateEcShards(newShards, deletedShards)
  55. for _, v := range newShards {
  56. t.RegisterEcShards(v, dn)
  57. }
  58. for _, v := range deletedShards {
  59. t.UnRegisterEcShards(v, dn)
  60. }
  61. return
  62. }
  63. func NewEcShardLocations(collection string) *EcShardLocations {
  64. return &EcShardLocations{
  65. Collection: collection,
  66. }
  67. }
  68. func (loc *EcShardLocations) AddShard(shardId erasure_coding.ShardId, dn *DataNode) (added bool) {
  69. dataNodes := loc.Locations[shardId]
  70. for _, n := range dataNodes {
  71. if n.Id() == dn.Id() {
  72. return false
  73. }
  74. }
  75. loc.Locations[shardId] = append(dataNodes, dn)
  76. return true
  77. }
  78. func (loc *EcShardLocations) DeleteShard(shardId erasure_coding.ShardId, dn *DataNode) (deleted bool) {
  79. dataNodes := loc.Locations[shardId]
  80. foundIndex := -1
  81. for index, n := range dataNodes {
  82. if n.Id() == dn.Id() {
  83. foundIndex = index
  84. }
  85. }
  86. if foundIndex < 0 {
  87. return false
  88. }
  89. loc.Locations[shardId] = append(dataNodes[:foundIndex], dataNodes[foundIndex+1:]...)
  90. return true
  91. }
  92. func (t *Topology) RegisterEcShards(ecShardInfos *erasure_coding.EcVolumeInfo, dn *DataNode) {
  93. t.ecShardMapLock.Lock()
  94. defer t.ecShardMapLock.Unlock()
  95. locations, found := t.ecShardMap[ecShardInfos.VolumeId]
  96. if !found {
  97. locations = NewEcShardLocations(ecShardInfos.Collection)
  98. t.ecShardMap[ecShardInfos.VolumeId] = locations
  99. }
  100. for _, shardId := range ecShardInfos.ShardIds() {
  101. locations.AddShard(shardId, dn)
  102. }
  103. }
  104. func (t *Topology) UnRegisterEcShards(ecShardInfos *erasure_coding.EcVolumeInfo, dn *DataNode) {
  105. glog.Infof("removing ec shard info:%+v", ecShardInfos)
  106. t.ecShardMapLock.Lock()
  107. defer t.ecShardMapLock.Unlock()
  108. locations, found := t.ecShardMap[ecShardInfos.VolumeId]
  109. if !found {
  110. return
  111. }
  112. for _, shardId := range ecShardInfos.ShardIds() {
  113. locations.DeleteShard(shardId, dn)
  114. }
  115. }
  116. func (t *Topology) LookupEcShards(vid needle.VolumeId) (locations *EcShardLocations, found bool) {
  117. t.ecShardMapLock.RLock()
  118. defer t.ecShardMapLock.RUnlock()
  119. locations, found = t.ecShardMap[vid]
  120. return
  121. }
  122. func (t *Topology) ListEcServersByCollection(collection string) (dataNodes []pb.ServerAddress) {
  123. t.ecShardMapLock.RLock()
  124. defer t.ecShardMapLock.RUnlock()
  125. dateNodeMap := make(map[pb.ServerAddress]bool)
  126. for _, ecVolumeLocation := range t.ecShardMap {
  127. if ecVolumeLocation.Collection == collection {
  128. for _, locations := range ecVolumeLocation.Locations {
  129. for _, loc := range locations {
  130. dateNodeMap[loc.ServerAddress()] = true
  131. }
  132. }
  133. }
  134. }
  135. for k, _ := range dateNodeMap {
  136. dataNodes = append(dataNodes, k)
  137. }
  138. return
  139. }
  140. func (t *Topology) DeleteEcCollection(collection string) {
  141. t.ecShardMapLock.Lock()
  142. defer t.ecShardMapLock.Unlock()
  143. var vids []needle.VolumeId
  144. for vid, ecVolumeLocation := range t.ecShardMap {
  145. if ecVolumeLocation.Collection == collection {
  146. vids = append(vids, vid)
  147. }
  148. }
  149. for _, vid := range vids {
  150. delete(t.ecShardMap, vid)
  151. }
  152. return
  153. }