topology_ec.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. package topology
  2. import (
  3. "github.com/chrislusf/seaweedfs/weed/glog"
  4. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  5. "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
  6. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  7. )
  8. type EcShardLocations struct {
  9. Collection string
  10. Locations [erasure_coding.TotalShardsCount][]*DataNode
  11. }
  12. func (t *Topology) SyncDataNodeEcShards(shardInfos []*master_pb.VolumeEcShardInformationMessage, dn *DataNode) (newShards, deletedShards []*erasure_coding.EcVolumeInfo) {
  13. // convert into in memory struct storage.VolumeInfo
  14. var shards []*erasure_coding.EcVolumeInfo
  15. for _, shardInfo := range shardInfos {
  16. shards = append(shards,
  17. erasure_coding.NewEcVolumeInfo(
  18. shardInfo.DiskType,
  19. shardInfo.Collection,
  20. needle.VolumeId(shardInfo.Id),
  21. erasure_coding.ShardBits(shardInfo.EcIndexBits)))
  22. }
  23. // find out the delta volumes
  24. newShards, deletedShards = dn.UpdateEcShards(shards)
  25. for _, v := range newShards {
  26. t.RegisterEcShards(v, dn)
  27. }
  28. for _, v := range deletedShards {
  29. t.UnRegisterEcShards(v, dn)
  30. }
  31. return
  32. }
  33. func (t *Topology) IncrementalSyncDataNodeEcShards(newEcShards, deletedEcShards []*master_pb.VolumeEcShardInformationMessage, dn *DataNode) {
  34. // convert into in memory struct storage.VolumeInfo
  35. var newShards, deletedShards []*erasure_coding.EcVolumeInfo
  36. for _, shardInfo := range newEcShards {
  37. newShards = append(newShards,
  38. erasure_coding.NewEcVolumeInfo(
  39. shardInfo.DiskType,
  40. shardInfo.Collection,
  41. needle.VolumeId(shardInfo.Id),
  42. erasure_coding.ShardBits(shardInfo.EcIndexBits)))
  43. }
  44. for _, shardInfo := range deletedEcShards {
  45. deletedShards = append(deletedShards,
  46. erasure_coding.NewEcVolumeInfo(
  47. shardInfo.DiskType,
  48. shardInfo.Collection,
  49. needle.VolumeId(shardInfo.Id),
  50. erasure_coding.ShardBits(shardInfo.EcIndexBits)))
  51. }
  52. dn.DeltaUpdateEcShards(newShards, deletedShards)
  53. for _, v := range newShards {
  54. t.RegisterEcShards(v, dn)
  55. }
  56. for _, v := range deletedShards {
  57. t.UnRegisterEcShards(v, dn)
  58. }
  59. return
  60. }
  61. func NewEcShardLocations(collection string) *EcShardLocations {
  62. return &EcShardLocations{
  63. Collection: collection,
  64. }
  65. }
  66. func (loc *EcShardLocations) AddShard(shardId erasure_coding.ShardId, dn *DataNode) (added bool) {
  67. dataNodes := loc.Locations[shardId]
  68. for _, n := range dataNodes {
  69. if n.Id() == dn.Id() {
  70. return false
  71. }
  72. }
  73. loc.Locations[shardId] = append(dataNodes, dn)
  74. return true
  75. }
  76. func (loc *EcShardLocations) DeleteShard(shardId erasure_coding.ShardId, dn *DataNode) (deleted bool) {
  77. dataNodes := loc.Locations[shardId]
  78. foundIndex := -1
  79. for index, n := range dataNodes {
  80. if n.Id() == dn.Id() {
  81. foundIndex = index
  82. }
  83. }
  84. if foundIndex < 0 {
  85. return false
  86. }
  87. loc.Locations[shardId] = append(dataNodes[:foundIndex], dataNodes[foundIndex+1:]...)
  88. return true
  89. }
  90. func (t *Topology) RegisterEcShards(ecShardInfos *erasure_coding.EcVolumeInfo, dn *DataNode) {
  91. t.ecShardMapLock.Lock()
  92. defer t.ecShardMapLock.Unlock()
  93. locations, found := t.ecShardMap[ecShardInfos.VolumeId]
  94. if !found {
  95. locations = NewEcShardLocations(ecShardInfos.Collection)
  96. t.ecShardMap[ecShardInfos.VolumeId] = locations
  97. }
  98. for _, shardId := range ecShardInfos.ShardIds() {
  99. locations.AddShard(shardId, dn)
  100. }
  101. }
  102. func (t *Topology) UnRegisterEcShards(ecShardInfos *erasure_coding.EcVolumeInfo, dn *DataNode) {
  103. glog.Infof("removing ec shard info:%+v", ecShardInfos)
  104. t.ecShardMapLock.Lock()
  105. defer t.ecShardMapLock.Unlock()
  106. locations, found := t.ecShardMap[ecShardInfos.VolumeId]
  107. if !found {
  108. return
  109. }
  110. for _, shardId := range ecShardInfos.ShardIds() {
  111. locations.DeleteShard(shardId, dn)
  112. }
  113. }
  114. func (t *Topology) LookupEcShards(vid needle.VolumeId) (locations *EcShardLocations, found bool) {
  115. t.ecShardMapLock.RLock()
  116. defer t.ecShardMapLock.RUnlock()
  117. locations, found = t.ecShardMap[vid]
  118. return
  119. }
  120. func (t *Topology) ListEcServersByCollection(collection string) (dataNodes []string) {
  121. t.ecShardMapLock.RLock()
  122. defer t.ecShardMapLock.RUnlock()
  123. dateNodeMap := make(map[string]bool)
  124. for _, ecVolumeLocation := range t.ecShardMap {
  125. if ecVolumeLocation.Collection == collection {
  126. for _, locations := range ecVolumeLocation.Locations {
  127. for _, loc := range locations {
  128. dateNodeMap[string(loc.Id())] = true
  129. }
  130. }
  131. }
  132. }
  133. for k, _ := range dateNodeMap {
  134. dataNodes = append(dataNodes, k)
  135. }
  136. return
  137. }
  138. func (t *Topology) DeleteEcCollection(collection string) {
  139. t.ecShardMapLock.Lock()
  140. defer t.ecShardMapLock.Unlock()
  141. var vids []needle.VolumeId
  142. for vid, ecVolumeLocation := range t.ecShardMap {
  143. if ecVolumeLocation.Collection == collection {
  144. vids = append(vids, vid)
  145. }
  146. }
  147. for _, vid := range vids {
  148. delete(t.ecShardMap, vid)
  149. }
  150. return
  151. }