topology_ec.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. package topology
  2. import (
  3. "github.com/chrislusf/seaweedfs/weed/util/log"
  4. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  5. "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
  6. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  7. )
// EcShardLocations records, for one erasure-coded volume, which data nodes
// are registered as holding each of its shards.
type EcShardLocations struct {
	// Collection is the collection name this entry was created with.
	Collection string
	// Locations is indexed by shard id; each element lists the data nodes
	// registered for that shard.
	Locations [erasure_coding.TotalShardsCount][]*DataNode
}
  12. func (t *Topology) SyncDataNodeEcShards(shardInfos []*master_pb.VolumeEcShardInformationMessage, dn *DataNode) (newShards, deletedShards []*erasure_coding.EcVolumeInfo) {
  13. // convert into in memory struct storage.VolumeInfo
  14. var shards []*erasure_coding.EcVolumeInfo
  15. for _, shardInfo := range shardInfos {
  16. shards = append(shards,
  17. erasure_coding.NewEcVolumeInfo(
  18. shardInfo.Collection,
  19. needle.VolumeId(shardInfo.Id),
  20. erasure_coding.ShardBits(shardInfo.EcIndexBits)))
  21. }
  22. // find out the delta volumes
  23. newShards, deletedShards = dn.UpdateEcShards(shards)
  24. for _, v := range newShards {
  25. t.RegisterEcShards(v, dn)
  26. }
  27. for _, v := range deletedShards {
  28. t.UnRegisterEcShards(v, dn)
  29. }
  30. return
  31. }
  32. func (t *Topology) IncrementalSyncDataNodeEcShards(newEcShards, deletedEcShards []*master_pb.VolumeEcShardInformationMessage, dn *DataNode) {
  33. // convert into in memory struct storage.VolumeInfo
  34. var newShards, deletedShards []*erasure_coding.EcVolumeInfo
  35. for _, shardInfo := range newEcShards {
  36. newShards = append(newShards,
  37. erasure_coding.NewEcVolumeInfo(
  38. shardInfo.Collection,
  39. needle.VolumeId(shardInfo.Id),
  40. erasure_coding.ShardBits(shardInfo.EcIndexBits)))
  41. }
  42. for _, shardInfo := range deletedEcShards {
  43. deletedShards = append(deletedShards,
  44. erasure_coding.NewEcVolumeInfo(
  45. shardInfo.Collection,
  46. needle.VolumeId(shardInfo.Id),
  47. erasure_coding.ShardBits(shardInfo.EcIndexBits)))
  48. }
  49. dn.DeltaUpdateEcShards(newShards, deletedShards)
  50. for _, v := range newShards {
  51. t.RegisterEcShards(v, dn)
  52. }
  53. for _, v := range deletedShards {
  54. t.UnRegisterEcShards(v, dn)
  55. }
  56. return
  57. }
  58. func NewEcShardLocations(collection string) *EcShardLocations {
  59. return &EcShardLocations{
  60. Collection: collection,
  61. }
  62. }
  63. func (loc *EcShardLocations) AddShard(shardId erasure_coding.ShardId, dn *DataNode) (added bool) {
  64. dataNodes := loc.Locations[shardId]
  65. for _, n := range dataNodes {
  66. if n.Id() == dn.Id() {
  67. return false
  68. }
  69. }
  70. loc.Locations[shardId] = append(dataNodes, dn)
  71. return true
  72. }
  73. func (loc *EcShardLocations) DeleteShard(shardId erasure_coding.ShardId, dn *DataNode) (deleted bool) {
  74. dataNodes := loc.Locations[shardId]
  75. foundIndex := -1
  76. for index, n := range dataNodes {
  77. if n.Id() == dn.Id() {
  78. foundIndex = index
  79. }
  80. }
  81. if foundIndex < 0 {
  82. return false
  83. }
  84. loc.Locations[shardId] = append(dataNodes[:foundIndex], dataNodes[foundIndex+1:]...)
  85. return true
  86. }
  87. func (t *Topology) RegisterEcShards(ecShardInfos *erasure_coding.EcVolumeInfo, dn *DataNode) {
  88. t.ecShardMapLock.Lock()
  89. defer t.ecShardMapLock.Unlock()
  90. locations, found := t.ecShardMap[ecShardInfos.VolumeId]
  91. if !found {
  92. locations = NewEcShardLocations(ecShardInfos.Collection)
  93. t.ecShardMap[ecShardInfos.VolumeId] = locations
  94. }
  95. for _, shardId := range ecShardInfos.ShardIds() {
  96. locations.AddShard(shardId, dn)
  97. }
  98. }
  99. func (t *Topology) UnRegisterEcShards(ecShardInfos *erasure_coding.EcVolumeInfo, dn *DataNode) {
  100. log.Infof("removing ec shard info:%+v", ecShardInfos)
  101. t.ecShardMapLock.Lock()
  102. defer t.ecShardMapLock.Unlock()
  103. locations, found := t.ecShardMap[ecShardInfos.VolumeId]
  104. if !found {
  105. return
  106. }
  107. for _, shardId := range ecShardInfos.ShardIds() {
  108. locations.DeleteShard(shardId, dn)
  109. }
  110. }
  111. func (t *Topology) LookupEcShards(vid needle.VolumeId) (locations *EcShardLocations, found bool) {
  112. t.ecShardMapLock.RLock()
  113. defer t.ecShardMapLock.RUnlock()
  114. locations, found = t.ecShardMap[vid]
  115. return
  116. }
  117. func (t *Topology) ListEcServersByCollection(collection string) (dataNodes []string) {
  118. t.ecShardMapLock.RLock()
  119. defer t.ecShardMapLock.RUnlock()
  120. dateNodeMap := make(map[string]bool)
  121. for _, ecVolumeLocation := range t.ecShardMap {
  122. if ecVolumeLocation.Collection == collection {
  123. for _, locations := range ecVolumeLocation.Locations {
  124. for _, loc := range locations {
  125. dateNodeMap[string(loc.Id())] = true
  126. }
  127. }
  128. }
  129. }
  130. for k, _ := range dateNodeMap {
  131. dataNodes = append(dataNodes, k)
  132. }
  133. return
  134. }
  135. func (t *Topology) DeleteEcCollection(collection string) {
  136. t.ecShardMapLock.Lock()
  137. defer t.ecShardMapLock.Unlock()
  138. var vids []needle.VolumeId
  139. for vid, ecVolumeLocation := range t.ecShardMap {
  140. if ecVolumeLocation.Collection == collection {
  141. vids = append(vids, vid)
  142. }
  143. }
  144. for _, vid := range vids {
  145. delete(t.ecShardMap, vid)
  146. }
  147. return
  148. }