// command_ec_encode.go (11 KB)

  1. package shell
  2. import (
  3. "context"
  4. "flag"
  5. "fmt"
  6. "io"
  7. "math/rand"
  8. "sync"
  9. "time"
  10. "github.com/seaweedfs/seaweedfs/weed/glog"
  11. "github.com/seaweedfs/seaweedfs/weed/pb"
  12. "google.golang.org/grpc"
  13. "github.com/seaweedfs/seaweedfs/weed/operation"
  14. "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
  15. "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
  16. "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
  17. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  18. "github.com/seaweedfs/seaweedfs/weed/wdclient"
  19. )
  20. func init() {
  21. Commands = append(Commands, &commandEcEncode{})
  22. }
  // commandEcEncode is the stateless implementation of the "ec.encode" shell command.
  type commandEcEncode struct{}
  25. func (c *commandEcEncode) Name() string {
  26. return "ec.encode"
  27. }
  28. func (c *commandEcEncode) Help() string {
  29. return `apply erasure coding to a volume
  30. ec.encode [-collection=""] [-fullPercent=95 -quietFor=1h]
  31. ec.encode [-collection=""] [-volumeId=<volume_id>]
  32. This command will:
  33. 1. freeze one volume
  34. 2. apply erasure coding to the volume
  35. 3. move the encoded shards to multiple volume servers
  36. The erasure coding is 10.4. So ideally you have more than 14 volume servers, and you can afford
  37. to lose 4 volume servers.
  38. If the number of volumes are not high, the worst case is that you only have 4 volume servers,
  39. and the shards are spread as 4,4,3,3, respectively. You can afford to lose one volume server.
  40. If you only have less than 4 volume servers, with erasure coding, at least you can afford to
  41. have 4 corrupted shard files.
  42. `
  43. }
  44. func (c *commandEcEncode) HasTag(CommandTag) bool {
  45. return false
  46. }
  47. func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  48. encodeCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  49. volumeId := encodeCommand.Int("volumeId", 0, "the volume id")
  50. collection := encodeCommand.String("collection", "", "the collection name")
  51. fullPercentage := encodeCommand.Float64("fullPercent", 95, "the volume reaches the percentage of max volume size")
  52. quietPeriod := encodeCommand.Duration("quietFor", time.Hour, "select volumes without no writes for this period")
  53. parallelCopy := encodeCommand.Bool("parallelCopy", true, "copy shards in parallel")
  54. forceChanges := encodeCommand.Bool("force", false, "force the encoding even if the cluster has less than recommended 4 nodes")
  55. if err = encodeCommand.Parse(args); err != nil {
  56. return nil
  57. }
  58. if err = commandEnv.confirmIsLocked(args); err != nil {
  59. return
  60. }
  61. // collect topology information
  62. topologyInfo, _, err := collectTopologyInfo(commandEnv, 0)
  63. if err != nil {
  64. return err
  65. }
  66. if !*forceChanges {
  67. var nodeCount int
  68. eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
  69. nodeCount++
  70. })
  71. if nodeCount < erasure_coding.ParityShardsCount {
  72. glog.V(0).Infof("skip erasure coding with %d nodes, less than recommended %d nodes", nodeCount, erasure_coding.ParityShardsCount)
  73. return nil
  74. }
  75. }
  76. vid := needle.VolumeId(*volumeId)
  77. // volumeId is provided
  78. if vid != 0 {
  79. return doEcEncode(commandEnv, *collection, vid, *parallelCopy)
  80. }
  81. // apply to all volumes in the collection
  82. volumeIds, err := collectVolumeIdsForEcEncode(commandEnv, *collection, *fullPercentage, *quietPeriod)
  83. if err != nil {
  84. return err
  85. }
  86. fmt.Printf("ec encode volumes: %v\n", volumeIds)
  87. for _, vid := range volumeIds {
  88. if err = doEcEncode(commandEnv, *collection, vid, *parallelCopy); err != nil {
  89. return err
  90. }
  91. }
  92. return nil
  93. }
  94. func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId, parallelCopy bool) (err error) {
  95. if !commandEnv.isLocked() {
  96. return fmt.Errorf("lock is lost")
  97. }
  98. // find volume location
  99. locations, found := commandEnv.MasterClient.GetLocationsClone(uint32(vid))
  100. if !found {
  101. return fmt.Errorf("volume %d not found", vid)
  102. }
  103. // fmt.Printf("found ec %d shards on %v\n", vid, locations)
  104. // mark the volume as readonly
  105. err = markVolumeReplicasWritable(commandEnv.option.GrpcDialOption, vid, locations, false, false)
  106. if err != nil {
  107. return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err)
  108. }
  109. // generate ec shards
  110. err = generateEcShards(commandEnv.option.GrpcDialOption, vid, collection, locations[0].ServerAddress())
  111. if err != nil {
  112. return fmt.Errorf("generate ec shards for volume %d on %s: %v", vid, locations[0].Url, err)
  113. }
  114. // balance the ec shards to current cluster
  115. err = spreadEcShards(commandEnv, vid, collection, locations, parallelCopy)
  116. if err != nil {
  117. return fmt.Errorf("spread ec shards for volume %d from %s: %v", vid, locations[0].Url, err)
  118. }
  119. return nil
  120. }
  121. func generateEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, collection string, sourceVolumeServer pb.ServerAddress) error {
  122. fmt.Printf("generateEcShards %s %d on %s ...\n", collection, volumeId, sourceVolumeServer)
  123. err := operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  124. _, genErr := volumeServerClient.VolumeEcShardsGenerate(context.Background(), &volume_server_pb.VolumeEcShardsGenerateRequest{
  125. VolumeId: uint32(volumeId),
  126. Collection: collection,
  127. })
  128. return genErr
  129. })
  130. return err
  131. }
  132. func spreadEcShards(commandEnv *CommandEnv, volumeId needle.VolumeId, collection string, existingLocations []wdclient.Location, parallelCopy bool) (err error) {
  133. allEcNodes, totalFreeEcSlots, err := collectEcNodes(commandEnv, "")
  134. if err != nil {
  135. return err
  136. }
  137. if totalFreeEcSlots < erasure_coding.TotalShardsCount {
  138. return fmt.Errorf("not enough free ec shard slots. only %d left", totalFreeEcSlots)
  139. }
  140. allocatedDataNodes := allEcNodes
  141. if len(allocatedDataNodes) > erasure_coding.TotalShardsCount {
  142. allocatedDataNodes = allocatedDataNodes[:erasure_coding.TotalShardsCount]
  143. }
  144. // calculate how many shards to allocate for these servers
  145. allocatedEcIds := balancedEcDistribution(allocatedDataNodes)
  146. // ask the data nodes to copy from the source volume server
  147. copiedShardIds, err := parallelCopyEcShardsFromSource(commandEnv.option.GrpcDialOption, allocatedDataNodes, allocatedEcIds, volumeId, collection, existingLocations[0], parallelCopy)
  148. if err != nil {
  149. return err
  150. }
  151. // unmount the to be deleted shards
  152. err = unmountEcShards(commandEnv.option.GrpcDialOption, volumeId, existingLocations[0].ServerAddress(), copiedShardIds)
  153. if err != nil {
  154. return err
  155. }
  156. // ask the source volume server to clean up copied ec shards
  157. err = sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, volumeId, existingLocations[0].ServerAddress(), copiedShardIds)
  158. if err != nil {
  159. return fmt.Errorf("source delete copied ecShards %s %d.%v: %v", existingLocations[0].Url, volumeId, copiedShardIds, err)
  160. }
  161. // ask the source volume server to delete the original volume
  162. for _, location := range existingLocations {
  163. fmt.Printf("delete volume %d from %s\n", volumeId, location.Url)
  164. err = deleteVolume(commandEnv.option.GrpcDialOption, volumeId, location.ServerAddress(), false)
  165. if err != nil {
  166. return fmt.Errorf("deleteVolume %s volume %d: %v", location.Url, volumeId, err)
  167. }
  168. }
  169. return err
  170. }
  171. func parallelCopyEcShardsFromSource(grpcDialOption grpc.DialOption, targetServers []*EcNode, allocatedEcIds [][]uint32, volumeId needle.VolumeId, collection string, existingLocation wdclient.Location, parallelCopy bool) (actuallyCopied []uint32, err error) {
  172. fmt.Printf("parallelCopyEcShardsFromSource %d %s\n", volumeId, existingLocation.Url)
  173. var wg sync.WaitGroup
  174. shardIdChan := make(chan []uint32, len(targetServers))
  175. copyFunc := func(server *EcNode, allocatedEcShardIds []uint32) {
  176. defer wg.Done()
  177. copiedShardIds, copyErr := oneServerCopyAndMountEcShardsFromSource(grpcDialOption, server,
  178. allocatedEcShardIds, volumeId, collection, existingLocation.ServerAddress())
  179. if copyErr != nil {
  180. err = copyErr
  181. } else {
  182. shardIdChan <- copiedShardIds
  183. server.addEcVolumeShards(volumeId, collection, copiedShardIds)
  184. }
  185. }
  186. cleanupFunc := func(server *EcNode, allocatedEcShardIds []uint32) {
  187. if err := unmountEcShards(grpcDialOption, volumeId, pb.NewServerAddressFromDataNode(server.info), allocatedEcShardIds); err != nil {
  188. fmt.Printf("unmount aborted shards %d.%v on %s: %v\n", volumeId, allocatedEcShardIds, server.info.Id, err)
  189. }
  190. if err := sourceServerDeleteEcShards(grpcDialOption, collection, volumeId, pb.NewServerAddressFromDataNode(server.info), allocatedEcShardIds); err != nil {
  191. fmt.Printf("remove aborted shards %d.%v on %s: %v\n", volumeId, allocatedEcShardIds, server.info.Id, err)
  192. }
  193. }
  194. // maybe parallelize
  195. for i, server := range targetServers {
  196. if len(allocatedEcIds[i]) <= 0 {
  197. continue
  198. }
  199. wg.Add(1)
  200. if parallelCopy {
  201. go copyFunc(server, allocatedEcIds[i])
  202. } else {
  203. copyFunc(server, allocatedEcIds[i])
  204. }
  205. }
  206. wg.Wait()
  207. close(shardIdChan)
  208. if err != nil {
  209. for i, server := range targetServers {
  210. if len(allocatedEcIds[i]) <= 0 {
  211. continue
  212. }
  213. cleanupFunc(server, allocatedEcIds[i])
  214. }
  215. return nil, err
  216. }
  217. for shardIds := range shardIdChan {
  218. actuallyCopied = append(actuallyCopied, shardIds...)
  219. }
  220. return
  221. }
  222. func balancedEcDistribution(servers []*EcNode) (allocated [][]uint32) {
  223. allocated = make([][]uint32, len(servers))
  224. allocatedShardIdIndex := uint32(0)
  225. serverIndex := rand.Intn(len(servers))
  226. for allocatedShardIdIndex < erasure_coding.TotalShardsCount {
  227. if servers[serverIndex].freeEcSlot > 0 {
  228. allocated[serverIndex] = append(allocated[serverIndex], allocatedShardIdIndex)
  229. allocatedShardIdIndex++
  230. }
  231. serverIndex++
  232. if serverIndex >= len(servers) {
  233. serverIndex = 0
  234. }
  235. }
  236. return allocated
  237. }
  238. func collectVolumeIdsForEcEncode(commandEnv *CommandEnv, selectedCollection string, fullPercentage float64, quietPeriod time.Duration) (vids []needle.VolumeId, err error) {
  239. // collect topology information
  240. topologyInfo, volumeSizeLimitMb, err := collectTopologyInfo(commandEnv, 0)
  241. if err != nil {
  242. return
  243. }
  244. quietSeconds := int64(quietPeriod / time.Second)
  245. nowUnixSeconds := time.Now().Unix()
  246. fmt.Printf("collect volumes quiet for: %d seconds and %.1f%% full\n", quietSeconds, fullPercentage)
  247. vidMap := make(map[uint32]bool)
  248. eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
  249. for _, diskInfo := range dn.DiskInfos {
  250. for _, v := range diskInfo.VolumeInfos {
  251. // ignore remote volumes
  252. if v.RemoteStorageName != "" && v.RemoteStorageKey != "" {
  253. continue
  254. }
  255. if v.Collection == selectedCollection && v.ModifiedAtSecond+quietSeconds < nowUnixSeconds {
  256. if float64(v.Size) > fullPercentage/100*float64(volumeSizeLimitMb)*1024*1024 {
  257. vidMap[v.Id] = true
  258. }
  259. }
  260. }
  261. }
  262. })
  263. for vid := range vidMap {
  264. vids = append(vids, needle.VolumeId(vid))
  265. }
  266. return
  267. }