command_ec_balance.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547
  1. package shell
  2. import (
  3. "flag"
  4. "fmt"
  5. "io"
  6. "github.com/seaweedfs/seaweedfs/weed/pb"
  7. "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
  8. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  9. "github.com/seaweedfs/seaweedfs/weed/storage/types"
  10. "golang.org/x/exp/slices"
  11. )
  12. func init() {
  13. Commands = append(Commands, &commandEcBalance{})
  14. }
  15. type commandEcBalance struct {
  16. }
  17. func (c *commandEcBalance) Name() string {
  18. return "ec.balance"
  19. }
  20. func (c *commandEcBalance) Help() string {
  21. return `balance all ec shards among all racks and volume servers
  22. ec.balance [-c EACH_COLLECTION|<collection_name>] [-force] [-dataCenter <data_center>]
  23. Algorithm:
  24. func EcBalance() {
  25. for each collection:
  26. balanceEcVolumes(collectionName)
  27. for each rack:
  28. balanceEcRack(rack)
  29. }
  30. func balanceEcVolumes(collectionName){
  31. for each volume:
  32. doDeduplicateEcShards(volumeId)
  33. tracks rack~shardCount mapping
  34. for each volume:
  35. doBalanceEcShardsAcrossRacks(volumeId)
  36. for each volume:
  37. doBalanceEcShardsWithinRacks(volumeId)
  38. }
  39. // spread ec shards into more racks
  40. func doBalanceEcShardsAcrossRacks(volumeId){
  41. tracks rack~volumeIdShardCount mapping
  42. averageShardsPerEcRack = totalShardNumber / numRacks // totalShardNumber is 14 for now, later could varies for each dc
  43. ecShardsToMove = select overflown ec shards from racks with ec shard counts > averageShardsPerEcRack
  44. for each ecShardsToMove {
  45. destRack = pickOneRack(rack~shardCount, rack~volumeIdShardCount, averageShardsPerEcRack)
  46. destVolumeServers = volume servers on the destRack
  47. pickOneEcNodeAndMoveOneShard(destVolumeServers)
  48. }
  49. }
  50. func doBalanceEcShardsWithinRacks(volumeId){
  51. racks = collect all racks that the volume id is on
  52. for rack, shards := range racks
  53. doBalanceEcShardsWithinOneRack(volumeId, shards, rack)
  54. }
  55. // move ec shards
  56. func doBalanceEcShardsWithinOneRack(volumeId, shards, rackId){
  57. tracks volumeServer~volumeIdShardCount mapping
  58. averageShardCount = len(shards) / numVolumeServers
  59. volumeServersOverAverage = volume servers with volumeId's ec shard counts > averageShardsPerEcRack
  60. ecShardsToMove = select overflown ec shards from volumeServersOverAverage
  61. for each ecShardsToMove {
  62. destVolumeServer = pickOneVolumeServer(volumeServer~shardCount, volumeServer~volumeIdShardCount, averageShardCount)
  63. pickOneEcNodeAndMoveOneShard(destVolumeServers)
  64. }
  65. }
  66. // move ec shards while keeping shard distribution for the same volume unchanged or more even
  67. func balanceEcRack(rack){
  68. averageShardCount = total shards / numVolumeServers
  69. for hasMovedOneEcShard {
  70. sort all volume servers ordered by the number of local ec shards
  71. pick the volume server A with the lowest number of ec shards x
  72. pick the volume server B with the highest number of ec shards y
  73. if y > averageShardCount and x +1 <= averageShardCount {
  74. if B has a ec shard with volume id v that A does not have {
  75. move one ec shard v from B to A
  76. hasMovedOneEcShard = true
  77. }
  78. }
  79. }
  80. }
  81. `
  82. }
  83. func (c *commandEcBalance) HasTag(CommandTag) bool {
  84. return false
  85. }
  86. func (c *commandEcBalance) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  87. balanceCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  88. collection := balanceCommand.String("collection", "EACH_COLLECTION", "collection name, or \"EACH_COLLECTION\" for each collection")
  89. dc := balanceCommand.String("dataCenter", "", "only apply the balancing for this dataCenter")
  90. applyBalancing := balanceCommand.Bool("force", false, "apply the balancing plan")
  91. if err = balanceCommand.Parse(args); err != nil {
  92. return nil
  93. }
  94. infoAboutSimulationMode(writer, *applyBalancing, "-force")
  95. if err = commandEnv.confirmIsLocked(args); err != nil {
  96. return
  97. }
  98. // collect all ec nodes
  99. allEcNodes, totalFreeEcSlots, err := collectEcNodes(commandEnv, *dc)
  100. if err != nil {
  101. return err
  102. }
  103. if totalFreeEcSlots < 1 {
  104. return fmt.Errorf("no free ec shard slots. only %d left", totalFreeEcSlots)
  105. }
  106. racks := collectRacks(allEcNodes)
  107. if *collection == "EACH_COLLECTION" {
  108. collections, err := ListCollectionNames(commandEnv, false, true)
  109. if err != nil {
  110. return err
  111. }
  112. fmt.Printf("balanceEcVolumes collections %+v\n", len(collections))
  113. for _, c := range collections {
  114. fmt.Printf("balanceEcVolumes collection %+v\n", c)
  115. if err = balanceEcVolumes(commandEnv, c, allEcNodes, racks, *applyBalancing); err != nil {
  116. return err
  117. }
  118. }
  119. } else {
  120. if err = balanceEcVolumes(commandEnv, *collection, allEcNodes, racks, *applyBalancing); err != nil {
  121. return err
  122. }
  123. }
  124. if err := balanceEcRacks(commandEnv, racks, *applyBalancing); err != nil {
  125. return fmt.Errorf("balance ec racks: %v", err)
  126. }
  127. return nil
  128. }
  129. func collectRacks(allEcNodes []*EcNode) map[RackId]*EcRack {
  130. // collect racks info
  131. racks := make(map[RackId]*EcRack)
  132. for _, ecNode := range allEcNodes {
  133. if racks[ecNode.rack] == nil {
  134. racks[ecNode.rack] = &EcRack{
  135. ecNodes: make(map[EcNodeId]*EcNode),
  136. }
  137. }
  138. racks[ecNode.rack].ecNodes[EcNodeId(ecNode.info.Id)] = ecNode
  139. racks[ecNode.rack].freeEcSlot += ecNode.freeEcSlot
  140. }
  141. return racks
  142. }
  143. func balanceEcVolumes(commandEnv *CommandEnv, collection string, allEcNodes []*EcNode, racks map[RackId]*EcRack, applyBalancing bool) error {
  144. fmt.Printf("balanceEcVolumes %s\n", collection)
  145. if err := deleteDuplicatedEcShards(commandEnv, allEcNodes, collection, applyBalancing); err != nil {
  146. return fmt.Errorf("delete duplicated collection %s ec shards: %v", collection, err)
  147. }
  148. if err := balanceEcShardsAcrossRacks(commandEnv, allEcNodes, racks, collection, applyBalancing); err != nil {
  149. return fmt.Errorf("balance across racks collection %s ec shards: %v", collection, err)
  150. }
  151. if err := balanceEcShardsWithinRacks(commandEnv, allEcNodes, racks, collection, applyBalancing); err != nil {
  152. return fmt.Errorf("balance within racks collection %s ec shards: %v", collection, err)
  153. }
  154. return nil
  155. }
  156. func deleteDuplicatedEcShards(commandEnv *CommandEnv, allEcNodes []*EcNode, collection string, applyBalancing bool) error {
  157. // vid => []ecNode
  158. vidLocations := collectVolumeIdToEcNodes(allEcNodes, collection)
  159. // deduplicate ec shards
  160. for vid, locations := range vidLocations {
  161. if err := doDeduplicateEcShards(commandEnv, collection, vid, locations, applyBalancing); err != nil {
  162. return err
  163. }
  164. }
  165. return nil
  166. }
  167. func doDeduplicateEcShards(commandEnv *CommandEnv, collection string, vid needle.VolumeId, locations []*EcNode, applyBalancing bool) error {
  168. // check whether this volume has ecNodes that are over average
  169. shardToLocations := make([][]*EcNode, erasure_coding.TotalShardsCount)
  170. for _, ecNode := range locations {
  171. shardBits := findEcVolumeShards(ecNode, vid)
  172. for _, shardId := range shardBits.ShardIds() {
  173. shardToLocations[shardId] = append(shardToLocations[shardId], ecNode)
  174. }
  175. }
  176. for shardId, ecNodes := range shardToLocations {
  177. if len(ecNodes) <= 1 {
  178. continue
  179. }
  180. sortEcNodesByFreeslotsAscending(ecNodes)
  181. fmt.Printf("ec shard %d.%d has %d copies, keeping %v\n", vid, shardId, len(ecNodes), ecNodes[0].info.Id)
  182. if !applyBalancing {
  183. continue
  184. }
  185. duplicatedShardIds := []uint32{uint32(shardId)}
  186. for _, ecNode := range ecNodes[1:] {
  187. if err := unmountEcShards(commandEnv.option.GrpcDialOption, vid, pb.NewServerAddressFromDataNode(ecNode.info), duplicatedShardIds); err != nil {
  188. return err
  189. }
  190. if err := sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, vid, pb.NewServerAddressFromDataNode(ecNode.info), duplicatedShardIds); err != nil {
  191. return err
  192. }
  193. ecNode.deleteEcVolumeShards(vid, duplicatedShardIds)
  194. }
  195. }
  196. return nil
  197. }
  198. func balanceEcShardsAcrossRacks(commandEnv *CommandEnv, allEcNodes []*EcNode, racks map[RackId]*EcRack, collection string, applyBalancing bool) error {
  199. // collect vid => []ecNode, since previous steps can change the locations
  200. vidLocations := collectVolumeIdToEcNodes(allEcNodes, collection)
  201. // spread the ec shards evenly
  202. for vid, locations := range vidLocations {
  203. if err := doBalanceEcShardsAcrossRacks(commandEnv, collection, vid, locations, racks, applyBalancing); err != nil {
  204. return err
  205. }
  206. }
  207. return nil
  208. }
  209. func doBalanceEcShardsAcrossRacks(commandEnv *CommandEnv, collection string, vid needle.VolumeId, locations []*EcNode, racks map[RackId]*EcRack, applyBalancing bool) error {
  210. // calculate average number of shards an ec rack should have for one volume
  211. averageShardsPerEcRack := ceilDivide(erasure_coding.TotalShardsCount, len(racks))
  212. // see the volume's shards are in how many racks, and how many in each rack
  213. rackToShardCount := groupByCount(locations, func(ecNode *EcNode) (id string, count int) {
  214. shardBits := findEcVolumeShards(ecNode, vid)
  215. return string(ecNode.rack), shardBits.ShardIdCount()
  216. })
  217. rackEcNodesWithVid := groupBy(locations, func(ecNode *EcNode) string {
  218. return string(ecNode.rack)
  219. })
  220. // ecShardsToMove = select overflown ec shards from racks with ec shard counts > averageShardsPerEcRack
  221. ecShardsToMove := make(map[erasure_coding.ShardId]*EcNode)
  222. for rackId, count := range rackToShardCount {
  223. if count > averageShardsPerEcRack {
  224. possibleEcNodes := rackEcNodesWithVid[rackId]
  225. for shardId, ecNode := range pickNEcShardsToMoveFrom(possibleEcNodes, vid, count-averageShardsPerEcRack) {
  226. ecShardsToMove[shardId] = ecNode
  227. }
  228. }
  229. }
  230. for shardId, ecNode := range ecShardsToMove {
  231. rackId := pickOneRack(racks, rackToShardCount, averageShardsPerEcRack)
  232. if rackId == "" {
  233. fmt.Printf("ec shard %d.%d at %s can not find a destination rack\n", vid, shardId, ecNode.info.Id)
  234. continue
  235. }
  236. var possibleDestinationEcNodes []*EcNode
  237. for _, n := range racks[rackId].ecNodes {
  238. possibleDestinationEcNodes = append(possibleDestinationEcNodes, n)
  239. }
  240. err := pickOneEcNodeAndMoveOneShard(commandEnv, averageShardsPerEcRack, ecNode, collection, vid, shardId, possibleDestinationEcNodes, applyBalancing)
  241. if err != nil {
  242. return err
  243. }
  244. rackToShardCount[string(rackId)] += 1
  245. rackToShardCount[string(ecNode.rack)] -= 1
  246. racks[rackId].freeEcSlot -= 1
  247. racks[ecNode.rack].freeEcSlot += 1
  248. }
  249. return nil
  250. }
  251. func pickOneRack(rackToEcNodes map[RackId]*EcRack, rackToShardCount map[string]int, averageShardsPerEcRack int) RackId {
  252. // TODO later may need to add some randomness
  253. for rackId, rack := range rackToEcNodes {
  254. if rackToShardCount[string(rackId)] >= averageShardsPerEcRack {
  255. continue
  256. }
  257. if rack.freeEcSlot <= 0 {
  258. continue
  259. }
  260. return rackId
  261. }
  262. return ""
  263. }
  264. func balanceEcShardsWithinRacks(commandEnv *CommandEnv, allEcNodes []*EcNode, racks map[RackId]*EcRack, collection string, applyBalancing bool) error {
  265. // collect vid => []ecNode, since previous steps can change the locations
  266. vidLocations := collectVolumeIdToEcNodes(allEcNodes, collection)
  267. // spread the ec shards evenly
  268. for vid, locations := range vidLocations {
  269. // see the volume's shards are in how many racks, and how many in each rack
  270. rackToShardCount := groupByCount(locations, func(ecNode *EcNode) (id string, count int) {
  271. shardBits := findEcVolumeShards(ecNode, vid)
  272. return string(ecNode.rack), shardBits.ShardIdCount()
  273. })
  274. rackEcNodesWithVid := groupBy(locations, func(ecNode *EcNode) string {
  275. return string(ecNode.rack)
  276. })
  277. for rackId, _ := range rackToShardCount {
  278. var possibleDestinationEcNodes []*EcNode
  279. for _, n := range racks[RackId(rackId)].ecNodes {
  280. if _, found := n.info.DiskInfos[string(types.HardDriveType)]; found {
  281. possibleDestinationEcNodes = append(possibleDestinationEcNodes, n)
  282. }
  283. }
  284. sourceEcNodes := rackEcNodesWithVid[rackId]
  285. averageShardsPerEcNode := ceilDivide(rackToShardCount[rackId], len(possibleDestinationEcNodes))
  286. if err := doBalanceEcShardsWithinOneRack(commandEnv, averageShardsPerEcNode, collection, vid, sourceEcNodes, possibleDestinationEcNodes, applyBalancing); err != nil {
  287. return err
  288. }
  289. }
  290. }
  291. return nil
  292. }
  293. func doBalanceEcShardsWithinOneRack(commandEnv *CommandEnv, averageShardsPerEcNode int, collection string, vid needle.VolumeId, existingLocations, possibleDestinationEcNodes []*EcNode, applyBalancing bool) error {
  294. for _, ecNode := range existingLocations {
  295. shardBits := findEcVolumeShards(ecNode, vid)
  296. overLimitCount := shardBits.ShardIdCount() - averageShardsPerEcNode
  297. for _, shardId := range shardBits.ShardIds() {
  298. if overLimitCount <= 0 {
  299. break
  300. }
  301. fmt.Printf("%s has %d overlimit, moving ec shard %d.%d\n", ecNode.info.Id, overLimitCount, vid, shardId)
  302. err := pickOneEcNodeAndMoveOneShard(commandEnv, averageShardsPerEcNode, ecNode, collection, vid, shardId, possibleDestinationEcNodes, applyBalancing)
  303. if err != nil {
  304. return err
  305. }
  306. overLimitCount--
  307. }
  308. }
  309. return nil
  310. }
  311. func balanceEcRacks(commandEnv *CommandEnv, racks map[RackId]*EcRack, applyBalancing bool) error {
  312. // balance one rack for all ec shards
  313. for _, ecRack := range racks {
  314. if err := doBalanceEcRack(commandEnv, ecRack, applyBalancing); err != nil {
  315. return err
  316. }
  317. }
  318. return nil
  319. }
  320. func doBalanceEcRack(commandEnv *CommandEnv, ecRack *EcRack, applyBalancing bool) error {
  321. if len(ecRack.ecNodes) <= 1 {
  322. return nil
  323. }
  324. var rackEcNodes []*EcNode
  325. for _, node := range ecRack.ecNodes {
  326. rackEcNodes = append(rackEcNodes, node)
  327. }
  328. ecNodeIdToShardCount := groupByCount(rackEcNodes, func(ecNode *EcNode) (id string, count int) {
  329. diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]
  330. if !found {
  331. return
  332. }
  333. for _, ecShardInfo := range diskInfo.EcShardInfos {
  334. count += erasure_coding.ShardBits(ecShardInfo.EcIndexBits).ShardIdCount()
  335. }
  336. return ecNode.info.Id, count
  337. })
  338. var totalShardCount int
  339. for _, count := range ecNodeIdToShardCount {
  340. totalShardCount += count
  341. }
  342. averageShardCount := ceilDivide(totalShardCount, len(rackEcNodes))
  343. hasMove := true
  344. for hasMove {
  345. hasMove = false
  346. slices.SortFunc(rackEcNodes, func(a, b *EcNode) int {
  347. return b.freeEcSlot - a.freeEcSlot
  348. })
  349. emptyNode, fullNode := rackEcNodes[0], rackEcNodes[len(rackEcNodes)-1]
  350. emptyNodeShardCount, fullNodeShardCount := ecNodeIdToShardCount[emptyNode.info.Id], ecNodeIdToShardCount[fullNode.info.Id]
  351. if fullNodeShardCount > averageShardCount && emptyNodeShardCount+1 <= averageShardCount {
  352. emptyNodeIds := make(map[uint32]bool)
  353. if emptyDiskInfo, found := emptyNode.info.DiskInfos[string(types.HardDriveType)]; found {
  354. for _, shards := range emptyDiskInfo.EcShardInfos {
  355. emptyNodeIds[shards.Id] = true
  356. }
  357. }
  358. if fullDiskInfo, found := fullNode.info.DiskInfos[string(types.HardDriveType)]; found {
  359. for _, shards := range fullDiskInfo.EcShardInfos {
  360. if _, found := emptyNodeIds[shards.Id]; !found {
  361. for _, shardId := range erasure_coding.ShardBits(shards.EcIndexBits).ShardIds() {
  362. fmt.Printf("%s moves ec shards %d.%d to %s\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id)
  363. err := moveMountedShardToEcNode(commandEnv, fullNode, shards.Collection, needle.VolumeId(shards.Id), shardId, emptyNode, applyBalancing)
  364. if err != nil {
  365. return err
  366. }
  367. ecNodeIdToShardCount[emptyNode.info.Id]++
  368. ecNodeIdToShardCount[fullNode.info.Id]--
  369. hasMove = true
  370. break
  371. }
  372. break
  373. }
  374. }
  375. }
  376. }
  377. }
  378. return nil
  379. }
  380. func pickOneEcNodeAndMoveOneShard(commandEnv *CommandEnv, averageShardsPerEcNode int, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, possibleDestinationEcNodes []*EcNode, applyBalancing bool) error {
  381. sortEcNodesByFreeslotsDescending(possibleDestinationEcNodes)
  382. skipReason := ""
  383. for _, destEcNode := range possibleDestinationEcNodes {
  384. if destEcNode.info.Id == existingLocation.info.Id {
  385. continue
  386. }
  387. if destEcNode.freeEcSlot <= 0 {
  388. skipReason += fmt.Sprintf(" Skipping %s because it has no free slots\n", destEcNode.info.Id)
  389. continue
  390. }
  391. if findEcVolumeShards(destEcNode, vid).ShardIdCount() >= averageShardsPerEcNode {
  392. skipReason += fmt.Sprintf(" Skipping %s because it %d >= avernageShards (%d)\n",
  393. destEcNode.info.Id, findEcVolumeShards(destEcNode, vid).ShardIdCount(), averageShardsPerEcNode)
  394. continue
  395. }
  396. fmt.Printf("%s moves ec shard %d.%d to %s\n", existingLocation.info.Id, vid, shardId, destEcNode.info.Id)
  397. err := moveMountedShardToEcNode(commandEnv, existingLocation, collection, vid, shardId, destEcNode, applyBalancing)
  398. if err != nil {
  399. return err
  400. }
  401. return nil
  402. }
  403. fmt.Printf("WARNING: Could not find suitable taget node for %d.%d:\n%s", vid, shardId, skipReason)
  404. return nil
  405. }
  406. func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int) map[erasure_coding.ShardId]*EcNode {
  407. picked := make(map[erasure_coding.ShardId]*EcNode)
  408. var candidateEcNodes []*CandidateEcNode
  409. for _, ecNode := range ecNodes {
  410. shardBits := findEcVolumeShards(ecNode, vid)
  411. if shardBits.ShardIdCount() > 0 {
  412. candidateEcNodes = append(candidateEcNodes, &CandidateEcNode{
  413. ecNode: ecNode,
  414. shardCount: shardBits.ShardIdCount(),
  415. })
  416. }
  417. }
  418. slices.SortFunc(candidateEcNodes, func(a, b *CandidateEcNode) int {
  419. return b.shardCount - a.shardCount
  420. })
  421. for i := 0; i < n; i++ {
  422. selectedEcNodeIndex := -1
  423. for i, candidateEcNode := range candidateEcNodes {
  424. shardBits := findEcVolumeShards(candidateEcNode.ecNode, vid)
  425. if shardBits > 0 {
  426. selectedEcNodeIndex = i
  427. for _, shardId := range shardBits.ShardIds() {
  428. candidateEcNode.shardCount--
  429. picked[shardId] = candidateEcNode.ecNode
  430. candidateEcNode.ecNode.deleteEcVolumeShards(vid, []uint32{uint32(shardId)})
  431. break
  432. }
  433. break
  434. }
  435. }
  436. if selectedEcNodeIndex >= 0 {
  437. ensureSortedEcNodes(candidateEcNodes, selectedEcNodeIndex, func(i, j int) bool {
  438. return candidateEcNodes[i].shardCount > candidateEcNodes[j].shardCount
  439. })
  440. }
  441. }
  442. return picked
  443. }
  444. func collectVolumeIdToEcNodes(allEcNodes []*EcNode, collection string) map[needle.VolumeId][]*EcNode {
  445. vidLocations := make(map[needle.VolumeId][]*EcNode)
  446. for _, ecNode := range allEcNodes {
  447. diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]
  448. if !found {
  449. continue
  450. }
  451. for _, shardInfo := range diskInfo.EcShardInfos {
  452. // ignore if not in current collection
  453. if shardInfo.Collection == collection {
  454. vidLocations[needle.VolumeId(shardInfo.Id)] = append(vidLocations[needle.VolumeId(shardInfo.Id)], ecNode)
  455. }
  456. }
  457. }
  458. return vidLocations
  459. }