data_node_ec.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. package topology
  2. import (
  3. "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
  4. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  5. )
  6. func (dn *DataNode) GetEcShards() (ret []*erasure_coding.EcVolumeInfo) {
  7. dn.RLock()
  8. for _, ecVolumeInfo := range dn.ecShards {
  9. ret = append(ret, ecVolumeInfo)
  10. }
  11. dn.RUnlock()
  12. return ret
  13. }
  14. func (dn *DataNode) UpdateEcShards(actualShards []*erasure_coding.EcVolumeInfo) (newShards, deletedShards []*erasure_coding.EcVolumeInfo) {
  15. // prepare the new ec shard map
  16. actualEcShardMap := make(map[needle.VolumeId]*erasure_coding.EcVolumeInfo)
  17. for _, ecShards := range actualShards {
  18. actualEcShardMap[ecShards.VolumeId] = ecShards
  19. }
  20. // found out the newShards and deletedShards
  21. var newShardCount, deletedShardCount int
  22. dn.ecShardsLock.RLock()
  23. for vid, ecShards := range dn.ecShards {
  24. if actualEcShards, ok := actualEcShardMap[vid]; !ok {
  25. // dn registered ec shards not found in the new set of ec shards
  26. deletedShards = append(deletedShards, ecShards)
  27. deletedShardCount += ecShards.ShardIdCount()
  28. } else {
  29. // found, but maybe the actual shard could be missing
  30. a := actualEcShards.Minus(ecShards)
  31. if a.ShardIdCount() > 0 {
  32. newShards = append(newShards, a)
  33. newShardCount += a.ShardIdCount()
  34. }
  35. d := ecShards.Minus(actualEcShards)
  36. if d.ShardIdCount() > 0 {
  37. deletedShards = append(deletedShards, d)
  38. deletedShardCount += d.ShardIdCount()
  39. }
  40. }
  41. }
  42. for _, ecShards := range actualShards {
  43. if _, found := dn.ecShards[ecShards.VolumeId]; !found {
  44. newShards = append(newShards, ecShards)
  45. newShardCount += ecShards.ShardIdCount()
  46. }
  47. }
  48. dn.ecShardsLock.RUnlock()
  49. if len(newShards) > 0 || len(deletedShards) > 0 {
  50. // if changed, set to the new ec shard map
  51. dn.ecShardsLock.Lock()
  52. dn.ecShards = actualEcShardMap
  53. dn.UpAdjustEcShardCountDelta(int64(newShardCount - deletedShardCount))
  54. dn.ecShardsLock.Unlock()
  55. }
  56. return
  57. }
  58. func (dn *DataNode) DeltaUpdateEcShards(newShards, deletedShards []*erasure_coding.EcVolumeInfo) {
  59. for _, newShard := range newShards {
  60. dn.AddOrUpdateEcShard(newShard)
  61. }
  62. for _, deletedShard := range deletedShards {
  63. dn.DeleteEcShard(deletedShard)
  64. }
  65. }
  66. func (dn *DataNode) AddOrUpdateEcShard(s *erasure_coding.EcVolumeInfo) {
  67. dn.ecShardsLock.Lock()
  68. defer dn.ecShardsLock.Unlock()
  69. delta := 0
  70. if existing, ok := dn.ecShards[s.VolumeId]; !ok {
  71. dn.ecShards[s.VolumeId] = s
  72. delta = s.ShardBits.ShardIdCount()
  73. } else {
  74. oldCount := existing.ShardBits.ShardIdCount()
  75. existing.ShardBits = existing.ShardBits.Plus(s.ShardBits)
  76. delta = existing.ShardBits.ShardIdCount() - oldCount
  77. }
  78. dn.UpAdjustEcShardCountDelta(int64(delta))
  79. }
  80. func (dn *DataNode) DeleteEcShard(s *erasure_coding.EcVolumeInfo) {
  81. dn.ecShardsLock.Lock()
  82. defer dn.ecShardsLock.Unlock()
  83. if existing, ok := dn.ecShards[s.VolumeId]; ok {
  84. oldCount := existing.ShardBits.ShardIdCount()
  85. existing.ShardBits = existing.ShardBits.Minus(s.ShardBits)
  86. delta := existing.ShardBits.ShardIdCount() - oldCount
  87. dn.UpAdjustEcShardCountDelta(int64(delta))
  88. if existing.ShardBits.ShardIdCount() == 0 {
  89. delete(dn.ecShards, s.VolumeId)
  90. }
  91. }
  92. }
  93. func (dn *DataNode) HasVolumesById(id needle.VolumeId) (hasVolumeId bool) {
  94. // check whether normal volumes has this volume id
  95. dn.RLock()
  96. _, ok := dn.volumes[id]
  97. if ok {
  98. hasVolumeId = true
  99. }
  100. dn.RUnlock()
  101. if hasVolumeId {
  102. return
  103. }
  104. // check whether ec shards has this volume id
  105. dn.ecShardsLock.RLock()
  106. _, ok = dn.ecShards[id]
  107. if ok {
  108. hasVolumeId = true
  109. }
  110. dn.ecShardsLock.RUnlock()
  111. return
  112. }