command_volume_fix_replication.go 11 KB

package shell

import (
	"context"
	"flag"
	"fmt"
	"io"
	"sort"

	"github.com/chrislusf/seaweedfs/weed/operation"
	"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
	"github.com/chrislusf/seaweedfs/weed/storage/needle"
	"github.com/chrislusf/seaweedfs/weed/storage/super_block"
)
func init() {
	Commands = append(Commands, &commandVolumeFixReplication{})
}

type commandVolumeFixReplication struct {
}

func (c *commandVolumeFixReplication) Name() string {
	return "volume.fix.replication"
}
func (c *commandVolumeFixReplication) Help() string {
	return `add replicas to volumes that are missing replicas

	This command finds all over-replicated volumes. If found, it will purge the oldest copies and stop.

	This command also finds all under-replicated volumes, and finds volume servers with free slots.
	If the free slots satisfy the replication requirement, the volume content is copied over and mounted.

	volume.fix.replication -n # do not take action
	volume.fix.replication    # actually delete or copy the volume files and mount the volume

	Note:
		* each time this will only add back one replica for one volume id. If multiple replicas
		  are missing, e.g. multiple volume servers are new, you may need to run this multiple times.
		* do not run this too quickly within seconds, since the new volume replica may take a few seconds
		  to register itself to the master.

`
}
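// Example shell session (illustrative only; Do below requires the shell lock via
// commandEnv.confirmIsLocked(), and the lock/unlock command names are an assumption):
//
//	> lock
//	> volume.fix.replication -n   # dry run: only report what would be deleted or copied
//	> volume.fix.replication      # actually delete extra replicas or copy missing ones
//	> unlock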
func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {

	if err = commandEnv.confirmIsLocked(); err != nil {
		return
	}

	volFixReplicationCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
	skipChange := volFixReplicationCommand.Bool("n", false, "skip the changes")
	if err = volFixReplicationCommand.Parse(args); err != nil {
		return nil
	}

	takeAction := !*skipChange

	var resp *master_pb.VolumeListResponse
	err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
		resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
		return err
	})
	if err != nil {
		return err
	}

	// find all volumes that need replication
	// collect all data nodes
	volumeReplicas, allLocations := collectVolumeReplicaLocations(resp)

	if len(allLocations) == 0 {
		return fmt.Errorf("no data nodes at all")
	}

	// find all under-replicated and over-replicated volumes
	var underReplicatedVolumeIds, overReplicatedVolumeIds []uint32
	for vid, replicas := range volumeReplicas {
		replica := replicas[0]
		replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement))
		if replicaPlacement.GetCopyCount() > len(replicas) {
			underReplicatedVolumeIds = append(underReplicatedVolumeIds, vid)
		} else if replicaPlacement.GetCopyCount() < len(replicas) {
			overReplicatedVolumeIds = append(overReplicatedVolumeIds, vid)
			fmt.Fprintf(writer, "volume %d replication %s, but over replicated %+d\n", replica.info.Id, replicaPlacement, len(replicas))
		}
	}

	if len(overReplicatedVolumeIds) > 0 {
		return c.fixOverReplicatedVolumes(commandEnv, writer, takeAction, overReplicatedVolumeIds, volumeReplicas, allLocations)
	}

	if len(underReplicatedVolumeIds) == 0 {
		return nil
	}

	// find the most under-populated data nodes
	keepDataNodesSorted(allLocations)

	return c.fixUnderReplicatedVolumes(commandEnv, writer, takeAction, underReplicatedVolumeIds, volumeReplicas, allLocations)

}
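// collectVolumeReplicaLocations walks the topology reported by the master,
// groups every volume replica by volume id, and also returns the flat list of
// all data-node locations that later serves as the candidate set for placement.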
func collectVolumeReplicaLocations(resp *master_pb.VolumeListResponse) (map[uint32][]*VolumeReplica, []location) {
	volumeReplicas := make(map[uint32][]*VolumeReplica)
	var allLocations []location
	eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
		loc := newLocation(dc, string(rack), dn)
		for _, v := range dn.VolumeInfos {
			volumeReplicas[v.Id] = append(volumeReplicas[v.Id], &VolumeReplica{
				location: &loc,
				info:     v,
			})
		}
		allLocations = append(allLocations, loc)
	})
	return volumeReplicas, allLocations
}
func (c *commandVolumeFixReplication) fixOverReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, overReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location) error {
	for _, vid := range overReplicatedVolumeIds {
		replicas := volumeReplicas[vid]
		replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replicas[0].info.ReplicaPlacement))

		replica := pickOneReplicaToDelete(replicas, replicaPlacement)

		fmt.Fprintf(writer, "deleting volume %d from %s ...\n", replica.info.Id, replica.location.dataNode.Id)

		if !takeAction {
			break
		}

		if err := deleteVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(replica.info.Id), replica.location.dataNode.Id); err != nil {
			return fmt.Errorf("deleting volume %d from %s : %v", replica.info.Id, replica.location.dataNode.Id, err)
		}

	}
	return nil
}
func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *CommandEnv, writer io.Writer, takeAction bool, underReplicatedVolumeIds []uint32, volumeReplicas map[uint32][]*VolumeReplica, allLocations []location) error {
	for _, vid := range underReplicatedVolumeIds {
		replicas := volumeReplicas[vid]
		replica := pickOneReplicaToCopyFrom(replicas)
		replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(replica.info.ReplicaPlacement))
		foundNewLocation := false
		for _, dst := range allLocations {
			// check whether data nodes satisfy the constraints
			if dst.dataNode.FreeVolumeCount > 0 && satisfyReplicaPlacement(replicaPlacement, replicas, dst) {
				// ask the volume server to replicate the volume
				foundNewLocation = true
				fmt.Fprintf(writer, "replicating volume %d %s from %s to dataNode %s ...\n", replica.info.Id, replicaPlacement, replica.location.dataNode.Id, dst.dataNode.Id)

				if !takeAction {
					break
				}

				err := operation.WithVolumeServerClient(dst.dataNode.Id, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
					_, replicateErr := volumeServerClient.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
						VolumeId:       replica.info.Id,
						SourceDataNode: replica.location.dataNode.Id,
					})
					if replicateErr != nil {
						return fmt.Errorf("copying from %s => %s : %v", replica.location.dataNode.Id, dst.dataNode.Id, replicateErr)
					}
					return nil
				})

				if err != nil {
					return err
				}

				// adjust free volume count
				dst.dataNode.FreeVolumeCount--
				keepDataNodesSorted(allLocations)
				break
			}
		}
		if !foundNewLocation {
			fmt.Fprintf(writer, "failed to place volume %d replica as %s, existing:%+v\n", replica.info.Id, replicaPlacement, len(replicas))
		}
	}
	return nil
}
func keepDataNodesSorted(dataNodes []location) {
	sort.Slice(dataNodes, func(i, j int) bool {
		return dataNodes[i].dataNode.FreeVolumeCount > dataNodes[j].dataNode.FreeVolumeCount
	})
}
/*
  if on an existing data node {
    return false
  }
  if different from existing dcs {
    if lack on different dcs {
      return true
    } else {
      return false
    }
  }
  if not on primary dc {
    return false
  }
  if different from existing racks {
    if lack on different racks {
      return true
    } else {
      return false
    }
  }
  if not on primary rack {
    return false
  }
  if lacks on same rack {
    return true
  } else {
    return false
  }
*/
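// Worked example (a sketch, not part of the original file): assume a replication
// setting whose DiffDataCenterCount is 0, DiffRackCount is 1 and SameRackCount is 0
// (so GetCopyCount() == 2), and a single existing replica on dc1 / rackA / node1.
// The helpers rp010 and locationIn below are hypothetical stand-ins for such values:
//
//	satisfyReplicaPlacement(rp010, replicas, locationIn("dc1", "rackB", "node2")) // true:  supplies the missing different-rack copy
//	satisfyReplicaPlacement(rp010, replicas, locationIn("dc1", "rackA", "node1")) // false: a replica is already on that data node
//	satisfyReplicaPlacement(rp010, replicas, locationIn("dc2", "rackC", "node3")) // false: a second data center exceeds DiffDataCenterCount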
func satisfyReplicaPlacement(replicaPlacement *super_block.ReplicaPlacement, replicas []*VolumeReplica, possibleLocation location) bool {

	existingDataCenters, _, existingDataNodes := countReplicas(replicas)

	if _, found := existingDataNodes[possibleLocation.String()]; found {
		// avoid duplicated volume on the same data node
		return false
	}

	primaryDataCenters, _ := findTopKeys(existingDataCenters)

	// ensure data center count is within limit
	if _, found := existingDataCenters[possibleLocation.DataCenter()]; !found {
		// different from existing dcs
		if len(existingDataCenters) < replicaPlacement.DiffDataCenterCount+1 {
			// lack on different dcs
			return true
		} else {
			// adding this would go over the different dcs limit
			return false
		}
	}
	// now this is same as one of the existing data centers
	if !isAmong(possibleLocation.DataCenter(), primaryDataCenters) {
		// not on one of the primary dcs
		return false
	}

	// now this is one of the primary dcs
	primaryDcRacks := make(map[string]int)
	for _, replica := range replicas {
		if replica.location.DataCenter() != possibleLocation.DataCenter() {
			continue
		}
		primaryDcRacks[replica.location.Rack()] += 1
	}
	primaryRacks, _ := findTopKeys(primaryDcRacks)
	sameRackCount := primaryDcRacks[possibleLocation.Rack()]

	// ensure rack count is within limit
	if _, found := primaryDcRacks[possibleLocation.Rack()]; !found {
		// different from existing racks
		if len(primaryDcRacks) < replicaPlacement.DiffRackCount+1 {
			// lack on different racks
			return true
		} else {
			// adding this would go over the different racks limit
			return false
		}
	}

	// now this is same as one of the existing racks
	if !isAmong(possibleLocation.Rack(), primaryRacks) {
		// not on the primary rack
		return false
	}

	// now this is on the primary rack

	// different from existing data nodes
	if sameRackCount < replicaPlacement.SameRackCount+1 {
		// lack on same rack
		return true
	} else {
		// adding this would go over the same data node limit
		return false
	}

}
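// findTopKeys returns all keys that share the highest count, together with that
// count. For example (illustrative values only):
//
//	findTopKeys(map[string]int{"dc1": 2, "dc2": 2, "dc3": 1}) // -> ["dc1", "dc2"] (in map order), 2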
func findTopKeys(m map[string]int) (topKeys []string, max int) {
	for k, c := range m {
		if max < c {
			topKeys = topKeys[:0]
			topKeys = append(topKeys, k)
			max = c
		} else if max == c {
			topKeys = append(topKeys, k)
		}
	}
	return
}
func isAmong(key string, keys []string) bool {
	for _, k := range keys {
		if k == key {
			return true
		}
	}
	return false
}
type VolumeReplica struct {
	location *location
	info     *master_pb.VolumeInformationMessage
}

type location struct {
	dc       string
	rack     string
	dataNode *master_pb.DataNodeInfo
}

func newLocation(dc, rack string, dataNode *master_pb.DataNodeInfo) location {
	return location{
		dc:       dc,
		rack:     rack,
		dataNode: dataNode,
	}
}

func (l location) String() string {
	return fmt.Sprintf("%s %s %s", l.dc, l.rack, l.dataNode.Id)
}

func (l location) Rack() string {
	return fmt.Sprintf("%s %s", l.dc, l.rack)
}

func (l location) DataCenter() string {
	return l.dc
}
func pickOneReplicaToCopyFrom(replicas []*VolumeReplica) *VolumeReplica {
	mostRecent := replicas[0]
	for _, replica := range replicas {
		if replica.info.ModifiedAtSecond > mostRecent.info.ModifiedAtSecond {
			mostRecent = replica
		}
	}
	return mostRecent
}
func countReplicas(replicas []*VolumeReplica) (diffDc, diffRack, diffNode map[string]int) {
	diffDc = make(map[string]int)
	diffRack = make(map[string]int)
	diffNode = make(map[string]int)
	for _, replica := range replicas {
		diffDc[replica.location.DataCenter()] += 1
		diffRack[replica.location.Rack()] += 1
		diffNode[replica.location.String()] += 1
	}
	return
}
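// pickOneReplicaToDelete sorts the replicas so that the copy with the lowest
// compact revision comes first, breaking ties by the oldest modification time
// and then the smallest size, and returns that first replica, matching the Help
// text's note that the oldest copies are purged. For example (illustrative),
// between replicas with CompactRevision 2 and 1, the one with revision 1 is deleted.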
func pickOneReplicaToDelete(replicas []*VolumeReplica, replicaPlacement *super_block.ReplicaPlacement) *VolumeReplica {

	sort.Slice(replicas, func(i, j int) bool {
		a, b := replicas[i], replicas[j]
		if a.info.CompactRevision != b.info.CompactRevision {
			return a.info.CompactRevision < b.info.CompactRevision
		}
		if a.info.ModifiedAtSecond != b.info.ModifiedAtSecond {
			return a.info.ModifiedAtSecond < b.info.ModifiedAtSecond
		}
		if a.info.Size != b.info.Size {
			return a.info.Size < b.info.Size
		}
		return false
	})

	return replicas[0]
}